尚品汇
笔记

项目地址:https://gitee.com/leifengyang/gmall-parent

1、环境
1、compose
docker-compose -f compose.yaml up -d #后台启动所有docker容器

compose文件如果有修改
#1、先删除被修改的容器 docker rm -f xxx
#2、重新部署compose让其忽略已经创建的容器
docker-compose -f compose.yaml up -d —no-recreat

2、项目架构
1、从git初始化一个项目
.gitignore:默认应该,除了src和pom.xml; 特别target不能上传;

 gitee创建出新项目
 git clone到本地
 准备一个pom.xml文件

2、提交项目
 先编写项目 .gitignore 文件,忽略掉所有不需提交的垃圾文件
 刷新git
 unversioned files:代表所有未被git进行版本控制的文件。需要 “右键” -> “Add To VCS”
 项目上右键 “git commit”
 最后 “git push”

3、前端项目
npm config set registry https://registry.npm.taobao.org

npm install

npm run dev

修改config目录,dev.env.js BASE_API: ‘“http://192.168.200.1“‘

npm run build #打包项目

将dist文件夹下所有内容上传到 nginx 服务器即可

2、业务
1、商品详情(service-item)
1、自己写前端 web-all ,页面,js,自己控制逻辑,自己想办法跳到指定页的时候,把数据要来
2、搭建好域名访问环境
3、浏览器访问域名—> 域名是网关地址 —> 网关根据域名转发给web-all —> web-all控制页面跳转逻辑 —> web-all跳页之前远程调用其他微服务获取数据

商品详情逻辑:
1、service-item 去 找service-product要到SkuInfo基本数据
2、service-item 去 找service-product要到SkuInfo图片数据
3、service-item 去 找service-product要到Sku价格
4、service-item 去 找service-product要到Sku所有销售属性数据
5、service-item 去 找service-product要到 销售属性值跳转sku的数据
6、把以上数据组装成页面需要的Map
7、web-all 远程 调用 service-item 要到数据

核心SQL
#查询sku对应的所有spu销售属性值组合,以及标识出当前sku属于哪组销售属性值
SELECT ssa.id attr_id,ssav.*,IF(sku_sav.sale_attr_value_id IS NULL,’0’,’1’) is_checked
FROM spu_sale_attr_value ssav
LEFT JOIN spu_sale_attr ssa ON ssa.spu_id = ssav.spu_id AND ssa.base_sale_attr_id=ssav.base_sale_attr_id
LEFT JOIN sku_sale_attr_value sku_sav ON ssav.spu_id=sku_sav.spu_id
AND ssav.id=sku_sav.sale_attr_value_id AND sku_sav.sku_id=49
WHERE ssav.spu_id=28 ORDER BY ssav.id

查询当前sku对应的spu下有多少个sku,以及所有sku的销售属性组合
SELECT sku_sav.sku_id, GROUP_CONCAT(sku_sav.sale_attr_value_id
ORDER BY sku_sav.sale_attr_value_id ASC SEPARATOR ‘|’) value_ids
FROM sku_sale_attr_value sku_sav
LEFT JOIN spu_sale_attr_value spu_sav ON sku_sav.sale_attr_value_id=spu_sav.id
WHERE sku_sav.spu_id = 26 GROUP BY sku_sav.sku_id

2、高并发
Java:高并发有三宝:缓存、异步、队排好。生态周边Spring;

Go:天然支持高并发。内存泄露,web应用Java为王;

无限堆机器;保证单个服务并发性能好。

3、压力测试
JMeter:https://jmeter.apache.org/
压测同时配合jvisualvm

首页(三级分类)
路径
http://localhost:8000/api/index/categorys
web-all 到 商品
网关到web-all到商品
全链路

首页三级分类:gmall.com —-> 网关 —-> web-all —> product —-> mysql
0)、服务空载(2w多)

网关:(正常空载都在1w多)

1)单压商品服务:
a. 路径:http://localhost:8000/api/index/categorys
b. 表现:

吞吐量: 68/s

