
使用延时队列完成订单超时自动取消
订单实体
package com.ctgu.sheep.mq.model;import com.fasterxml.jackson.annotation.JsonFormat;import lombok.*;import org.springframework.format.annotation.DateTimeFormat;import javax.persistence.*;import java.time.LocalDateTime;/*** Created By Intellij IDEA** @author ssssheep* @package com.ctgu.sheep.mq.model* @datetime 2022/9/18 星期日*/@Getter@Setter@ToString@AllArgsConstructor@NoArgsConstructor@Entity@Table(name = "t_orders")public class Order {@Id@GeneratedValue(strategy = GenerationType.IDENTITY)private Integer id;private String goodsName;private String buyerName;@ManyToOne@JoinColumn(name = "buyer_id", referencedColumnName = "id")private User buyer;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createTime;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime payTime;@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime finishTime;/*** orderStatus: 0:未支付 1:已支付 2:已完成 3:已取消*/private Integer orderStatus;private String cancelReason;}
用户实体
package com.ctgu.sheep.mq.model;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

/**
 * JPA entity for a user, stored in the {@code t_users} table.
 *
 * <p>Accessors, {@code toString} and both constructors are generated by Lombok; the
 * all-args constructor takes {@code (id, username)} in field declaration order.
 *
 * @author ssssheep
 */
@Entity
@Table(name = "t_users")
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
@ToString
public class User {

    /** Primary key, assigned by the database identity column. */
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /** The user's name. */
    private String username;
}
延时队列配置
/**
 * Declares the RabbitMQ queue, delayed exchange and binding used for order messages.
 *
 * <p>The exchange is declared with type {@code x-delayed-message}, which is the exchange
 * type supplied by the RabbitMQ delayed-message-exchange plugin.
 *
 * @author ssssheep
 */
@Configuration
public class OrderQueueConfig {

    public static final String ORDER_QUEUE_NAME = "order.queue";
    public static final String ORDER_EXCHANGE_NAME = "order.exchange";
    public static final String ORDER_ROUTING_KEY = "order.routingkey";

    /**
     * Declares the queue that receives order messages once their delay has elapsed.
     *
     * @return the queue named {@link #ORDER_QUEUE_NAME}
     */
    @Bean("orderQueue")
    public Queue orderQueue() {
        return new Queue(ORDER_QUEUE_NAME);
    }

    /**
     * Declares the durable, non-auto-delete delayed exchange.
     *
     * @return a custom exchange of type {@code x-delayed-message} that routes like a
     *     direct exchange
     */
    @Bean("orderExchange")
    public CustomExchange orderExchange() {
        HashMap<String, Object> exchangeArguments = new HashMap<>();
        // Routing semantics applied after the delay has elapsed.
        exchangeArguments.put("x-delayed-type", "direct");

        boolean durable = true;
        boolean autoDelete = false;
        return new CustomExchange(
                ORDER_EXCHANGE_NAME, "x-delayed-message", durable, autoDelete, exchangeArguments);
    }

    /**
     * Binds the order queue to the delayed exchange under {@link #ORDER_ROUTING_KEY}.
     *
     * @param orderQueue the queue bean named {@code orderQueue}
     * @param orderExchange the exchange bean named {@code orderExchange}
     * @return the binding between the two
     */
    @Bean
    public Binding bindingOrderQueue(
            @Qualifier("orderQueue") Queue orderQueue,
            @Qualifier("orderExchange") CustomExchange orderExchange) {
        return BindingBuilder
                .bind(orderQueue)
                .to(orderExchange)
                .with(ORDER_ROUTING_KEY)
                .noargs();
    }
}
订单业务控制器
/**
 * REST endpoints for creating and paying orders.
 *
 * <p>Creating an order also publishes a delayed message carrying the order id; the
 * consumer of that message cancels the order if it is still unpaid when the delay
 * elapses.
 *
 * @author ssssheep
 */
@RestController
@RequestMapping("/order")
@Slf4j
public class OrderController {

    public static final String ORDER_EXCHANGE_NAME = "order.exchange";
    public static final String ORDER_ROUTING_KEY = "order.routingkey";

