交换机
当消息提供者向RabbitMQ Server发布消息时,消息先过经过交换机,再根据不同的交换机以不同的规则,按照RoutingKey规则绑定到队列,将消息放入队列中。NoName Exchange (无名交换机): 如果我们使用""标识交换机,默认交换机。在无名交换机中RountingKey是可以指定队列名称的。Direct Exchange(直连交换机): 将一个名为Q的消息队列与某个名为D的直连交换机通过值为R的路由键绑定在一起,当一个消息和路由键R发送 到直连交换机D上时,直连交换机D会把消息根据路由键R分发到Q队列,这种模式类似于一对一。Fanout Exchange(扇型交换机): 当一个消息发送到扇形交换机F上时,则扇形交换机F会将消息分别发送给所有绑定到F上的消息队列。 扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用, 故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。Topic Exchange(主题交换机): 主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点,消息队列与主题交换机的绑定 也是通过路由键的。当一个Msg和路由键规则发送到一个主题交换机T时,T会根据路由键规则来筛选出符合 规则的绑定到自身消息队列的路由键(可能是1个,也可能是N个,也可能是0个),根据符合的路由键, 将消息发送到其对应的消息队列里。这个模式类似于多播,当消息的路由规则只匹配到一个路由键时, 此时主题交换机可以看作是直连交换机,当路由规则匹配了主题交换机上所有绑定的队列的路由键时, 此时主题交换机可以看作是扇形交换机。
消息模型
1.简单消息模型
2.工作队列模型
3.发布订阅模型
4.路由模型
5.主题模型
绑定
交换机和队列之间的联系,是通过RoutingKey路由规则绑定。
直接交换机
直接交换机一般使用路由模式绑定队列,指定路由规则选择性分发。
Consumer
//路由名称private static final String EXCHANGE = "direct_routing";public static void main(String[] args) { //获取通道 Channel channel = RabbitMqUtils.getChannel(); try { //声明交换机 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); //声明临时队列 String queue = channel.queueDeclare().getQueue(); //队列,交换机,路由Key channel.queueBind(queue, EXCHANGE, "google"); //消费消息 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费异常"); }; System.out.println("ConsumerA准备就绪"); channel.basicConsume(queue, true, deliverCallback, cancelCallback); } catch (IOException e) { e.printStackTrace(); }}
Provider
//路由名称 private static final String EXCHANGE = "direct_routing"; public static void main(String[] args) throws IOException { Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); try { for (int i = 0; i < 10; i++) { //发送消息 channel.basicPublish(EXCHANGE, "google", null, ("msg" + i).getBytes()); } } catch (IOException e) { e.printStackTrace(); } finally { RabbitMqUtils.close(connection, channel); } }
扇出交换机
Provider
//交换机名称public static final String EXCHANGE = "my.fanout";public static void main(String[] args) { Channel channel = RabbitMqUtils.getChannel(); try { for (int i = 0; i < 100; i++) { channel.basicPublish(EXCHANGE, "", null, ("message" + i).getBytes()); } } catch (IOException e) { e.printStackTrace(); }}
ConsumerA
//交换机名称public static final String EXCHANGE = "my.fanout";public static void main(String[] args) { //获取队列 Channel channel = RabbitMqUtils.getChannel(); try { //声明交换机(交换机名称,交换机类型) channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT); //声明一个临时队列(断开TCP连接即删除队列) String queue = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queue, EXCHANGE, ""); System.out.println("ConsumerA准备就绪..."); //消费消息 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消费错误..."); }; channel.basicConsume(queue, true, deliverCallback, cancelCallback); } catch (IOException e) { e.printStackTrace(); }}
ConsumerB
//交换机名称public static final String EXCHANGE = "my.fanout";public static void main(String[] args) { //获取队列 Channel channel = RabbitMqUtils.getChannel(); try { //声明交换机(交换机名称,交换机类型) channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT); //声明一个临时队列(断开TCP连接即删除队列) String queue = channel.queueDeclare().getQueue(); //绑定交换机和队列 channel.queueBind(queue, EXCHANGE, ""); System.out.println("ConsumerB准备就绪..."); //消费消息 DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println(new String(message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消费错误..."); }; channel.basicConsume(queue, true, deliverCallback, cancelCallback); } catch (IOException e) { e.printStackTrace(); }}
主题交换机
Topic交换机的路由需要满足一定的条件,必须是一个单词列表。#表示匹配0个或多个单词,*表示一个单词。*.user.* 中间带user的三个单词组合user.# 多个单词,第一个是user
Provider
//交换机名称private static final String EXCHANGE = "my.topic";public static void main(String[] args) { Connection connection = RabbitMqUtils.getConnection(); Channel channel = null; try { channel = connection.createChannel(); for (int i = 0; i < 20; i++) { channel.basicPublish(EXCHANGE, "v.e.rabbit", null, ("msg" + i).getBytes()); } } catch (IOException e) { e.printStackTrace(); } finally { RabbitMqUtils.close(connection, channel); }}
ConsumerA
//交换机名称 private static final String EXCHANGE = "my.topic"; public static void main(String[] args) throws IOException { //获取通道 Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC); //创建队列 channel.queueDeclare("Q1", false, false, false, null); //交换机绑定队列 channel.queueBind("Q1", EXCHANGE, "*.orange.*"); //消费消息 DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("NodeA:" + new String(message.getBody())); CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费异常"); channel.basicConsume("Q1", true, deliverCallback, cancelCallback); }
ConsumerB
//交换机名称 private static final String EXCHANGE = "my.topic"; public static void main(String[] args) throws IOException { //获取通道 Channel channel = RabbitMqUtils.getChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC); //创建队列 channel.queueDeclare("Q2", false, false, false, null); //交换机绑定队列 channel.queueBind("Q2", EXCHANGE, "*.*.rabbit"); channel.queueBind("Q2", EXCHANGE, "lazy.#"); //消费消息 DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("NodeB:" + new String(message.getBody())); CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费异常"); channel.basicConsume("Q2", true, deliverCallback, cancelCallback); }