c. 优化
a) 加内存:当前应用调内存影响不大,临时数据用的多,调伊甸园的大小

b) 关日志:对并发几乎没影响
c) 数据库SQL优化:建索引

d) 加缓存:

e) 远不止上面的

作业:商品详情并发得有多少

总结:
1、缓存极大提升吞吐量。
2、数据库SQL的执行效率也很重要。
3、如果压测发现吞吐量接近样本数量,说明并没有压到瓶颈
优化:
1、加缓存(本地缓存比分布式缓存快很多,本地缓存要解决的问题也很多)、优化SQL
2、尽量缩短请求链路
3、限流。为了让超过系统峰值的流量直接得到快速返回,不用执行业务,系统就不会卡
4、业务逻辑。
Semaphore semaphore = new Semaphore(6000);

@Scheduled(cron = “ * ?”)
public void incresemaphore(){
//信号清空
semaphore.drainPermits();
//重置6000
semaphore.release(6000);
}

@GetMapping(“/categorys”)
public List getCategoryAll(){

  1. //加入限流手段,压测,得到系统峰值,给每个系统进行限流
  2. boolean b = semaphore.tryAcquire();
  3. if(b){
  4. //
  5. List<WebCategoryAllVo> vos = category1Service.getAllCategoryForWeb();
  6. return vos;
  7. }
  8. return null;

}

4、缓存
查一个数据:
1、先看缓存
(1) 缓存中有,直接返回
(2) 缓存中没有,再查数据库
① 查库
② 放缓存

1、整合redis作为分布式缓存

org.springframework.boot
spring-boot-starter-data-redis

SpringBoot已经自动配置了redis

使用redis:

分布式缓存下,获取三级分类压测

2、缓存失效 - 三件套
缓存怎么用?
读模式:
1、先去缓存中获取数据
(1) 如果缓存中有,直接返回
(2) 如果缓存中没有,查数据库(回源)缓存的目的,就是为了提高缓存命中率,避免回源
① 数据库数据查询
② 查询完后数据放缓存
缓存中的任何数据都应该设置过期时间。

缓存穿透:(由于缓存中没有数据,所以穿透缓存,查数据库)
普通穿透攻击:一直查询缓存中没有的数据。 “abc”
随机值穿透攻击:一直去查询任意随机值,每个值在数据库都没有,而且都会给缓存放null,缓存炸了。
防止:
1、Null值也要放缓存。以后查询 “abc’这个key,只要数据库查过一次,就算数据库没有,缓存中 也有null值。
2、机制:防止随机值攻击;思想:缓存中保存了数据库的全量数据。缓存中没有也不用查库。太浪费空间?
(1) 解决办法:缓存既能知道数据库有哪些数据,还要不占用大量空间。《布隆过滤器》

缓存击穿:(同一时刻,百万并发全部进来,查同一个数据(热点数据),正好数据失效,缓存中没有了,百万并发同时判断到redis中无此数据,同时查库)
防止:
1、通过加锁解决;保证查同类数据的只有一个人。

缓存雪崩:(缓存中的所有key,大面积失效【同时过期】。导致大量查询这个key的请求,都必须回源,导致数据库崩溃)
防止:
1、防止同时过期,设置每个key的过期时间的时候随机一点。30min+Math.random(1~10min);
2、弄巧成拙
3、以上都不用管。唯一需要防止的是redis宕机丢数据(可以丢小部分数据,不能丢大量数据【开启redis持久化】,因为缓存中的数据都是读多的数据,再开机数据还有)

3、核心解决方案-布隆过滤器

4、核心解决方案-如何加锁

总结:原子加锁 + 原子解锁 + 锁续期 + 自旋周期 + 超时直接查缓存
效果: 百万并发数据库只放一个。百万并发只等待1s

