静态页面
Feign远程调用丢失请求头问题



Feign异步情况丢失上下文问题

订单确认页流程
下单流程


锁库存逻辑

下单失败
本地事务和分布式事务
本地事务的问题
本地事务
1、事务的基本性质
2、事务的隔离级别
3、事务的传播行为
4、SpringBoot 事务关键点
分布式事务
1、为什么有分布式事务
2、CAP 定理与 BASE 理论
1、CAP 定理

http://thesecretlivesofdata.com/raft/
2、面临的问题
对于多数大型互联网应用的场景,主机众多、部署分散,而且现在的集群规模越来越大,所
以节点故障、网络故障是常态,而且要保证服务可用性达到 99.99999%(N 个 9),即保证
P 和 A,舍弃 C。
3、BASE 理论
是对 CAP 理论的延伸,思想是即使无法做到强一致性(CAP 的一致性就是强一致性),但可以采用适当的采取弱一致性,即最终一致性。
4、强一致性、弱一致性、最终一致性
3、分布式事务几种方案
1)、2PC 模式
数据库支持的 2PC【2 phase commit 二阶提交】,又叫做 XA Transactions。
MySQL 从 5.5 版本开始支持,SQL Server 2005 开始支持,Oracle 7 开始支持。
其中,XA 是一个两阶段提交协议,该协议分为以下两个阶段:
第一阶段:事务协调器要求每个涉及到事务的数据库预提交(precommit)此操作,并反映是否可以提交
第二阶段:事务协调器要求每个数据库提交数据。
其中,如果有任何一个数据库否决此次提交,那么所有数据库都会被要求回滚它们在此事务中的那部分信息。

2)、柔性事务-TCC 事务补偿型方案
刚性事务:遵循 ACID 原则,强一致性。
柔性事务:遵循 BASE 理论,最终一致性;
与刚性事务不同,柔性事务允许一定时间内,不同节点的数据不一致,但要求最终一致。
一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。
3)、柔性事务-最大努力通知型方案
按规律进行通知,不保证数据一定能通知成功,但会提供可查询操作接口进行核对。这种方案主要用在与第三方系统通讯时,比如:调用微信或支付宝支付后的支付结果通知。这种方案也是结合 MQ 进行实现,例如:通过 MQ 发送 http 请求,设置最大通知次数。达到通知次数后即不再通知。
案例:银行通知、商户通知等(各大交易业务平台间的商户通知:多次通知、查询校对、对账文件),支付宝的支付成功异步回调
4)、柔性事务-可靠消息+最终一致性方案(异步确保型)
实现:业务处理服务在业务事务提交之前,向实时消息服务请求发送消息,实时消息服务只记录消息数据,而不是真正的发送。业务处理服务在业务事务提交之后,向实时消息服务确认发送。只有在得到确认发送指令后,实时消息服务才会真正发送。
seata控制分布式事务
seata的分布式事务解决方案
消息队列流程
锁库存增强版逻辑


定时任务的时效性问题
延时队列场景
RabbitMQ延时队列(实现定时任务)
消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。
RabbitMQ可以对队列和消息分别设置TTL。
对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的 设置。超过了这个时
间,我们认为这个消息就死了,称之为死信。
如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队 列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。
Dead Letter Exchanges(DLX)
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。(什么是死信)
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不 会被再次放在队列
里,被其他消费者使用。(basic.reject/ basic.nack)requeue=false
上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有 消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们既可以控制消息在一段时间后变成死信,又可以控制变成死信的消息 被路由到某一个指定的交换机,结合二者,其实就可以实现一个延时队列
延时队列实现-1
延时队列实现-2

