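## Introduction
A message queue usually involves three concepts:
- sending messages (the producer)
- the queue
- receiving messages (the consumer)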
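RabbitMQ adds one more layer of abstraction on top of this basic model: an exchange sits between the producer and the queue. The producer therefore has no direct relationship with a queue. It publishes to an exchange, and the exchange forwards each message to queues according to its routing policy.
The key RabbitMQ concepts are:
- Virtual host: RabbitMQ supports access control, and permissions are granted at the level of a virtual host. A virtual host can contain multiple exchanges, queues, and bindings.
- Exchange: RabbitMQ's dispatcher. It routes messages to the relevant queues according to its routing policy.
- Queue: the container that buffers messages.
- Binding: defines the relationship between an exchange and a queue.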
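In the usual flow diagram, the yellow circle is our message-publishing service (the producer). It pushes messages to the RabbitMQ server (the box in the middle), where exchanges, bindings, and queues route and enqueue them. The consumers (the blue circles on the right) then receive the messages from the queues they listen on.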

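## Queue
The `Queue` class lives in the `org.springframework.amqp.core` package.
When a queue is declared, it is automatically bound to the default exchange, with the queue name as the routing key.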
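To make the default-exchange rule concrete, here is a minimal in-memory sketch in plain Java. It is not RabbitMQ or Spring AMQP code: the class `TinyBroker` and its methods are hypothetical names invented for illustration, and the only thing it models is "declaring a queue also binds it to the default exchange under its own name".

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model; it only illustrates the default-exchange rule.
final class TinyBroker {
    // Default exchange (name ""): routing key -> queue, filled in automatically.
    private final Map<String, Queue<String>> defaultExchange = new HashMap<>();

    // Declaring a queue also binds it to the default exchange under its own name.
    void declareQueue(String name) {
        defaultExchange.putIfAbsent(name, new ArrayDeque<>());
    }

    // Publishing with routing key == queue name reaches that queue.
    // Returns false when no queue has that name (the message is unroutable).
    boolean publishToDefault(String routingKey, String body) {
        Queue<String> queue = defaultExchange.get(routingKey);
        if (queue == null) {
            return false;
        }
        queue.add(body);
        return true;
    }

    // Takes the next message from a queue, or null when it is empty or unknown.
    String poll(String queueName) {
        Queue<String> queue = defaultExchange.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

In this model, after `declareQueue("queue-direct")`, a call to `publishToDefault("queue-direct", "hello")` lands in that queue without any explicit binding step.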
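Queue constructor parameters:
- `name`: the queue name.
- `durable`: whether the queue is persistent. A durable queue survives a RabbitMQ restart, so it does not have to be created again. Defaults to `true`.
- `exclusive`: whether the queue is usable only by the connection that declared it. Defaults to `false`.
- `autoDelete`: whether the queue is deleted automatically once it is no longer in use. Defaults to `false`.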
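## Exchange
A server must implement the direct exchange type, and it provides a default exchange whose name is the empty string.
Exchange constructor parameters:
- `durable`: when `true`, the exchange survives a RabbitMQ restart and does not have to be created again.
- `autoDelete`: whether the exchange is deleted automatically once it is no longer in use. Defaults to `false`.
Exchange types:
- Direct exchange (`DirectExchange`): the simplest type. A message is delivered to a queue when its routing key matches the queue's binding key.
- Topic exchange (`TopicExchange`): forwards messages to queues by pattern-matching the routing key.
- Fanout exchange (`FanoutExchange`): has no notion of a routing key. It delivers each message to every queue bound to it.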
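### DirectExchange (direct exchange)
A direct exchange delivers a message to queues based on the routing key the message carries.
The rough flow: a queue is bound to a direct exchange and given a routing key (the binding key).
When a producer then sends a message with routing key X to the exchange, the exchange looks for the queues whose binding key is also X and delivers the message to them.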
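The lookup described above can be sketched as a map from binding key to queue names. This is an illustrative in-memory model in plain Java, not how RabbitMQ implements it, and `DirectRouter` with its methods is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of direct routing; not RabbitMQ's implementation.
final class DirectRouter {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Bind a queue to this exchange under an exact binding key.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queueName);
    }

    // A message is routed to every queue whose binding key equals its routing key.
    List<String> route(String routingKey) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptyList());
        return new ArrayList<>(targets);
    }
}
```

With `bind("queue-direct", "routingKey-direct")`, routing the key `routingKey-direct` yields `queue-direct`, while any other key yields an empty list, which corresponds to an unroutable message.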
- Configure the exchange and queue

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectExchangeConfig {

    // Queue "queue-direct" with the default flags (durable, not exclusive, not auto-delete)
    @Bean(name = "queueDirect")
    public Queue directQueue() {
        return new Queue("queue-direct");
    }

    // Durable direct exchange that is not auto-deleted
    @Bean(name = "directExchange")
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange", true, false);
    }

    // Bind the queue to the exchange with the binding key "routingKey-direct"
    @Bean
    public Binding bindingDirect(Queue queueDirect, DirectExchange directExchange) {
        return BindingBuilder.bind(queueDirect).to(directExchange).with("routingKey-direct");
    }
}
```
- Producer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producter {
    private Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Publish to "directExchange" using the routing key that "queue-direct" is bound with
    public void direct(String message) {
        logger.info("product direct:" + message);
        rabbitTemplate.convertAndSend("directExchange", "routingKey-direct", message);
    }
}
```
- Consumer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private Logger logger = LoggerFactory.getLogger(Consumer.class);

    // Listen on "queue-direct" and log every message received
    @RabbitHandler
    @RabbitListener(queues = "queue-direct")
    public void direct(String message) {
        logger.info("consume direct:" + message);
    }
}
```
- Test

```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
    private Logger logger = LoggerFactory.getLogger(MQTest.class);

    @Autowired
    private Producter producter;

    // GET /mq-test sends one message through the direct exchange
    @GetMapping
    public void test() {
        producter.direct("direct message");
    }
}
```

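### TopicExchange (topic exchange)
A topic exchange forwards messages to queues by pattern-matching the routing key. Its flow is much like that of a direct exchange, except that the routing key is matched against the binding key according to wildcard rules.
Briefly, the rules are:
- `*` (asterisk) stands for exactly one word (it must be present).
- `#` (hash) stands for any number of words (zero or more).

Words in a key are separated by dots. A binding key containing wildcards is attached to a queue. A small example:
Queue Q1 has the binding key `*.TT.*`, and queue Q2 has the binding key `TT.#`.
A message with the routing key `A.TT.B` is received by Q1;
a message with the routing key `TT.AA.BB` is received by Q2.
Topic exchanges are very powerful, for two reasons:
- When a queue's binding key is `#`, the queue ignores the routing key and receives every message.
- When neither `*` nor `#` appears in the binding key, the topic exchange behaves like a direct exchange.

So a topic exchange can also do what fanout and direct exchanges do.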
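The wildcard rules above can be checked with a small matcher. The following is a sketch in plain Java that assumes keys are split into words on dots. `TopicMatcher` is a hypothetical helper written for this article, not the algorithm RabbitMQ uses internally.

```java
// Hypothetical helper showing the wildcard rules; not RabbitMQ's implementation.
final class TopicMatcher {
    private TopicMatcher() {}

    // True when the routing key satisfies the binding key
    // ('*' = exactly one word, '#' = zero or more words).
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.");
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero, one, or many words: try every split point
            for (int next = w; next <= words.length; next++) {
                if (matches(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false; // the pattern still needs a word, but none is left
        }
        boolean sameWord = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return sameWord && matches(pattern, p + 1, words, w + 1);
    }
}
```

For the example above, `TopicMatcher.matches("*.TT.*", "A.TT.B")` and `TopicMatcher.matches("TT.#", "TT.AA.BB")` both hold, while `*.TT.*` does not match `TT.AA.BB`. A binding key without wildcards only matches the identical routing key, which is the direct-exchange behavior.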
- Configure the exchange and queues

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Topic exchange: forwards messages to queues by pattern-matching the routing key
@Configuration
public class TopicExchangeConfig {
    // Topic mode: queue names
    public static final String TOPIC_QUEUE_FIRST = "topicQueueFirst";
    public static final String TOPIC_QUEUE_SECOND = "topicQueueSecond";
    // Routing keys (used as binding patterns)
    public static final String ROUTING_KEY_TOPIC_FIRST = "routingKey-topic-first.*";
    public static final String ROUTING_KEY_TOPIC_SECOND = "routingKey-topic-second.#";
    // Topic exchange name
    public static final String TOPIC_EXCHANGE = "topicExchange";

    @Bean(name = "firstTopicQueue")
    public Queue firstTopicQueue() {
        return new Queue(TOPIC_QUEUE_FIRST, true, false, false);
    }

    @Bean(name = "secondTopicQueue")
    public Queue secondTopicQueue() {
        return new Queue(TOPIC_QUEUE_SECOND, true, false, false);
    }

    @Bean(value = "topicExchange")
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    // First queue: "*" matches exactly one word after "routingKey-topic-first."
    @Bean
    public Binding bindingFirstTopicExchange(Queue firstTopicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(firstTopicQueue).to(topicExchange).with(ROUTING_KEY_TOPIC_FIRST);
    }

    // Second queue: "#" matches zero or more words after "routingKey-topic-second."
    @Bean
    public Binding bindingSecondTopicExchange(Queue secondTopicQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(secondTopicQueue).to(topicExchange).with(ROUTING_KEY_TOPIC_SECOND);
    }
}
```
- Producer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producter {
    private Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Matches "routingKey-topic-first.*", so it reaches topicQueueFirst
    public void topicFirstOne(String message) {
        logger.info("product topic first:" + message);
        rabbitTemplate.convertAndSend("topicExchange", "routingKey-topic-first.one", message);
    }

    // Two words follow the prefix, but "*" stands for exactly one word,
    // so this key matches neither binding and the message reaches no queue
    public void topicFirstTwo(String message) {
        logger.info("product topic first:" + message);
        rabbitTemplate.convertAndSend("topicExchange", "routingKey-topic-first.one.two", message);
    }

    // Matches "routingKey-topic-second.#", so it reaches topicQueueSecond
    public void topicSecondOne(String message) {
        logger.info("product topic second:" + message);
        rabbitTemplate.convertAndSend("topicExchange", "routingKey-topic-second.one", message);
    }

    // "#" accepts any number of words, so this also reaches topicQueueSecond
    public void topicSecondTwo(String message) {
        logger.info("product topic second:" + message);
        rabbitTemplate.convertAndSend("topicExchange", "routingKey-topic-second.one.two", message);
    }
}
```
- Consumer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private Logger logger = LoggerFactory.getLogger(Consumer.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Receives messages routed to topicQueueFirst
    @RabbitListener(queues = "topicQueueFirst")
    @RabbitHandler
    public void topicFirst(String message) {
        logger.info("consume topic first:" + message);
    }

    // Receives messages routed to topicQueueSecond
    @RabbitHandler
    @RabbitListener(queues = "topicQueueSecond")
    public void topicSecond(String message) {
        logger.info("consume topic second:" + message);
    }
}
```
- Test

```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
    private Logger logger = LoggerFactory.getLogger(MQTest.class);

    @Autowired
    private Producter producter;

    // GET /mq-test sends four messages with different routing keys
    @GetMapping
    public void test() {
        producter.topicFirstOne("First One");
        producter.topicFirstTwo("First Two");
        producter.topicSecondOne("Second One");
        producter.topicSecondTwo("Second Two");
    }
}
```

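### FanoutExchange (fanout exchange)
A fanout exchange has no notion of a routing key: even if you bind a queue with one, it is ignored. When the exchange receives a message, it forwards it directly to every queue bound to it.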
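As an illustration of "every bound queue gets a copy", here is a small in-memory sketch in plain Java. `FanoutRouter` and its methods are hypothetical names. The sketch accepts a routing key only to show that it is never consulted.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of fanout delivery; not RabbitMQ's implementation.
final class FanoutRouter {
    // queue name -> buffered messages
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to this exchange; no binding key is needed.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Copies the message into every bound queue. The routing key is accepted
    // but never looked at. Returns the number of queues that received a copy.
    int publish(String routingKey, String body) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(body);
        }
        return boundQueues.size();
    }

    // Takes the next message from a queue, or null when it is empty or unknown.
    String poll(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }
}
```

After binding `queue-fanout-first` and `queue-fanout-second`, one `publish` call leaves a copy of the message in both queues, whatever routing key was passed.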
- Configure the exchange and queues

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutExchangeConfig {

    @Bean(name = "fanoutQueueFirst")
    public Queue firstFanoutQueue() {
        return new Queue("queue-fanout-first");
    }

    @Bean(name = "fanoutQueueSecond")
    public Queue secondFanoutQueue() {
        return new Queue("queue-fanout-second");
    }

    @Bean(name = "fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    // Fanout bindings take no routing key
    @Bean
    public Binding bindingFirstFanoutExchange(Queue fanoutQueueFirst, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueueFirst).to(fanoutExchange);
    }

    @Bean
    public Binding bindingSecondFanoutExchange(Queue fanoutQueueSecond, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueueSecond).to(fanoutExchange);
    }
}
```
- Producer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producter {
    private Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // The routing key is ignored by a fanout exchange, so none is supplied
    public void fanout(String message) {
        logger.info("product fanout: " + message);
        rabbitTemplate.convertAndSend("fanoutExchange", null, message);
    }
}
```
- Consumer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private Logger logger = LoggerFactory.getLogger(Consumer.class);

    // Both listeners receive a copy of every message sent to the fanout exchange
    @RabbitHandler
    @RabbitListener(queues = "queue-fanout-first")
    public void fanoutFirst(String message) {
        logger.info("consume fanout first:" + message);
    }

    @RabbitHandler
    @RabbitListener(queues = "queue-fanout-second")
    public void fanoutSecond(String message) {
        logger.info("consume fanout second:" + message);
    }
}
```
- Test

```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
    private Logger logger = LoggerFactory.getLogger(MQTest.class);

    @Autowired
    private Producter producter;

    // GET /mq-test sends one message; both fanout queues should receive it
    @GetMapping
    public void test() {
        producter.fanout("fanout message");
    }
}
```

### HeadersExchange (headers exchange)
- Configure the exchanges and queues

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class HeadersExchangeConfig {

    @Bean(name = "headersQueueFirst")
    public Queue firstHeadersQueue() {
        return new Queue("queue-headers-first");
    }

    @Bean(name = "headersQueueSecond")
    public Queue secondHeadersQueue() {
        return new Queue("queue-headers-second");
    }

    @Bean(name = "headersExchangeFirst")
    public HeadersExchange firstHeadersExchange() {
        return new HeadersExchange("headersExchange-first");
    }

    @Bean(name = "headersExchangeSecond")
    public HeadersExchange secondHeadersExchange() {
        return new HeadersExchange("headersExchange-second");
    }

    // whereAll: the message must carry every listed header with the listed value
    @Bean
    public Binding bindingFirstHeadersExchange(Queue headersQueueFirst, HeadersExchange headersExchangeFirst) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("type", "message");
        headerValues.put("name", "headers");
        return BindingBuilder.bind(headersQueueFirst).to(headersExchangeFirst).whereAll(headerValues).match();
    }

    // whereAny: one matching header is enough
    @Bean
    public Binding bindingSecondHeadersExchange(Queue headersQueueSecond, HeadersExchange headersExchangeSecond) {
        Map<String, Object> headerValues = new HashMap<>();
        headerValues.put("type", "message");
        headerValues.put("name", "headers");
        return BindingBuilder.bind(headersQueueSecond).to(headersExchangeSecond).whereAny(headerValues).match();
    }
}
```
- Producer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Map;

@Component
public class Producter {
    private Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // A headers exchange routes on message headers, so no routing key is supplied
    public void headersFirst(Map<String, Object> headers, String message) {
        logger.info("product headers first: " + message);
        rabbitTemplate.convertAndSend("headersExchange-first", null, getMessage(headers, message));
    }

    public void headersSecond(Map<String, Object> headers, String message) {
        logger.info("product headers second: " + message);
        rabbitTemplate.convertAndSend("headersExchange-second", null, getMessage(headers, message));
    }

    private Object getMessage(Map<String, Object> head, String msg) {
        // Build the message (body, properties)
        MessageProperties messageProperties = new MessageProperties();
        // Delivery mode: PERSISTENT messages are persisted, NON_PERSISTENT ones are not
        messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
        // "UTF-8" is an encoding, not a content type; declare text/plain so the
        // consumer side can convert the body back to a String
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        messageProperties.setContentEncoding("UTF-8");
        messageProperties.getHeaders().putAll(head);
        return new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties);
    }
}
```
- Consumer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private Logger logger = LoggerFactory.getLogger(Consumer.class);

    @RabbitHandler
    @RabbitListener(queues = "queue-headers-first")
    public void headersFirst(String message) {
        logger.info("consume headers first:" + message);
    }

    @RabbitHandler
    @RabbitListener(queues = "queue-headers-second")
    public void headersSecond(String message) {
        logger.info("consume headers second:" + message);
    }
}
```
- Test

```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
    private Logger logger = LoggerFactory.getLogger(MQTest.class);

    @Autowired
    private Producter producter;

    @GetMapping
    public void test() {
        // Carries only "type": enough for whereAny, not enough for whereAll
        Map<String, Object> headers = new HashMap<>();
        headers.put("type", "message");
        // Carries both "type" and "name": satisfies whereAll and whereAny
        Map<String, Object> headersAll = new HashMap<>();
        headersAll.put("type", "message");
        headersAll.put("name", "headers");
        producter.headersFirst(headers, "headers message first");
        producter.headersFirst(headersAll, "headers message first all");
        producter.headersSecond(headers, "headers message second");
        producter.headersSecond(headersAll, "headers message second all");
    }
}
```
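The difference between the `whereAll` and `whereAny` bindings used in this section can be sketched in plain Java. `HeadersMatcher` is a hypothetical helper written for illustration: it compares header values with `equals` and is not the broker's actual matching code.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper contrasting "all" and "any" header matching.
final class HeadersMatcher {
    private HeadersMatcher() {}

    // "all": every required header must be present with an equal value.
    static boolean matchesAll(Map<String, Object> required, Map<String, Object> actual) {
        for (Map.Entry<String, Object> entry : required.entrySet()) {
            if (!headerEquals(entry, actual)) {
                return false;
            }
        }
        return true;
    }

    // "any": one required header with an equal value is enough.
    static boolean matchesAny(Map<String, Object> required, Map<String, Object> actual) {
        for (Map.Entry<String, Object> entry : required.entrySet()) {
            if (headerEquals(entry, actual)) {
                return true;
            }
        }
        return false;
    }

    private static boolean headerEquals(Map.Entry<String, Object> required, Map<String, Object> actual) {
        return actual.containsKey(required.getKey())
                && Objects.equals(required.getValue(), actual.get(required.getKey()));
    }
}
```

Applied to the test in this section, a message carrying only `type=message` fails the "all" binding on `headersExchange-first` and is not delivered to `queue-headers-first`. The other three messages match their bindings and are delivered.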
## Extensions
### Sending objects
Note: the object being sent must support serialization (it must implement the `Serializable` interface).
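Why `Serializable` matters: the object has to be turned into bytes on the producer side and rebuilt on the consumer side. The sketch below shows such a round trip with standard Java serialization, under the assumption that Java serialization is the mechanism in use. `Payload` and `SerializationRoundTrip` are hypothetical names used only here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical payload type used only for this illustration.
final class Payload implements Serializable {
    private static final long serialVersionUID = 1L;
    final int status;
    final String content;

    Payload(int status, String content) {
        this.status = status;
        this.content = content;
    }
}

final class SerializationRoundTrip {
    private SerializationRoundTrip() {}

    // Producer side: object -> bytes (what would travel as the message body).
    static byte[] toBytes(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Consumer side: bytes -> object.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of the payload is not on the classpath", e);
        }
    }
}
```

If `Payload` did not implement `Serializable`, `writeObject` would fail with a `NotSerializableException`, which is why the requirement above applies to any class sent this way.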
- Object class

```java
package com.example.demo.rabbit.entity;

import java.io.Serializable;

public class Message implements Serializable {
    private static final long serialVersionUID = 1L;

    private Integer status;
    private String content;

    public Message() {
    }

    public Message(Integer status, String content) {
        this.status = status;
        this.content = content;
    }

    @Override
    public String toString() {
        return "Message{" +
                "status=" + status +
                ", content='" + content + '\'' +
                '}';
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
```
- Configure the exchange and queue

```java
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectConfig {

    @Bean(name = "classQueue")
    public Queue classQueue() {
        return new Queue("queue-class");
    }

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("directExchange-class", true, false);
    }

    // Bind "queue-class" to the exchange with the binding key "routingKey-class"
    @Bean
    public Binding bindingDirect(Queue classQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(classQueue).to(directExchange).with("routingKey-class");
    }
}
```
- Producer

```java
import com.example.demo.rabbit.entity.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producter {
    private Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // Send the object itself; the template converts it into a message body
    public void classTest(Message message) {
        logger.info("product class:" + message.toString());
        String routingKey = "routingKey-class";
        rabbitTemplate.convertAndSend("directExchange-class", routingKey, message);
    }
}
```
- Consumer

```java
import com.example.demo.rabbit.entity.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private Logger logger = LoggerFactory.getLogger(Consumer.class);

    // The body is converted back into a Message object before this method is called
    @RabbitHandler
    @RabbitListener(queues = "queue-class")
    public void classTest(Message message) {
        logger.info("consume class:" + message);
    }
}
```
- Test

```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
    private Logger logger = LoggerFactory.getLogger(MQTest.class);

    @Autowired
    private Producter producter;

    // GET /mq-test sends one Message object
    @GetMapping
    public void test() {
        Message message = new Message(1, "Message");
        producter.classTest(message);
    }
}
```
### Work mode: multiple consumers
This uses the DirectExchange configuration above. Start several consumers on the same queue; each message is delivered to only one of them, so no message is consumed twice.
- Consumer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private Logger logger = LoggerFactory.getLogger(Consumer.class);

    // First consumer on "queue-direct"
    @RabbitHandler
    @RabbitListener(queues = "queue-direct")
    public void direct(String message) {
        logger.info("consume direct:" + message);
    }

    // Second consumer on the same queue; a message goes to one of the two, never both
    @RabbitHandler
    @RabbitListener(queues = "queue-direct")
    public void direct1(String message) {
        logger.info("consume direct 1:" + message);
    }
}
```
- Test

```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
    private Logger logger = LoggerFactory.getLogger(MQTest.class);

    @Autowired
    private Producter producter;

    // GET /mq-test sends five messages to be shared between the two consumers
    @GetMapping
    public void test() {
        for (int i = 0; i < 5; i++) {
            producter.direct("direct message " + i);
        }
    }
}
```
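To picture how one queue shares its messages among several consumers, here is a plain-Java sketch that hands out messages in strict turns. `RoundRobinQueue` is a hypothetical name. It is a simplification: in a real broker the exact split also depends on settings such as prefetch and on how quickly each consumer acknowledges.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one queue, several consumers, each message to exactly one.
final class RoundRobinQueue {
    // One list of received messages per registered consumer
    private final List<List<String>> consumers = new ArrayList<>();
    private int next = 0;

    // Registers a consumer and returns its index.
    int addConsumer() {
        consumers.add(new ArrayList<String>());
        return consumers.size() - 1;
    }

    // Delivers the message to exactly one consumer, taking turns.
    void deliver(String body) {
        if (consumers.isEmpty()) {
            throw new IllegalStateException("no consumer registered");
        }
        consumers.get(next).add(body);
        next = (next + 1) % consumers.size();
    }

    // Returns a copy of what the given consumer has received so far.
    List<String> received(int consumer) {
        return new ArrayList<>(consumers.get(consumer));
    }
}
```

With two consumers and the five test messages above, this model gives messages 0, 2, and 4 to the first consumer and messages 1 and 3 to the second. No message appears in both lists.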

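### RPC
RabbitMQ supports RPC-style remote calls that return the result synchronously. The returned result can be an object.
- Configure the exchange and queue: reuse the DirectExchange configuration above.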
- Producer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Producter {
    private Logger logger = LoggerFactory.getLogger(Producter.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sender(String content) {
        logger.info("product message: " + content);
        // Blocks until the consumer replies; the result is null if no reply arrives in time
        String message = (String) rabbitTemplate.convertSendAndReceive("directExchange", "routingKey-direct", content);
        logger.info("return message:" + message);
    }
}
```
- Consumer

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Consumer {
    private Logger logger = LoggerFactory.getLogger(Consumer.class);

    // The return value is sent back to the caller as the reply
    @RabbitHandler
    @RabbitListener(queues = "queue-direct")
    public String receive(String content) {
        logger.info("consume message: " + content);
        return "OK";
    }
}
```
- Test

```java
@RestController
@RequestMapping(value = "mq-test")
public class MQTest {
    private Logger logger = LoggerFactory.getLogger(MQTest.class);

    @Autowired
    private Producter producter;

    // GET /mq-test sends one request and waits for the reply
    @GetMapping
    public void test() {
        producter.sender("RPC");
    }
}
```
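The request/reply flow behind such a synchronous call can be pictured with two in-memory queues: one for requests and one for replies. The sketch below is plain Java with hypothetical names (`TinyRpc`, `startServer`, `sendAndReceive`) and assumes a single caller. A real client also has to correlate each reply with its request, which this sketch skips.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical single-caller model of request/reply over two queues.
final class TinyRpc {
    private final BlockingQueue<String> requestQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> replyQueue = new LinkedBlockingQueue<>();

    // Starts a daemon thread that plays the consumer:
    // take a request, compute the reply, put it on the reply queue.
    void startServer(Function<String, String> handler) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    String request = requestQueue.take();
                    replyQueue.put(handler.apply(request));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true);
        server.start();
    }

    // Sends a request and blocks for the reply.
    // Returns null when no reply arrives before the timeout.
    String sendAndReceive(String request, long timeoutMillis) {
        try {
            requestQueue.put(request);
            return replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

After `startServer(request -> "OK:" + request)`, calling `sendAndReceive("RPC", 2000)` blocks until the server thread has replied. Without a running server the call gives up after the timeout and returns `null`.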