5、核心解决方案 - 锁续期
new Thread(()->{
System.out.println(“正在准备一个定时任务….”);
//1、创建一个表
Timer timer = new Timer();

  1. //2、
  2. // TimerTask task:真正执行的任务
  3. // long delay: 多久以后才开始 ,ms为单位
  4. // long period: 间隔。10
  5. timer.scheduleAtFixedRate(new TimerTask() {
  6. @Override
  7. public void run() {
  8. System.out.println("哈哈: redisTemplate.expire(lockKey)");
  9. }
  10. },10000,5000); //10s以后开始,每隔5s运行一次

}).start();

6、缓存使用
1、缓存的每个key都应该有规划好的前缀 AA:BB:
2、缓存的每个数据都应该有过期时间

5、检索

6、登陆

7、订单
1、准备订单确认页数据
2、创建订单
(1) 发送消息,完成定时关单
(2) 发送消息,完成删除购物车

N、技术点
1、网关

2、MyBatis-Plus
1、使用
 1、引入starter

com.baomidou
mybatis-plus-boot-starter

 2、指定扫描包
@MapperScan(basePackages = “com.atguigu.gmall.product.mapper”)
 3、编写JavaBean、Mapper、Service
JavaBean

Mapper

Service

Service实现类

所有的基本CRUD全部搞定

2、自定义Mapper文件
MyBatis-Plus提供的默认mapper只能单表CRUD;
多表联查,需要写自定义逻辑;
1、在resources 文件夹下创建 mapper文件夹
2、为了方便安装MybatisX插件
3、使用插件,在SpuInfoMapper上alt+enter生成mapper文件,自定义方法以后继续生成方法

因为有这个默认约定

mybatis-plus:
mapper-locations: classpath:/mapper/*/.xml

效果:SpuInfoMapper继承的BaseMapper也能用,自定义的在配置文件中也能用

@MapperScan(“com.atguigu.gmall.*.mapper”): 配置mapper接口

mybatis-plus:
mapper-locations: classpath:/mapper/*/.xml 配置mapper.xml文件位置的

以上两个都不配;
SpringBoot默认扫描主程序下的包以及所有子包

