感觉自己还是把握不住mq消息模型,先鸽
一: RabbitMq相关概念
问题:
- 复杂接口高峰访问,-秒杀
- 一个功能调用多个微服务接口,接口间没有依赖关系
- 多个数据源,其中一个数据更新
1.1 什么是消息队列(MQ)
队列:先进先出的数据结构, 存放消息的数据结构模型
MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。


两者间的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
1.2 常见的mq产品

- ActiveMQ:基于JMS
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
- Kafka:分布式消息系统,高吞吐量
1.2.1 RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系统
官方教程:http://www.rabbitmq.com/getstarted.html
docker run -d --name rabbitmq --publish 5671:5671 \--publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \rabbitmq:management
注:
4369 — erlang发现口
5672 —client端通信口
15672 — 管理界面ui端口
25672 — server间内部通信口
1.2.2 RabbitMQ 工作模型

(1)Broker:中介。提供一种传输服务,维护一条从生产者到消费者的传输线路,保证消息数据能按照指定的方式传输。粗略的可以将图中的RabbitMQ Server当作Broker。
(2)Exchange:消息交换机。指定消息按照什么规则路由到哪个队列Queue。生产者不能直接和Queue建立连接,而是通过交换机进行消息分发。
(3)Queue:消息队列。消息的载体,每条消息都会被投送到一个或多个队列中。
(4)Binding:绑定。作用就是将Exchange和Queue按照某种路由规则绑定起来。
(5)RoutingKey:路由关键字。消息所携带的标志,Exchange根据RoutingKey进行消息投递。
(6)Vhost:虚拟主机。一个Broker可以有多个虚拟主机,用作不同用户的权限分离。一个虚拟主机持有一组Exchange、Queue和Binding。
(7)Producer:消息生产者。主要将消息投递到对应的Exchange上面。一般是独立的程序。
(8)Consumer:消息消费者。消息的接收者,一般是独立的程序。
(9)Channel:消息通道,也称信道,是连接消费者和Broker的虚拟连接,如果直接让消费者和Broker建立TCP的连接,会让Broker有性能损耗。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话任务。
1.2.2 路由消息模型

P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
消息类型
目前共四种类型:direct、tanout、topic、headers ,header匹配AMQP消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差能多,目前几乎用不到了,
Direct
全匹配式传递。当RoutingKey和消息标志完全一样才会存放到对应的队列

Topic
广播式全部传递。息都会被投递到所有与此Exchange绑定的queue中

Fanout
匹配式传递。 # 表示0个或多个单词, *表示1个

