一、消息中间件介绍:
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)常见的消息中间件产品:
(1)ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。我们在本次课程中介绍 ActiveMQ的使用。
(2)RabbitMQ(elang语言)
AMQP协议的领导实现,支持多种场景。淘宝的MySQL集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用。
(3)ZeroMQ
史上最快的消息队列系统
(4)Kafka
Apache下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统。适合处理海量数据。
(5)RocketMQ:
阿里的开源项目,2012年己交给Apache。
二、目前项目中面临的问题:

问题: 我们已经完成了5个web模块和4个服务模块。其中运营商后台的调用关系最多,用到了商家商品服务、广告内容服务、搜索服务和页面生成服务。这种模块之间的依赖也称之为耦合。而耦合越多,之后的维护工作就越困难。那么如果改善系统模块调用关系、减少模块之间的耦合呢?我们接下来就介绍一种解决方案——消息中间件。
2.1 解决问题:
2.1.1 改造系统模块调用关系(解耦)
2.2 JMS简介
2.2.1什么是JMS
JMS(JavaMessaging Service)是Java平台上有关面向消息中间件的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发。
JMS本身只定义了一系列的接口规范,是一种与厂商无关的 API,用来访问消息收发系统。它类似于 JDBC(javaDatabase Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JML 客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一
些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· TextMessage—一个字符串对象 · MapMessage—一套名称-值对 · ObjectMessage—一个序列化的 Java 对象 · BytesMessage—一个字节的数据流 · StreamMessage — Java 原始值的数据流
2.2.2 JMS消息传递类型
对于消息的传递有两种类型:
一种是点对点的,即一个生产者和一个消费者一一对应;
另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
三、使用消息中间件ActiveMQ:
3.1 在docker下安装activeMQ:
【下载镜像】:
docker pull rmohr/activemq
【安装容器】:
docker run -id --name activemq -p 61616:61616 -p 8161:8161 rmohr/activemq
【查看效果】:
说明: 第一次进入,用户名,密码都是admin
3.2 在springboot下使用activemq:
3.2.1 创建一个生产者工程:
3.2.2 生产者工程使用步骤如下:
第一步:添加依赖如下 :
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!--4.引入单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId></dependency></dependencies>
第二步:创建application.yml文件:
server:port: 8801spring:activemq:broker-url: tcp://192.168.56.10:61616jms:pub-sub-domain: true #代表发布的是:发布/订阅消息,默认是点对点消息
第三步:创建启动类:
/*** ------------------------------* 功能:* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/7-15:21* ------------------------------*/@SpringBootApplicationpublic class JmsProducerAppliation {public static void main(String[] args) {SpringApplication.run(JmsProducerAppliation.class);}}
第四步:创建单元测试类:
/*** ------------------------------* 功能:* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/7-15:07* ------------------------------*/@RunWith(SpringRunner.class)@SpringBootTestpublic class Test01 {@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;/*** 功能: 发送[点对点]消息* 参数:* 返回值: void* 时间: 2021/8/7 15:08*/@Testpublic void test01(){//1. 发送点对点消息jmsMessagingTemplate.convertAndSend("zelin-01","发送第一个消息。。。");}/*** 功能: 发送 [发布/订阅] 消息* 参数:* 返回值:* 时间: 2021/8/7 15:37*/@Testpublic void test02(){//2. 发送发布/订阅消息Destination dest = new ActiveMQTopic("zelin-topic");jmsMessagingTemplate.convertAndSend(dest,"发送发布/订阅消息!");}}
3.2.3 创建一个消费者工程:
3.2.3 消费者工程使用步骤:
第一步:在工程pom.xml添加依赖:
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><!--4.引入单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-test</artifactId></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId></dependency></dependencies>
第二步:添加application.yml映射文件:
server:port: 8802spring:activemq:broker-url: tcp://192.168.56.10:61616jms:pub-sub-domain: true
第三步:复制配置,修改端口,创建一个新的消费者:

第四步:根据实际情况,定义监听方代码实现:
/*** ------------------------------* 功能:监听点对点消息(P2P)* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/7-15:11* ------------------------------*/@Componentpublic class MyMessageListener {@JmsListener(destination = "zelin-01")public void getMessage01(String message){System.out.println("得到消息:" + message);}}
/*** ------------------------------* 功能:监听: [发布/订阅] 消息* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/7-15:35* ------------------------------*/@Componentpublic class MyMessageListener2 {@JmsListener(destination = "zelin-topic")public void getMessage(String message){System.out.println("得到消息:" + message);}}
四、完成更新索引库的操作:
4.1 在zyg-manager-web中添加activemq依赖,并注掉对搜索服务的依赖:
4.2 在application.yml文件中添加配置:
spring:activemq:broker-url: tcp://192.168.56.10:61616
4.3 修改goodsController中的updateStatus方法:
/*** 功能: 批量审核数据[数据一致性同步方案二: 使用消息中间件]* 参数:* 返回值: com.zelin.utils.R* 时间: 2021/7/30 16:11*/@RequestMapping("updateStatus")public R updateStatus(Long[] ids,String status){//1. 审核商品goodsService.updateStatus(ids,status);//2. 根据商品id查询sku列表List<ItemEntity> entities = itemService.findItemsbyGoodsId(ids);//3. 将商品列表包装成字符串发送出去if(entities != null && entities.size() > 0) {jmsMessagingTemplate.convertAndSend("updateStatus", JSON.toJSONString(entities));}else{return R.error().put("msg","没有要审核的商品!");}return R.ok();}
4.4 在zyg-search-service中添加依赖:
<!--引入activemq--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>
4.5 在zyg-search-service中application.yml配置
spring:activemq:broker-url: tcp://192.168.56.10:61616
4.6 定义监听器:
/*** ------------------------------* 功能:定义监听sku商品列表* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/9-14:41* ------------------------------*/@Componentpublic class MyMessageListener {@Autowiredprivate ItemSearchService itemSearchService;@JmsListener(destination = "updateStatus")public void getSkuList(String skuList){//1. 得到sku列表,并转换为集合List<ItemEntity> entities = JSON.parseArray(skuList, ItemEntity.class);//2. 将sku商品列表添加到索引库中itemSearchService.updateToIndex(entities);//3. 打印提示信息System.out.println("更新到索引库成功!");}}
4.7 商品审核效果:
五、完成生成静态页面的效果:
5.1 在zyg-manager-web中注掉对于生成静态页面服务依赖:
<!--引入生成静态页面的服务--><!--<dependency>--><!-- <groupId>com.zelin</groupId>--><!-- <artifactId>zyg-page-interface</artifactId>--><!-- <version>2.0</version>--><!--</dependency>-->
5.2 在zyg-manager-web中的goodsController修改生成静态页面的方法:
/*** 功能: 根据商品id生成商品静态面[利用消息中间件发送消息]* 参数:* 返回值:* 时间: 2021/8/6 14:31*/@RequestMapping("/item")public void createHtml(String goodsId) throws IOException {jmsMessagingTemplate.convertAndSend("createHtml",goodsId);}
5.3 在zyg-page-service中添加依赖:
<!--2. 添加activemq依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>
5.4 在zyg-page-service中的application.yml中添加配置:
spring:activemq:broker-url: tcp://192.168.56.10:61616
5.5 在zyg-page-service中添加监听器:
/*** ------------------------------* 功能:* 作者:WF* 微信:hbxfwf13590332912* 创建时间:2021/8/9-15:02* ------------------------------*/@Componentpublic class MyMessageListener {@Autowiredprivate PageSerivce pageSerivce;//1. 根据得到的商品id,生成静态页面@JmsListener(destination = "createHtml")public void createHtml(String goodsId) throws IOException {System.out.println("goodsId = " + goodsId);pageSerivce.createHtml(Long.parseLong(goodsId));}}
5.6 最后生成静态页面:
六、当我们删除商品时,从索引库删除相关sku商品及商品详情页面:
6.1 处理删除数据的序列化问题:(application.yml文件配置)
spring:activemq:broker-url: tcp://192.168.56.10:61616packages:trust-all: true
说明: 1、默认只处理java.lang包中的一些类的序列化,其它类不能处理 2、上面的配置packages.trust-all: 代表所有的包都可以进行序列化操作. 3、我们需要在消息发送方及消息监听方都做这一配置。(即: zyg-search-service,zyg-page-service,zyg-manager-web都要进行配置).
6.2 在goodsController中删除商品时,发送消息:
/*** 逻辑删除* 思路:* ① 在删除商品时,同时删除索引库中相关sku的列表信息* ② 同时删除此商品生成的静态页面*/@RequestMapping("/delete")public R delete(@RequestBody Long[] ids){//1. 根据商品id删除商品(逻辑删除)goodsService.removeByIds(Arrays.asList(ids));//2. 发送消息,访问搜索服务,从索引库中删除相关的sku列表jmsMessagingTemplate.convertAndSend("deleteSku",Arrays.asList(ids));//3. 发送消息,访问页面静态化服务,从生成的静态页面中删除此相关的静态页jmsMessagingTemplate.convertAndSend("deletePage",Arrays.asList(ids));return R.ok();}
6.3 在zyg-search-service中监听删除的消息:
【在服务中定义删除功能】:
/*** 功能: 从索引库中根据goodsid删除商品* 参数:* 返回值:* 时间: 2021/8/9 15:45*/public void deleteSku(List<Long> goodsIds){//1. 构造查询条件Query query = new CriteriaQuery(new Criteria("goodsId").in(goodsIds));//2. 从索引库中删除记录restTemplate.delete(query,ItemEntity.class,IndexCoordinates.of("item"));}
【定义监听器方法】:
/*** 功能: 从索引库中根据商品id删除数据* 参数:* 返回值:* 时间: 2021/8/9 15:44*/@JmsListener(destination = "deleteSku")public void deleteSku(List<Long> ids){itemSearchService.deleteSku(ids);}
6.4 在zyg-page-service中监听删除的消息:
【在服务中定义删除功能】:
/*** 功能: 根据商品id删除静态页面* 参数:* 返回值: void* 时间: 2021/8/9 15:51*/@Overridepublic void deletePage(List<Long> ids) {for (Long id : ids) {String path = FILE_PATH + id + ".html";System.out.println("path = " + path);new File(path).delete();}}
【定义监听器方法】:
//2. 删除静态页面@JmsListener(destination = "deletePage")public void deletePage(List<Long> ids){pageSerivce.deletePage(ids);}


