支付订单的核心流程(未接入MQ)

每一次的订单支付成功后,都需要执行一系列的动作,包括
- 更新订单状态。
- 扣减库存。
- 增加积分。
- 发送优惠劵,发送红包。
- 发短信。
- 通知发货。
如果上述的所有操作都是同步执行的话,那可能就会导致处理的时间过长。
比如:
- 更新订单状态需要30ms;
- 调用库存服务的接口进行扣减库存需要耗费80ms;
- 增加积分需要耗费50ms;
- 发送优惠券需要耗时60ms;
- 发送短信需要耗费100ms(涉及与第三方短信系统交互,可能性能抖动会达到1秒+);
- 通知发货需要耗时500ms(因为涉及到跟第三方物流系统交互以及与仓库管理系统交互,比较耗费时间,可能会性能抖动达到1秒+)。
总共耗时时间就有 30 + 80 + 50 + 60 + 100 + 500 = 820ms。
伪代码逻辑实现
/*** 收到订单支付成功的通知*/public void payOrderSuccess(Order order) {updateOrderStatus(order); // 更新本地订单数据库里的订单状态stockService.updateProductStock(order); // 调用库存服务的接口,扣减库存creditService.updateCredit(order); // 调用积分库存的接口,增加积分marketingService.addVoucher(order); // 调用营销服务的接口,增加优惠券pushService.sendMessage(order); // 调用推送服务的接口,发送短信warehouseService.deliveryGoods(order); // 调用仓储服务的接口,通知发货}
支付订单的核心流程(接入MQ)

在用户支付完毕后,只要执行最核心的更新订单状态以及扣减库存就可以了。然后诸如增加积分、发送优惠券、发送短信、通知发货等操作,都可以通过MQ实现异步化执行。
具体就是,订单系统仅会同步执行更新订单状态和扣减库存这两个关键操作,因为一旦支付成功,只要保证了订单状态变为【已支付】,库存扣减掉,就可以保证核心数据不错乱。然后订单系统接着就会发送一个订单支付的消息给RocketMQ,后面相关的积分系统,营销系统,第三方短信系统,仓储系统,都会从RocketMQ里获取订单支付的消息,然后进行后续的业务逻辑处理。
改造后的执行流程时长:
- 更新订单状态30ms。
- 扣减库存80ms。
- 发送订单支付消息到RocketMQ 10ms。
总耗时时间是 30 + 80 + 10 = 120ms。
对于其他系统(积分系统、营销系统等),都会自己从RocketMQ里去获取订单支付消息执行自己要处理的业务逻辑,不会再影响订单核心链路的性能。
伪代码逻辑实现
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.3.0</version></dependency>
/*** 生产者发送消息到RocketMQ里的一个Topic中*/public class RocketMQProducer {// 这个是RocketMQ的生产者类,用这个就可以发送消息到RocketMQprivate static DefaultMQProducer producer;static {// 这里就是构建一个Producer实例对象producer = new DefaultMQProducer("order_producer_group");// 这个是为Producer设置NameServer的地址,让他可以拉取路由信息// 这样才能知道每个Topic的数据分散在哪些Broker机器上// 然后才可以把消息发送到Broker上去producer.setNamesrvAddr("localhost:9876");// 这里启动一个Producerproducer.start();}public static void send(String topic, String message) throws Exception {// 这里是构建一条消息对象Message msgObj = new Message(topic, // 指定发送消息到哪一个Topic上去"", // 消息的Tagmessage.getBytes(RemotingHelper.DEFAULT_CHARSET) // 消息);// 利用Producer发送消息SendResult sendResult = producer.send(msgObj);System.out.printf("%s%n", sendResult);}}
Topic是一个逻辑的概念,实际上Topic的数据时分布式存储在Master Broker中的。生产者系统发送一个订单消息过去的时候,会根据一定的负载均衡算法和容错算法把消息发送到一个Broker中去。
/*** 消费者系统(积分系统、营销系统、推送系统、仓储系统)* 从RocketMQ里消费 “TopicOrderPaySuccess”中的订单消息,然后执行* 增加积分、发送优惠券、发送短信、通知发货之类的业务逻辑*/public class RocketMQConsumer {public static void start() {new Thread() {public void run() {try {// RocketMQ消费者示例对象// "consumer_group" 之类的就是消费者分组// 一般来说比如积分系统就是 "credit_consumer_group"// 营销系统就是 "marketing_consumer_group"DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("credit_consumer_group");// 这里给消费者设置NameServer的地址// 这样就可以拉取到路由信息,知道Topic的数据在哪些broker上// 然后就可以从对应的broker上拉取数据consumer.setNamesrvAddr("localhost:9876");// 选择订阅 "TopicOrderPaySuccess"的消息// 这样就从这个Topic的broker机器上拉取订单消息过来consumer.subscribe("TopicOrderPaySuccess", "*");// 注册消息监听器来处理拉取到的订单消息// 如果consumer拉取到订单消息,就会回调这个方法交给你处理consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MesageExt> msgs, ConsumeConcurrentlyContext context) {// 在这里对获取到的msgs订单消息进行处理// 比如增加积分、发送优惠券、通知发货等等return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();System.out.printf("Consumer Started. %n");while(true) { // 别让线程退出,就让创建好的consumer不停消费数据Thread.sleep(1000);}} catch(Exception e) {e.printStackTrace();}}}.start();}}