1、给某个Mapper接口标注上 @Mapper
2、给classpath:/mapper/*/.xml生成xml也能互相绑定上

总结:SpringBoot拥有很多默认配置,遵循约定大于配置。

3、跨域 CORS

1、什么是跨域
浏览器对AJAX的限制;
《同源策略》:浏览器默认只能给和自己在同一个地方的服务器发送AJAX;否则浏览器必须遵循同源策略;

2、为什么会跨域
浏览器只要把AJAX发到了别的地方。服务器给浏览器的响应数据,浏览器就必须看服务器是否允许他使用这个响应数据。

Access to XMLHttpRequest at ‘http://192.168.200.1/admin/product/getCategory1‘ from origin ‘http://192.168.200.130‘ has been blocked by CORS policy: No ‘Access-Control-Allow-Origin’ header is present on the requested resource.

服务器只需要给浏览器响应数据的时候加个 Access-Control-Allow-Origin 这个头
问题?跨域的时候请求发出去了没,服务器执行了没?
服务器功能已经执行了,数据已经返回了。只是浏览器一看没有安全头“Access-Control-Allow-Origin”,浏览器直接把数据抛弃了。

解决:
服务器响应的时候加上 头就行;

3、怎么解决

解决:
服务器响应的时候加上 头就行;

实现方式:
1、写个过滤器,响应出去之前 response.addHeader(“Access-Control-Allow-Origin”,””);
2、每个微服务都需要这个过滤器?
3、在网关层写一个filter
4、在网关层配置
spring:
cloud:
gateway:
globalcors:
cors-configurations:
‘[/**]’:
allowedOrigins: “

allowedMethods: “*”

4、跨域细节
https://developer.mozilla.org/zh-CN/docs/Web/HTTP/CORS
跨域。需要区分是否简单请求。简单请求只需要给响应头设置:

复杂请求,比较麻烦;

1、简单请求
简单请求不会触发预检

满足一下是简单请求:

我们由于这个 不符合要求,不属于简单请求;
2、复杂请求
复杂请求会触发预检(OPTIONS 侦查兵);
Access-Control-Allow-Origin: #允许所有来源进行跨域
Access-Control-Allow-Methods:
#允许所有请求方式跨域
Access-Control-Allow-Headers: * #允许所有请求头跨域
Access-Control-Max-Age: 86400 #缓存时间,
每次的跨域都会先发侦察兵

3、最终解决
spring:
cloud:
gateway:
globalcors:
cors-configurations:
‘[/*]’:
allowedOrigins: “

allowedMethods: “
allowedHeaders: “

allowCredentials: true #是否允许携带cookie跨域

面试:什么是跨域?

1、跨域是指浏览器所在的地址(协议名://域名:端口号)和他要发送AJAX请求的地址不一致导致的问题;
2、出现跨域以后。浏览器默认会使用《同源策略》拒绝跨域请求的数据。
(1) 简单请求的真实请求已经发出去了,服务器都执行了。数据也交给浏览器
3、解决:(服务端返回数据的时候设置响应头即可)
(1) 简单请求,不用发预检请求,只需要允许 Origins:
(2) 复杂请求,需要发预检请求,我们需要允许复杂请求带请求头进行访问

4、git
怎么用:
1、每天回家之前,先commit;
2、push 之前 把远程代码 pull下来,如果相同文件不一样会提示修改冲突。
3、第二天来,先pull,拉取到远程最新代码,再进行修改
(1) 解决完冲突再修改

5、minio
1、登陆系统
2、创建用户
3、创建桶,设置桶为Public
4、上传文件
5、访问文件
http://192.168.200.130:9000/mall0624/搜狗截图20210925110212.png

6、单元测试
1、导入单元测试starter

org.springframework.boot
spring-boot-starter-test
test

2、编写测试类
测试类的包名必须和主程序的包名一致。
@Test 不是 junit。是import org.junit.jupiter.api.Test;

7、openfeign

0、SpringBoot对feign自动配置
FeignAutoConfiguration;
@ConfigurationProperties(“feign.client”)
public class FeignClientProperties

0、使用
1、导入依赖

org.springframework.cloud
spring-cloud-starter-openfeign

2、开启feign远程客户端功能
@EnableFeignClients(basePackages = “com.atguigu.gmall.item.feign”)

3、编写feign客户端
@RequestMapping(“/api/sku”) //路径匹配和对面微服务一样
@FeignClient(“service-product”) //指定要掉的微服务
public interface SkuInfoFeignClient {
}

1、抽取

2、bean定义重写
别忘了开这个

3、自定义每个微服务的feign配置

8、Redisson
官方文档:https://github.com/redisson/redisson/wiki/Table-of-Content

1、分布式锁
本地锁由于只能锁当前的这一台机器,在分布式情况下,没办法锁住所有人。所以分布式情况下,线程安全的操作共享数据就必须用 分布式锁;

方法内的局部变量永远是安全

2、分布式对象与分布式集合
几乎所有企业都在用免费版

3、如何使用
我们的使用:分布式锁+布隆过滤器

1、引入依赖

org.redisson
redisson
3.15.3

2、编写配置
@Import(MybatisPlusConfig.class) //导入即可
//@MapperScan(basePackages = “com.atguigu.gmall.product.mapper”)
@Configuration
public class ProductConfig {

// @Value(“${spring.redis.host}”)
// private String redisHost;

  1. @Autowired //SpringBoot已经把和redis有关的所有属性封装到了 RedisProperties 类中。只需要我们自动注入即可
  2. RedisProperties redisProperties;
  3. @Bean
  4. RedissonClient redissonClient(/*@Value("${spring.redis.host}") String host*/){
  5. //0、指定配置
  6. Config config = new Config();
  7. config.useSingleServer()

// .setConnectionMinimumIdleSize(redisProperties.getJedis().getPool().getMinIdle()) //没配过池,NPE
.setPassword(redisProperties.getPassword()) //有密码
.setAddress(“redis://“+redisProperties.getHost()+”:”+redisProperties.getPort());
// redis://: 协议名
// rediss://: 安全协议。 redis加载了证书

  1. //1、创建redisson客户端
  2. RedissonClient client = Redisson.create(config);
  3. return client;
  4. }

}