二.springBoot整合RabbitMq
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。
2.1基础使用
2.1.1引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.2.2配置文件
spring:rabbitmq:host: 192.168.1.83username: guestpassword: guestvirtual-host: /template:exchange: gmall.item.exchangepublisher-confirms: true
- virtual-host 指定虚拟主机
- template:有关
AmqpTemplate的配置- exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
- publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
2.2.3 相关api
- AmqpAdmin 声明交换机(Exchange) , 队列 ,绑定(Binding)等资源对象; ```java @Autowired private AmqpAdmin amqpAdmin
@Testpublic void createBinding() {Binding binding = new Binding("hello-java-queue",Binding.DestinationType.QUEUE,"hello-java-exchange","hello.java",null);amqpAdmin.declareBinding(binding);log.info("Binding[{}]创建成功:","hello-java-binding");}@Testpublic void create() {HashMap<String, Object> arguments = new HashMap<>();//死信队列arguments.put("x-dead-letter-exchange", "order-event-exchange");arguments.put("x-dead-letter-routing-key", "order.release.order");arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟Queue queue = new Queue("order.delay.queue", true, false, false, arguments);amqpAdmin.declareQueue(queue);log.info("Queue[{}]创建成功:","order.delay.queue");}@Testpublic void createExchange() {Exchange directExchange = new DirectExchange("hello-java-exchange",true,false);amqpAdmin.declareExchange(directExchange);log.info("Exchange[{}]创建成功:","hello-java-exchange");}
2. **RabbitTemplate **访问(发送和接收消息)的帮助类```java@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void sendMessageTest() {OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();reasonEntity.setId(1L);reasonEntity.setCreateTime(new Date());reasonEntity.setName("reason");reasonEntity.setStatus(1);reasonEntity.setSort(2);String msg = "Hello World";//1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口//2、发送的对象类型的消息,可以是一个jsonrabbitTemplate.convertAndSend("hello-java-exchange","hello2.java",reasonEntity,new CorrelationData(UUID.randomUUID().toString()));log.info("消息发送完成:{}",reasonEntity);}
RabbitListener 与 RabbitHandler 注解接收消息
RabbitListener 可以放在类和方法上
RabbitHandler 只能放在方法上,用来重载不同参数的方法
2.2 数据丢失问题
消息从生产者到中间件,中间件中交换机到消息队列,中间件到消费者过程中数据都有可能丢失,如何避免此种问题呢?
事务机制:
客户端中与事务机制相关的方法有3个channel.txSelect,channel.txCommit,channel.txRollback。
- channel.txSelect 用于开启事务;
- channel.txCommit 用于提交事务;
- channel.txRollback 用于回滚事务。
在通过 channel.txSelect 方法开启事务之后,我们便可以发送消息给 RabbitMQ了,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交执行之前由于 RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行 channel.txRollback 方法来实现事务回滚。
- RabbitMQ消息确认机制
但因为事务机制容易堵塞,不推荐使用
回调函数
- publisher confirmCallback 投递到exchange
- publisher returnCallback 未投递到 queue 退回
确认机制
- consumer ack 机制
2.2.1 生产者可靠抵达-confirmCallback
```java spring.rabbitmq.publisher-confirm-type: correlated
• 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。<br /> • CorrelationData:用来表示当前消息唯一性。<br /> • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有 broker 接收到才会调用 confirmCallback。 <br />• 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递 到目标 queue 里。所以需要用到接下来的 returnCallback<a name="bhSjM"></a>### 2.2.2 生产者可靠抵达- returnCallbackb```java# 开启发送端消息抵达Queue确认spring.rabbitmq.publisher-returns=true# 只要消息抵达Queue,就会异步发送优先回调returnfirmspring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有 些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到 return 退回模式。
这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数 据,定期的巡检或者自动纠错都需要这些数据
2.2.3 消费者- Ack 消息确认机制
开启手动确认
# 手动ack消息,不使用默认的消费端确认spring.rabbitmq.listener.simple.acknowledge-mode=manual
监听参数内有channel通道参数,通过通道方法确认参数
确认消息:// 参数二:是否批量确认channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
拒绝消息:
// 参数二:是否重新入队,false时消息不再重发,如果配置了死信队列则进入死信队列,没有死信就会被丢弃//重新入队会重新获取消息channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
不确认消息
// 参数二:是否批量; 参数三:是否重新回到队列,true重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
改造消费者监听器代码如下:
@Componentpublic class Listener {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "spring.test.queue", durable = "true"),exchange = @Exchange(value = "spring.test.exchange",ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),key = {"a.*"}))public void listen(String msg, Channel channel, Message message) throws IOException {try {System.out.println("接收到消息:" + msg);int i = 1 / 0;// 确认收到消息,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {if (message.getMessageProperties().getRedelivered()) {System.out.println("消息重试后依然失败,拒绝再次接收");// 拒绝消息,不再重新入队(如果绑定了死信队列消息会进入死信队列,没有绑定死信队列则消息被丢弃,也可以把失败消息记录到redis或者mysql中),也可以设置为true再重试。channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);} else {System.out.println("消息消费时出现异常,即将再次返回队列处理");// Nack消息,重新入队(重试一次)channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}e.printStackTrace();}}}