    /** Order status code for an order that has not been paid yet. */
    private static final int ORDER_STATUS_UNPAID = 0;

    /** Order status code for an order that has been paid. */
    private static final int ORDER_STATUS_PAID = 1;

    /** How long an order may stay unpaid before the timeout message is delivered. */
    private static final int PAYMENT_TIMEOUT_MILLIS = 30000;

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Creates an unpaid order for the given buyer and schedules its timeout check.
     *
     * @param param request body carrying the buyer id and the goods name
     * @return {@code "ok"} when the order was saved and the delayed message was sent
     * @throws java.util.NoSuchElementException if no user exists for the buyer id
     */
    @PostMapping("/create")
    @Transactional(rollbackFor = Exception.class)
    public String createOrder(@RequestBody CreateOrderParam param) {
        User user = userRepository.findById(param.getBuyerId())
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "User not found: id=" + param.getBuyerId()));

        Order order = new Order();
        order.setGoodsName(param.getGoodsName());
        order.setBuyerName(user.getUsername());
        order.setBuyer(user);
        order.setCreateTime(LocalDateTime.now());
        order.setOrderStatus(ORDER_STATUS_UNPAID);

        Order entity = orderRepository.save(order);
        log.info("订单创建成功,订单id:{}", entity.getId());

        // NOTE(review): the message is published before this transaction commits. If the
        // transaction rolls back afterwards, the consumer will receive an id that has no
        // matching row and must tolerate that.
        rabbitTemplate.convertAndSend(
                ORDER_EXCHANGE_NAME,
                ORDER_ROUTING_KEY,
                String.valueOf(entity.getId()),
                message -> {
                    message.getMessageProperties().setDelay(PAYMENT_TIMEOUT_MILLIS);
                    return message;
                });
        return "ok";
    }

    /**
     * Marks an unpaid order as paid.
     *
     * <p>Only orders whose status is still "unpaid" are accepted. Without this check an
     * order that was already cancelled by the timeout consumer (or already paid) would be
     * flipped to "paid".
     *
     * @param orderId id of the order to pay
     * @return {@code "支付成功"} when the order was paid, otherwise a failure message
     * @throws java.util.NoSuchElementException if no order exists for {@code orderId}
     */
    @PostMapping("/pay")
    @Transactional(rollbackFor = Exception.class)
    public String payOrder(@RequestParam Integer orderId) {
        Order order = orderRepository.findById(orderId)
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "Order not found: id=" + orderId));

        // equals() on the boxed constant is null-safe, unlike unboxing the status with ==.
        if (!Integer.valueOf(ORDER_STATUS_UNPAID).equals(order.getOrderStatus())) {
            log.warn("订单支付被拒绝,订单id:{},当前状态:{}", orderId, order.getOrderStatus());
            return "支付失败,订单状态不允许支付";
        }

        order.setPayTime(LocalDateTime.now());
        order.setOrderStatus(ORDER_STATUS_PAID);
        orderRepository.save(order);
        return "支付成功";
    }
}
超期订单消费者
/*** Created By Intellij IDEA** @author ssssheep* @package com.ctgu.sheep.mq.mq* @datetime 2022/9/18 星期日*/@Slf4j@Componentpublic class DeadOrderQueueConsumer {@Autowiredprivate OrderRepository orderRepository;public static final String ORDER_QUEUE_NAME = "order.queue";@RabbitListener(queues = ORDER_QUEUE_NAME)@Transactional(rollbackFor = Exception.class)public void orderOverTime(Message message) {String oid = new String(message.getBody());log.info("订单延时队列收到消息,订单id:{}", oid);Order order = orderRepository.findById(Integer.valueOf(oid)).get();if (order.getOrderStatus() == 0) {order.setOrderStatus(3);order.setCancelReason("订单超时未支付");orderRepository.save(order);}}}
测试
请求下单的接口
请求成功,订单数据存入到数据库中
30s后,延时交换机将消息投递到订单队列,消费者查询到订单仍未支付(状态为0),于是将订单状态改为已取消(状态为3),取消原因为“订单超时未支付”
当我们创建订单后,在30s内进行了支付,那么后续消息队列就不会再修改订单的状态