9、定时任务
1、开启配置
@EnableScheduling
2、编写表达式

@Scheduled(cron = “0 0 3 /3 ?”)

定时任务在分布式情况下所有机器同时启动,需要加分布式锁让一个机器真正干活

10、配置抽取
1、抽取
1、组件的所有设置/参数项应该作为可配置 【造出属性封装类 xxxProperties】
写一个配置类。就是用来封装配置文件值的
@ConfigurationProperties(prefix = “service.bloom”)
public class BloomFilterProperties

2、由于各种组件容器中都要能使用到,需要做一个 自动配置类(xxxxAutoConfiguration) 来帮我们给容器中导入\注入有功能的组件
//EnableConfigurationProperties 先让 BloomFilterProperties 属性类和配置文件进行绑定
@EnableConfigurationProperties(BloomFilterProperties.class)
@Configuration
public class RedissonAutoConfiguration {

3、让 自动配置类(xxxxAutoConfiguration) 给容器中自定义注入组件
@ConditionalOnProperty(value = “service.bloom.config.sku”,matchIfMissing = false)
//也可以使用条件注解让这个组件在指定配置存在的情况下才生效
@Bean
public RBloomFilter

@Bean
RedissonClient redissonClient(/@Value(“${spring.redis.host}”) String host/)

2、引用
1、依赖导入 自动配置类 所在的jar包
即使这样,本项目也可能扫描不到这个jar的组件。
原因:当前项目包名一般和 导入的jar包 不一样

2、导入 jar包 中的自动配置类
@EnableScheduling
@Import({
MybatisPlusConfig.class,
RedissonAutoConfiguration.class
}) //导入即可 //@MapperScan(basePackages = “com.atguigu.gmall.product.mapper”)
@Configuration
public class ProductConfig

3、终极抽取
spring-boot-starter-web
1、写一个项目 jar,这个项目依赖 spring-boot-starter

2、和以前10.1一样编写自己的 属性类,自动配置类,组件注入 ,
把以前的代码搬家到独立项目

3、2步骤编写完成以后,为了让项目启动自己加载,不用写import

在当前starter的类路径下。创建 META-INF/spring.factories
SpringBoot在启动的时候,会自动加载 所有jar 包中 META-INF/spring.factories 中写的开启的自动配置

4、引用
别人项目导入我们的starter

逻辑:
1、A项目引入了 我们的starter
2、我们的starter 在类路径下的 META-INF/spring.factories 配置了让某个xxxAutoConfiguration 启动生效
3、A项目启动。A的SpringBoot自动扫描A依赖的所有jar包 的 META-INF/spring.factories。然后扫到了我们
4、我们的starter开始自动配置, 开始注入一堆组件
按需配置:按条件配置

5、A项目就能开心的@Autowired了。

4、模式抽取
事务@Transactional、权限控制@PreAuthorize、统一日志、消息100%可靠@RabbitListener ……
共同特点:解决方案的模式是固定的,步骤是固定的。关键环节自己实现。AOP思想

为了几乎很多核心方法,都需要缓存。我们为了方便。希望一个注解搞定。
@GmallCache
AOP面向切面编程+自定义注解就能搞定

11、AOP+自定义注解
0、spring-aop

org.springframework.boot
spring-boot-starter-aop

1、定义了@GmallCache
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME) //功能强大
@Inherited
@Documented
public @interface GmallCache {

  1. /**
  2. * 参数设置项
  3. */

}

只要标在方法上,希望能在运行期间给这个方法自动加上缓存一套逻辑

2、切面(编写好了缓存逻辑,合适的时候直接切入运行)

