1、实现步骤
1:jdk1.8
2:构建一个maven工程
3:导入rabbitmq的maven依赖
4:启动rabbitmq-server服务
5:定义生产者
6:定义消费者
7:观察消息的在rabbitmq-server服务中的过程
2、构建一个maven工程

3、导入rabbitmq的maven依赖
3.1、Java原生依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.10.0</version></dependency>
3.2、spring依赖
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-amqp</artifactId><version>2.2.5.RELEASE</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId><version>2.2.5.RELEASE</version></dependency>
3.3、springboot依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
上面根据自己的项目环境进行选择即可。
:::info 番外:rabbitmq和spring同属一个公司开放的产品,所以他们的支持也是非常完善,这也是为什么推荐使用rabbitmq的一个原因。 :::
4、启动rabbitmq-server服务
systemctl start rabbitmq-server或者docker start myrabbit
5、定义生产者
package com.theory.simple;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) {// 所有的中间件技术都是支持基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp// IP port// 1. 创建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2. 设置连接属性connectionFactory.setHost("192.168.222.111"); // ip地址connectionFactory.setPort(5672); // 端口号connectionFactory.setUsername("admin"); // 账号connectionFactory.setPassword("admin"); // 密码connectionFactory.setVirtualHost("/"); //路由地址Connection connection = null;Channel channel = null;try {// 3. 从连接工厂中获取连接 Connectionconnection = connectionFactory.newConnection("生产者");// 4. 通过连接获取通道Channelchannel = connection.createChannel();// 5. 通过通道创建交换机,声明队列,绑定关系,如有key,发送消息,接收消息String queueName = "queue1";/*** @params1: 队列的名称,如果队列不存在,则会创建* @params2: 是否要持久化 durable=false所谓的持久化消息就是是否存盘,如果为false即非持久化,true是持久化 非持久化也会存盘,但是重启服务器会丢失* @params3: 排他性,是否是独占队列,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭* @params4: 是否自动删除,随着消费者消息完毕消息以后是否把队列自动删除* @params5: 携带附属参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。*/channel.queueDeclare(queueName,false,false,false,null);// 6. 准备发送消息内容String message = "Hello theory";// 7. 发送消息给队列queue/*** @params1: 交换机exchange* @params2: 队列、路由key* @params3: 消息的状态控制,即属性* @params4: 消息主题,即发送消息的内容*/// 面试题:可以存在没有交换机的队列吗?不可能,虽然没有指定交换机但是一定存在一个默认的交换机channel.basicPublish("",queueName,null,message.getBytes());System.out.println("消息发送成功!!!");} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 8. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}// 9. 释放连接if (connection != null && connection.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}}
1:执行发送,这个时候可以在web控制台查看到这个队列queue的信息

2:我们可以进行对队列的消息进行预览和测试如下:
3:进行预览和获取消息进行测试
6、定义消费者
package com.theory.routing;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) {// 所有的中间件技术都是支持基于tcp/ip协议基础之上构建新型的协议规范,只不过RabbitMQ遵循的是amqp// IP port// 1. 创建连接工厂,并设置参数ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.222.111");connectionFactory.setPort(5672);connectionFactory.setUsername("admin");connectionFactory.setPassword("admin");connectionFactory.setVirtualHost("/");Connection connection = null;Channel channel = null;try {// 2. 创建连接 Connectionconnection = connectionFactory.newConnection("生产者");// 3. 通过连接获取通道Channelchannel = connection.createChannel();// 4. 通过通道创建交换机,声明队列,绑定关系,如有key,发送消息,接收消息channel.basicConsume("queue1", true, new DeliverCallback() {@Overridepublic void handle(String s, Delivery delivery) throws IOException {System.out.println("收到消息是" + new String(delivery.getBody(), "utf-8"));}}, new CancelCallback() {@Overridepublic void handle(String s) throws IOException {System.out.println("接收失败了...");}});System.out.println("开始接收消息");System.in.read();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();} finally {// 7. 关闭通道if (channel != null && channel.isOpen()) {try {channel.close();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}// 8. 关闭通道if (connection != null && connection.isOpen()) {try {connection.close();} catch (IOException e) {e.printStackTrace();}}}}}
