1.步骤
- 创建连接工厂
- 获取连接connection
- 通过连接获取通道
- 通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息
- 准备消息内容
- 发送消息给队列
- 关闭连接通道
2.生产者代码
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { private static final String QUEUE_NAME = "LHM"; public static void main(String[] args) { //所有的中间件的技术都是基于 tcp/ip 协议基础上构建的新型协议规范 不过rabbitmq 遵循的amqp // ip port //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.102.67.107"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); //2.获取连接connection Connection connection = null; Channel channel = null; try { connection = factory.newConnection("生产者"); //3.通过连接获取通道 channel = connection.createChannel(); //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息 /** * @param1:队列的名称 * @param2:是否要持久化 durable = false 所谓的持久化消息是否存盘,如果false,非持久化 true 是持久化 * @param3:排他性 是否是独占队列 * @param4:是否自动删除,随着最后一个消费者发送消息完毕是否把队列删除 * @param5:携带一些附属参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); //5.准备消息内容 String message = "hello"; //6.发送消息给队列 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { //7.关闭连接通道 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } }}
3.消费者代码
import com.rabbitmq.client.*;import java.nio.charset.StandardCharsets;public class Consumer { private static final String QUEUE_NAME = "LHM"; public static void main(String[] args) { //所有的中间件的技术都是基于 tcp/ip 协议基础上构建的新型协议规范 不过rabbitmq 遵循的amqp // ip port //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("39.102.67.107"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); //2.获取连接connection Connection connection = null; Channel channel = null; try { connection = factory.newConnection("生产者"); //3.通过连接获取通道 channel = connection.createChannel(); //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息 和接受消息 channel.basicConsume(QUEUE_NAME, true, (consumerTag, message) -> { System.out.println("收到的消息为:" + new String(message.getBody(), StandardCharsets.UTF_8)); }, consumerTag -> { System.out.println("消息接受失败了"); }); System.out.println("开启接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { //7.关闭连接通道 if (channel != null && channel.isOpen()){ try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } if (connection != null && connection.isOpen()){ try { connection.close(); } catch (Exception e) { e.printStackTrace(); } } } }}