附加功能:
动态表达式解析
@Test
void computeExpression(){
//1、拿到一个表达式解析器
SpelExpressionParser parser = new SpelExpressionParser();
// #aaa1: 从 setVariable 的变量里面取值
String key = “aaa:#{#aaa1}:#{1+1}”;
EvaluationContext context = SimpleEvaluationContext.forReadOnlyDataBinding().build();
context.setVariable(“aaa1”,”bbb”);

  1. String value = parser.parseExpression(key,new TemplateParserContext()).getValue(context, String.class);
  2. System.out.println(value);

}

12、异步编排

1、回调地狱
$ajax(url1,function(data)=>{
//查学生信息
$ajax(url2,function(data)=>{
//查到老师
$ajax(url3,function(data)=>{
//查到课程

  1. })
  2. })

});

Promise; CompletableFuture

new Promise(url1)
.then((data)=>{})
.then((data)=>{})

CompletableFuture.supplyRunAsync()
.then()
.then()
.then()

2、启动异步
CompletableFuture.runAsync()
CompletableFuture.supplyRunAsync()
CompletableFuture.supplyRunAsync()
CompletableFuture.supplyRunAsync()

3、编排异步顺序
2、then的各种特效;来编排任务顺序
- thenAccept: 收到上一步的结果 thenAccept((data)->{})
- thenApply: 既能接受到上一步结果,还能为下一步提供返回值。 thenApply((data)->{ return xxx; })
- thenRun: 不需要上一步的结果,也不用为下一步提供返回值, thenRun(()->{})
- 以上调用的是没加Async的,不是以异步方式运行; 上一步干活的线程,直接复用干then接下来的活
- 以上调用加上Async。上一步干活的线程干完以后回到线程池由线程池管理,给我开个新线程执行任务

区别异步编排
第一种写法:(起3个线程,同时干3个不同的活)
skuInfoFuture.thenAcceptAsync((aaa)->{
System.out.println(“正在干活1….”+aaa);
System.out.println(“thenAccept:::”+Thread.currentThread().getName() + “:查随便”);
},executor);

  1. skuInfoFuture.thenAcceptAsync((aaa)->{
  2. System.out.println("正在干活2...."+aaa);
  3. System.out.println("thenAccept:::"+Thread.currentThread().getName() + ":查随便");
  4. },executor);
  5. skuInfoFuture.thenAcceptAsync((aaa)->{
  6. System.out.println("正在干活3...."+aaa);
  7. System.out.println("thenAccept:::"+Thread.currentThread().getName() + ":查随便");
  8. },executor);

第二种写法:(一个干完,干另一个)
skuInfoFuture.thenAcceptAsync((aaa)->{
System.out.println(“正在干活….”+aaa);
System.out.println(“thenAccept:::”+Thread.currentThread().getName() + “:查随便”);
},executor).thenAcceptAsync((aaa)->{
System.out.println(“正在干活2….”+aaa);
System.out.println(“thenAccept:::”+Thread.currentThread().getName() + “:查随便”);
},executor).thenAcceptAsync((aaa)->{
System.out.println(“正在干活3….”+aaa);
System.out.println(“thenAccept:::”+Thread.currentThread().getName() + “:查随便”);
},executor);

