import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.client.producer.TransactionSendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.support.MessageBuilder;import org.springframework.stereotype.Component;@Componentpublic class RocketMqClient { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 同步发送消息 * @param destination topic:tags * @param msg */ public void syncSend(String destination, Object msg) { rocketMQTemplate.send(destination, MessageBuilder.withPayload(msg).build()); } /** * 同步发送消息 * @param destination topic:tags * @param msg * @Returen SendResult 返回结果 */ public SendResult send(String destination, Object msg) { return rocketMQTemplate.syncSend(destination, msg); } /** * 发送延迟短信 * @param destination * @param msg * @param delayLevel messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * @return */ public SendResult syncSendDelay(String destination, Object msg, int delayLevel) { return rocketMQTemplate.syncSend(destination,MessageBuilder.withPayload(msg).build(),3000,delayLevel); } /** * 发送顺序消息 * @param destination * @param payload * @param hashKey * @return */ public SendResult syncSendOrderly(String destination, Object payload, String hashKey) { return rocketMQTemplate.syncSendOrderly(destination,payload,hashKey); } /** * 只发送一次不关心结果 * @param destination * @param payload */ public void sendOneWay(String destination, Object payload) { rocketMQTemplate.sendOneWay(destination, payload); } /** * 发送事务消息 * @param destination * @param payload * @param arg * @return */ public TransactionSendResult sendMessageInTransaction(String destination, Object payload, Object arg) { return rocketMQTemplate.sendMessageInTransaction(destination,MessageBuilder.withPayload(payload).build(),arg); }}
import javax.annotation.Nullable;import javax.validation.constraints.NotBlank;import java.io.Serializable;import java.util.Map;import java.util.Objects;import java.util.StringJoiner;@Getter@Setter@Slf4jpublic class MessageTO implements Serializable { private static final long serialVersionUID = -1923863198201966526L; /** * 业务主键 */ @NotBlank private String businessKey; /** * 用户Id */ @NotBlank private String userId; /** * 数据 */ @Nullable private Map<String, Object> data; public MessageTO() { } @Override public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof MessageTO)) return false; MessageTO messageTO = (MessageTO) o; if (!businessKey.equals(messageTO.businessKey)) return false; return Objects.equals(data, messageTO.data); } @Override public int hashCode() { int result = businessKey.hashCode(); result = 31 * result + (data != null ? data.hashCode() : 0); return result; } @Override public String toString() { return new StringJoiner(", ", MessageTO.class.getSimpleName() + "[", "]") .add("businessKey='" + businessKey + "'") .add("data=" + data) .add("userId=" + userId) .toString(); }}