订单服务创建队列、交换机、绑定关系
package com.atguigu.gulimall.order.config;import com.atguigu.gulimall.order.entity.OrderEntity;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.*;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.io.IOException;import java.util.HashMap;import java.util.Map;@Configurationpublic class MyMQConfig {@RabbitListener(queues = "order.release.order.queue")public void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException {System.out.println("收到过期的订单信息,准备关闭订单:" + orderEntity);channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}/*** 注解@Bean注解:可以在容器中自动创建Binding、Queue、Exchange(RabbitMQ没有的情况)* RabbitMQ只要存在,@Bean声明的属性发生变化,重启服务后也不会覆盖先前的数据,只有删除队列才行* 所有的消息默认会先抵达延迟队列order.delay.queue,延迟一段时间后,才会抵达order.release.order.queue队列。然后才被消费掉* @return*/// 创建延迟队列(过了存活时间就会变成死信,然后将信息抛给orderReleaseOrderQueue队列,orderReleaseOrderQueue队列处理的数据都是过期的信息)@Beanpublic Queue orderDelayQueue() {Map<String, Object> arguments = new HashMap<>();// 指定死信路由arguments.put("x-dead-letter-exchange", "order-event-exchange");// 指定死信路由键arguments.put("x-dead-letter-routing-key", "order.release.order");// 消息过期时间(毫秒)arguments.put("x-message-ttl", 60000);Queue orderDelayQueue = new Queue("order.delay.queue", true, false, false, arguments);return orderDelayQueue;}// 创建普通队列@Beanpublic Queue orderReleaseOrderQueue() {return new Queue("order.release.order.queue", true, false, false);}// 创建topic类型交换机@Beanpublic Exchange orderEventExchange() {return new TopicExchange("order-event-exchange", true, false);}// 创建交换机与队列order.delay.queue的绑定关系。// Binding(目的地,目的地类型,交换机,路由键,参数)@Beanpublic Binding orderCreateBingding() {return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);}// 创建交换机与队列order.release.order.queue的绑定关系@Beanpublic Binding orderReleaseBingding() {return new Binding("order.release.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.order", null);}@Beanpublic Binding orderReleaseOtherBingding() {return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.release.other.#", null);}@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}}
模拟延时队列定时关闭订单
1、模拟下单成功
2、RabbitMQ监听消息
3、发送请求
4、发送消息
5、收到消息
库存服务创建交换机、队列、绑定关系
package com.atguigu.gulimall.ware.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.Exchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configurationpublic class MyRabbitConfig {// 使用JSON序列化机制,进行消息转换@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}// 创建topic类型交换机。路由键采用模糊匹配。TopicExchange(交换机名称,是否持久化,是否自动删除)@Beanpublic Exchange stockEventExchange() {return new TopicExchange("stock-event-exchange", true, false);}// 创建普通队列。Queue(队列名称,是否持久化,是否排他(只允许单个连接它,若支持多个连接,则谁抢到消息算谁的),是否自动删除)@Beanpublic Queue stockReleaseStockQueue() {return new Queue("stock.release.stock.queue", true, false, false);}// 创建延迟队列。库存锁定成功后,消息先发给延迟队列,等待消息过期后,再发给普通队列@Beanpublic Queue stockDelayQueue() {Map<String, Object> arguments = new HashMap<>();// 设置死信路由,表示消息过期后交给哪个交换机arguments.put("x-dead-letter-exchange", "stock-event-exchange");// 设置死信路由键,表示消息过期后交给哪个路由键arguments.put("x-dead-letter-routing-key", "stock.release");// 设置消息过期时间arguments.put("x-message-ttl", 120000);return new Queue("stock.delay.queue", true, false, false, arguments);}// 创建交换机与普通队列的绑定关系@Beanpublic Binding stockReleaseBinding() {return new Binding("stock.release.stock.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.release.#", null);}// 创建交换机与延迟队列的绑定关系@Beanpublic Binding stockLockedBinding() {return new Binding("stock.delay.queue", Binding.DestinationType.QUEUE, "stock-event-exchange", "stock.locked", null);}// 第一次监听消息时,idea会连接RabbitMQ,此时才会创建RabbitMQ中没有的队列、交换机和绑定关系// 如果没有监听消息操作,RabbitMQ中就不会创建队列、交换机和绑定关系// 如果需要修改rabbitMQ中已存在的队列交换机,需要先删除,然后再次创建@RabbitListener(queues = "stock.release.stock.queue") // 需要注释掉,否则此时有两个在监听stock.release.stock.queue队列,导致消息消费异常public void listener(WareInfoEntity entity, Channel channel, Message msg) throws IOException, IOException {System.out.println("收到过期的订单信息:准备关闭订单" + entity.getId());channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);}}
库存自动解锁
@Service@RabbitListener(queues = "stock.release.stock.queue")public class StockReleaseListener {@AutowiredWareSkuService wareSkuService;/*** 监听解锁库存功能*/@RabbitHandlerpublic void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {try {System.out.println("收到接收解锁库存的信息......");wareSkuService.unlockStock(stockLockedTo);// 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch (Exception e){System.out.println("rabbitMQ错误:"+e.getMessage());// 只要有任何异常,回退消息。拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
/*** 1、库存自动解锁* 下订单成功,库存锁定成功,接下来的业务调用失败,导致订单回滚,之前锁定的库存就要自动解锁* 订单失败:锁库存失败** 只要解锁库存的消息失败,一定要告诉服务解锁失败,需要手动ACK,回复消息** 解锁库存* 查询数据库关于这个订单的锁定库存信息wms_ware_order_task、wms_ware_order_task_detail* 若有数据,即表示库存锁定成功。此时根据订单情况判断是否需要解锁库存* 1、订单不存在,表示订单数据自身已回滚,此时必须解锁库存* 2、订单存在,根据订单状态确认是否需要解锁库存* 1)订单状态为已取消,此时需要解锁库存* 2)订单状态未取消,此时不能解锁库存* 若没有数据,表示库存锁定失败,库存已回滚,这种情况就无需解锁库存*/@Overridepublic void unlockStock(StockLockedTo stockLockedTo){System.out.println("收到解锁库存的消息......");StockDetailTo detail = stockLockedTo.getDetail();Long detailId = detail.getId();// 解锁库存// 1.查询关于这个订单的锁定库存信息WareOrderTaskDetailEntity orderTaskDetailEntity = orderDetailService.getById(detailId);if (orderTaskDetailEntity != null) {// 有锁定库存信息,即库存锁定成功,根据订单情况解锁Long id = stockLockedTo.getId(); //库存工作单wms_ware_order_task表的IdWareOrderTaskEntity taskEntity = wareOrderTaskService.getById(id);String orderSn = taskEntity.getOrderSn();// 远程调用订单服务,根据订单号获取订单实体R r = orderFeignService.getOrderStatus(orderSn);if (r.getCode() == 0) {OrderVo data = r.getData(new TypeReference<OrderVo>() { });if (data == null || data.getStatus() == 4) {// 订单不存在(订单数据已经回滚) 或者 有订单但订单状态是已取消,才可以解锁库存// 只有状态是1(已锁定),才能解锁if (orderTaskDetailEntity.getLockStatus() == 1) {unLockStock(detail.getSkuId(), detail.getWareId(), detail.getSkuNum(), detailId);// 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}} else {// 其它状态(包含订单成功)不解锁// 拒绝消息以后重放到队列里面,让其他服务继续消费解锁(防止因自身原因误删RabbitMQ中的消息)// channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);throw new RuntimeException("远程服务失败....."); // 需要重新解锁。监听器中已实现}} else {// 若不存在锁定库存信息,即库存锁定失败,库存回滚,这种情况无需解锁// 手动确认RabbitMQ中order.release.order.queue队列的的消息(即消费这条消息)// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}}
// 库存解锁,将锁定库存数stock_locked减去num。即恢复原库存public void unLockStock(Long skuId, Long wareId, Integer num, Long taskDetailId) {// TODO 恢复stock为原库存数wareSkuDao.unlockStock(skuId, wareId, num);// 更新库存工作单状态WareOrderTaskDetailEntity entity = new WareOrderTaskDetailEntity();entity.setId(taskDetailId);entity.setLockStatus(2); // 2-已解锁orderDetailService.updateById(entity);}
订单解锁
package com.atguigu.gulimall.order.listener;import com.atguigu.gulimall.order.entity.OrderEntity;import com.atguigu.gulimall.order.service.OrderService;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.io.IOException;// 监听order.release.order.queue队列中的消息(30分钟后才会到达这个队列)@Component@RabbitListener(queues = "order.release.order.queue")public class OrderCloseListener {@AutowiredOrderService orderService;@RabbitHandlerpublic void listener(OrderEntity entity, Channel channel, Message msg) throws IOException {try {System.out.println("收到过期的订单信息,准备关闭订单:" + entity.getOrderSn());orderService.closeOrder(entity);channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {System.out.println("订单关闭异常,库存解锁异常:" + e.getMessage());// 拒绝消息,让其重新回到消息队列channel.basicReject(msg.getMessageProperties().getDeliveryTag(), true);}}}
/*** 关闭订单* @param entity*/@Overridepublic void closeOrder(OrderEntity entity) {// 查询订单的最新状态OrderEntity orderEntity = this.getById(entity.getId());if (orderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {// 关闭订单,将订单状态修改为已取消OrderEntity update = new OrderEntity();update.setId(entity.getId());update.setStatus(OrderStatusEnum.CANCLED.getCode());this.updateById(update);OrderTo orderTo = new OrderTo();BeanUtils.copyProperties(orderEntity, orderTo);try {// 每一条消息进行日志记录(数据库保存每一条消息的详细信息)// 定期扫描数据库将失败的消息再发送一遍rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other", orderTo);} catch (Exception e) {// 将没法送成功的消息进行重试发送}}}