4、异常与结果感知和处理
()()()()()().whenComplete((t,u)->{ //只能感知
if(u!=null) { //代码的异常处理 }
})

()()()()()().exceptionally((throwable)->{ //进行异常的熔断处理
return 兜底数据;
})

5、汇总结果
CompletableFuture
.allOf(skuFuture,categoryFuture,attrFuture,jsonFuturn,imageFuture,priceFuture)
.join();
所有结果全部完成以后继续往下执行

13、ElasticSearch
1、引入 starter
(es和SpringDataElasticsearch版本最好完全一致)、否则就直接引用 high-level-client。

org.springframework.boot
spring-boot-starter-data-elasticsearch

2、开启ElasticSearch的仓库自动功能
@EnableElasticsearchRepositories(basePackages = “com.atguigu.gmall.list.respository”)
@Configuration
public class AppConfig {
}

3、使用
1、准备JavaBean,并标注Es的注解

@AllArgsConstructor
@NoArgsConstructor
@ToString
@Data
@Document(indexName=”stu”,shards = 3,replicas = 2)
public class Student {

  1. @Id
  2. private Long id; //数据的id
  3. @Field(type = FieldType.Keyword)
  4. private String name;
  5. @Field(type = FieldType.Integer)
  6. private Integer age;
  7. @Field(type = FieldType.Text)
  8. private String address;//全文检索字段

}

2、准备Repository接口

@Repository
public interface PersonEsRepository extends PagingAndSortingRepository {

  1. //查出 address 模糊匹配
  2. List<Person> findAllByAddressLike(String address);
  3. //查出 address like '武汉' 以及 年龄 > 18 并且
  4. List<Person> findAllByAddressLikeAndAgeGreaterThanEqualAndBirthBetween(String address,
  5. Integer age,
  6. Date birth,
  7. Date birth2);
  8. //起名工程师
  9. List<Person> findAllByAgeGreaterThan(Integer age);
  10. //删除
  11. void deleteAllByBirthAfter(Date birth);

}

4、效果

1、基本对Es的CRUD、分页都使用自定义的 PagingAndSortingRepository 接口的子类
2、有条件写好方法名
3、复杂的crud;SpringData自动装配的 ElasticsearchRestTemplate
4、也可以使用 原生的 RestHighLevelClient

14、feign坑
1、使用feign同步远程调用,网关透传下来的请求头丢失
解决:编写feign的拦截器,每次feign调用之前,拿到当前请求,回填请求头

2、开启异步,使用feign远程调用。又炸?
原因:feign每次发送请求,先过拦截器,拦截器线程和主请求已经不是一个线程

15、RabbitMQ
1、引入starter

org.springframework.boot
spring-boot-starter-amqp

2、分析自动配置

  • SpringBoot自动配置了RabbitMQ
  • 1、RabbitAutoConfiguration:
    1. 1)、配置绑定在 RabbitProperties.class
    1. 2)、CachingConnectionFactory:连接工厂,负责创建和RabbitMQ的连接
    1. 3)、RabbitTemplateConfigurerxxxxConfigurer:配置器,自定义修改RabbitTemplate
    1. 4)、RabbitTemplate xxxTemplate:操作RabbitMQ的模板类。 mysql-crud
    1. 4.1)、定制化修改:自己放RabbitTemplate
    1. 4.2)、定制化修改:给容器中放一个 RabbitTemplateConfigurer;就会用我们配置器的规则
    1. 5)、AmqpAdmin 管理RabbitMQ (mysql-ddl\dml\dql);
    1. 高级:
    1. 1、给容器中注入一个 MessageConverter 我们收发消息,转化器就会自动转化消息
    1. 2、默认用jdk序列化+base64编码


  • 2、使用:自动注入 RabbitTemplate

3、配置
spring:
rabbitmq:
host: 192.168.200.130
port: 5672
username: admin
password: admin
publisher-confirm-type: correlated #保证消息不丢, 服务器会确认收到的消息
publisher-returns: true
listener:
simple:
acknowledge-mode: manual #手动确认模式
prefetch: 1 # 消费者每次从队列获取的消息数量。此属性当不设置时为:轮询分发,设置为1为:公平分发

一个队列下的所有消费者是竞争关系。 但是一个交换机可以把消息广播给多个队列。

4、使用
注入:RabbitTemplate 收发消息

高阶:
@Bean
RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory){
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
//设置回调
//CorrelationData correlationData, boolean ack, @Nullable String cause
template.setConfirmCallback((correlationData,ack,cause)->{
System.out.println(“回调::::【ConfirmCallback】:correlationData=”+correlationData+”,ack=”+ack+”,cause=”+cause);
});

  1. //设置回调
  2. //Message message, int replyCode, String replyText, String exchange, String routingKey
  3. template.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{
  4. System.out.println("回调::::【ReturnCallback】:message="+message+",replyCode="+replyCode+",replyText"+replyText+",exchange="+exchange+",routingKey"+routingKey);
  5. });
  6. //exchange,routingKey,queue direct(直接 点对点) topic(允许通配符 对多) fanout(广播)
  7. return template;

}

5、声明队列交换机等

/**

  • 怎么造一个交换机
  • 订单交换机
    /
    @Bean
    public Exchange orderExchange() {
    /*
    • String name, 交换机名字
    • boolean durable, 持久化
    • boolean autoDelete, 自动删除
    • Map arguments 设置项
      */
      return new TopicExchange(MqConstant.ORDER_EXCHANGE, true, false, null);
      }

@Bean
Queue orderDelayQueue(OrderProperties orderProperties) {
/*
String name, 队列名
boolean durable, 持久化
boolean exclusive, 排他
boolean autoDelete, 自动删除
@Nullable Map arguments 参数设置
x-dead-letter-exchange: order-event-exchange
x-dead-letter-routing-key: order.release
x-message-ttl: 60000
/
//延迟队列的设置项
Map arguments = new HashMap<>();
arguments.put(“x-dead-letter-exchange”, MqConstant.ORDER_EXCHANGE);
arguments.put(“x-dead-letter-routing-key”, MqConstant.ORDER_TIMEOUT_RK);
arguments.put(“x-message-ttl”, orderProperties.getOrderTimeOut() * 1000); //需要毫秒值
return new Queue(MqConstant.ORDER_DELAY_QUEUE, true, false, false, arguments);
}

@Bean
Queue orderDeadQueue(){
return new Queue(MqConstant.ORDER_TIMEOUT_QUEUE,true,false,false);
}

/**

  • 订单交换机与延迟队列绑定
  • @return
    /
    @Bean
    Binding delayQueueBinding(){
    /*
    • String destination, 目的地
    • DestinationType destinationType, 目的地类型
    • String exchange, 交换机
    • String routingKey, 路由键
    • @Nullable Map arguments 参数项

    • 哪个 交换机exchange 和 目的地destination 使用 路由键routingKey 进行绑定
      */
      return new Binding(MqConstant.ORDER_DELAY_QUEUE,
      Binding.DestinationType.QUEUE,
      MqConstant.ORDER_EXCHANGE,
      MqConstant.ORDER_DELAY_RK,
      null);
      }

/**

  • 订单交换机与死信队列绑定
    /
    @Bean
    Binding deadQueueBinding(){
    /*
    • String destination, 目的地
    • DestinationType destinationType, 目的地类型
    • String exchange, 交换机
    • String routingKey, 路由键
    • @Nullable Map arguments 参数项

    • 哪个 交换机exchange 和 目的地destination 使用 路由键routingKey 进行绑定
      */
      return new Binding(MqConstant.ORDER_TIMEOUT_QUEUE,
      Binding.DestinationType.QUEUE,
      MqConstant.ORDER_EXCHANGE,
      MqConstant.ORDER_TIMEOUT_RK,
      null);
      }

6、基于注解进行监听
先要 @EnableRabbit
然后
@RabbitListener(queues = MqConstant.ORDER_TIMEOUT_QUEUE)
public void closeOrder(Message message, Channel channel){
System.out.println(“收到消息”);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//订单关闭
try{
// orderInfoService.updateOrder(orderId,);
//long deliveryTag, boolean multiple
//mq收不到回复,就认为这个消息还没有成功。断开了和mq连接。
//mq会再次把消息派给别人
channel.basicAck(deliveryTag,false);
}catch (Exception e){
try {
channel.basicNack(deliveryTag,false,true);
} catch (IOException ex) {
}
}
//
//1、Ready(就绪的消息,这些是要往出发的)、Unacked(发货了,未签收)、Total(总消息数)
//没干完活?没回复ok?会被统计到 Unacked 中
//此时客户端炸了?完全与MQ断连接?(未unack的所有消息,断连以后,重新进入Ready队列,重新发给别人)
//客户端一直unack?我连着但是不回复?(一直持有这个消息,别人不会有)

  1. }