- 第一章 进程与线程
- 第二章 Java线程
- 第五章 共享模型之无锁
- 第六章 共享模型之不可变
- 第七章 共享模型之工具
- 一、线程池
- 二、J.U.C
黑马程序员:https://www.bilibili.com/video/BV16J411h7Rd?p=221&spm_id_from=pageDriver
第一章 进程与线程
一、进程与线程
1、进程
- 程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的
- 当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进程。
进程就可以视为程序的一个实例。大部分程序可以同时运行多个实例进程(例如记事本、画图、浏览器等),也有的程序只能启动一个实例进程(例如网易云音乐、360 安全卫士等)
2、线程
一个进程之内可以分为一到多个线程。
- 一个线程就是一个指令流,将指令流中的一条条指令以一定的顺序交给CPU 执行
Java 中,线程作为最小调度单位,进程作为资源分配的最小单位。 在 windows 中进程是不活动的,只是作为线程的容器
3、二者对比
进程基本上相互独立的,而线程存在于进程内,是进程的一个子集
- 进程拥有共享的资源,如内存空间等,供其内部的线程共享
- 进程间通信较为复杂:
- 同一台计算机的进程通信称为IPC(Inter-process communication)
- 不同计算机之间的进程通信,需要通过网络,并遵守共同的协议,例如HTTP
- 线程通信相对简单,因为它们共享进程内的内存,一个例子是多个线程可以访问同一个共享变量
-
二、井行与并发
1、单核cpu
单核cpu下,线程实际还是串行执行的。操作系统中有一个组件叫做任务调度器,将cpu的时间片(windows下时间片最小约为 15 毫秒)分给不同的程序使用,只是由于 cpu 在线程间(时间片很短)的切换非常快,人类感觉是同时运行的。总结为一句话就是: 微观串行,宏观并行
一般会将这种线程轮流使用CPU的做法称为并发,concurrent
2、多核cpu
多核cp下,每个核(core)都可以调度运行线程,这时候线程可以是并行的。

3、井行与并发
引Rob Pik的一段描述:
并发(concurrent)是同一时间应对(dealing with)多件事情的能力
- 并行(parallel)是同一时间动手做(doing)多件事情的能力
例子:
- 家庭主妇做饭、打扫卫生、给孩子喂奶,她一个人轮流交替做这多件事,这时就是并发;
- 家庭主妇雇了个保姆,她们一起这些事,这时既有并发,也有并行(这时会产生竞争,例如锅只有一口,一个人用锅时,另一个人就得等待)
雇了3个保姆,一个专做饭、一个专打扫卫生、一个专喂奶,互不干扰,这时是并行
三、应用
1、应用之异步调用
以调用方角度来讲,如果:
需要等待结果返回,才能继续运行就是同步
/*** 同步调用:同一个线程、顺序执行*/@Slf4j(topic = "c.Sync")public class Sync {public static void main(String[] args) {FileReader.read(Constants.MP4_FULL_PATH);log.debug("do other things ...");}}

不需要等待结果返回,就能继续运行就是异步
/*** 异步调用:不同线程、同时执行*/@Slf4j(topic = "c.Async")public class Async {public static void main(String[] args) {new Thread(() -> FileReader.read(Constants.MP4_FULL_PATH)).start();log.debug("do other things ...");}}
1)设计
多线程可以让方法执行变为异步的(即不要巴巴干等着)比如说读取磁盘文件时,假设读取操作花费了5 秒钟,如果没有线程调度机制,这5 秒 cpu 什么都做不了,其它代码都得暂停…
2)结论
比如在项目中,视频文件需要转换格式等操作比较费时,这时开一个新线程处理视频转换,避免阻塞主线程
- tomcat的异步servlet也是类似的目的,让用户线程处理耗时较长的操作,避免阻塞 tomcat 的工作线程
-
2、应用之提高效率
充分利用多核cpu 的优势,提高运行效率。想象下面的场景,执行 3 个计算,最后将计算结果汇总。

如果是串行执行,那么总共花费的时间是10 + 11 + 9 + 1 = 31ms
- 但如果是四核cpu,各个核心分别使用线程 1 执行计算 1,线程 2 执行计算 2,线程 3 执行计算 3,那么 3 个线程是并行的,花费时间只取决于最长的那个线程运行的时间,即 11ms 最后加上汇总时间只会花费 12ms
- 注意:
1)设计
2)结论
- 单核cpu下,多线程不能实际提高程序运行效率,只是为了能够在不同的任务之间切换,不同线程轮流使用cpu,不至于一个线程总占用 cpu,别的线程没法干活
- 多核cpu 可以并行跑多个线程,但能否提高程序运行效率还是要分情况的
- 有些任务,经过精心设计,将任务拆分,并行执行,当然可以提高程序的运行效率。但不是所有计算任务都能拆分(参考后文的【阿姆达尔定律】)
- 也不是所有任务都需要拆分,任务的目的如果不同,谈拆分和效率没啥意义
IO 操作不占用 cpu,只是我们一般拷贝文件使用的是【阻塞 IO】,这时相当于线程虽然不用 cpu,但需要一直等待 IO 结束,没能充分利用线程。所以才有后面的【非阻塞 IO】和【异步 IO】优化
第二章 Java线程
一、创建和运行线程
1、方法一:直接使用Thread
```java // 创建线程对象 Thread t = new Thread() {
public void run() {
// 要执行的任务
} };
// 启动线程 t.start();
```java// 构造方法的参数是给线程指定名字,推荐Thread t1 = new Thread("t1") {@Override// run 方法内实现了要执行的任务public void run() {log.debug("hello");}};t1.start();输出:19:19:00 [t1] c.ThreadStarter - hello
2、方法二:使用Runnable配合Thread
把【线程】和【任务】(要执行的代码)分开:
- Thread 代表线程
- Runnable 可运行的任务(线程要执行的代码) ```java Runnable runnable = new Runnable() { public void run(){ // 要执行的任务 } };
// 创建线程对象 Thread t = new Thread( runnable );
// 启动线程 t.start();
```java// 创建任务对象Runnable task2 = new Runnable() {@Overridepublic void run() {log.debug("hello");}};// 参数1:是任务对象; 参数2:是线程名字,推荐Thread t2 = new Thread(task2, "t2");t2.start();输出:19:19:00 [t2] c.ThreadStarter - hello
// 创建任务对象Runnable task2 = () -> log.debug("hello");// 参数1:是任务对象; 参数2:是线程名字,推荐Thread t2 = new Thread(task2, "t2");t2.start();
3、原理之Thread与Runnable的关系
小结
- 方法1 是把线程和任务合并在了一起,方法2 是把线程和任务分开了
- 用 Runnable 更容易与线程池等高级 API 配合
- 用 Runnable 让任务类脱离了 Thread 继承体系,更灵活
4、方法三:FutureTask配合Thread
FutureTask 能够接收Callable类型的参数,用来处理有返回结果的情况
// 创建任务对象FutureTask<Integer> task3 = new FutureTask<>(() -> {log.debug("hello");Thread.sleep(2000);return 100;});// 参数1:是任务对象; 参数2:是线程名字,推荐new Thread(task3, "t3").start();// 主线程阻塞,同步等待 task 执行完毕的结果Integer result = task3.get();log.debug("结果是:{}", result);输出:19:22:27 [t3] c.ThreadStarter - hello19:22:27 [main] c.ThreadStarter - 结果是:100
二、观察多个线程同时运行
public class TestMultiThread {public static void main(String[] args) {new Thread(() -> {while(true) {log.debug("running");}},"t1").start();new Thread(() -> {while(true) {log.debug("running");}},"t2").start();}}注意:单核CPU不能正常执行
三、查看进程线程的方法
1、windows
- 任务管理器可以查看进程和线程数,也可以用来杀死进程
- tasklist:查看进程
- taskkill 杀死进程
2、linux
- ps -fe:查看所有进程
- ps -fT -p
:查看某个进程(PID)的所有线程 - kill:杀死进程
- top:按大写 H 切换是否显示线程
- top -H -p
:查看某个进程(PID)的所有线程
3、Java
- jps:命令查看所有 Java 进程
- jstack
:查看某个 Java 进程(PID)的所有线程状态 - jconsole:来查看某个 Java 进程中线程的运行情况(图形界面)
4、jconsole 远程监控配置
需要以如下方式运行你的 java 类
java -Djava.rmi.server.hostname=`ip地址` -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=`连接端口` -Dcom.sun.management.jmxremote.ssl=是否安全连接 -Dcom.sun.management.jmxremote.authenticate=是否认证 java类
修改 /etc/hosts 文件将 127.0.0.1 映射至主机名
如果要认证访问,还需要做如下步骤
- 复制 jmxremote.password 文件
- 修改 jmxremote.password 和 jmxremote.access 文件的权限为 600 即文件所有者可读写
连接时填入 controlRole(用户名),R&D(密码)
四、* 原理之线程运行
1、栈与栈帧
Java Virtual Machine Stacks (Java 虚拟机栈)
我们都知道 JVM 中由堆、栈、方法区所组成,其中栈内存是给谁用的呢?其实就是线程,每个线程启动后,虚拟 机就会为其分配一块栈内存。每个栈由多个栈帧(Frame)组成,对应着每次方法调用时所占用的内存
- 每个线程只能有一个活动栈帧,对应着当前正在执行的那个方法

2、线程上下文切换(Thread Context Switch)
因为以下一些原因导致 cpu 不再执行当前的线程,转而执行另一个线程的代码
- 线程的 cpu 时间片用完
- 垃圾回收
- 有更高优先级的线程需要运行
- 线程自己调用了 sleep、yield、wait、join、park、synchronized、lock 等方法
当 Context Switch 发生时,需要由操作系统保存当前线程的状态,并恢复另一个线程的状态,Java 中对应的概念 就是程序计数器(Program Counter Register),它的作用是记住下一条 jvm 指令的执行地址,是线程私有的
- start():启动一个新线程,在新的线程运行 run 方法中的代码
- start 方法只是让线程进入就绪,里面代码不一定立刻运行(CPU 的时间片还没分给它)。每个线程对象的start方法只能调用一次,如果调用了多次会出现IllegalThreadStateException
- run():新线程启动后会调用的方法
- 如果在构造 Thread 对象时传递了 Runnable 参数,则线程启动后会调用 Runnable 中的 run 方法,否则默认不执行任何操作。但可以创建 Thread 的子类对象,来覆盖默认行为
- join():等待线程运行结束
- join(long n):等待线程运行结束,最多等待 n毫秒
- getId() :获取线程长整型的 id。id 唯一
- getName():获取线程名
- setName(String):修改线程名
- getPriority():获取线程优先级
- setPriority(int):修改线程优先级
- java中规定线程优先级是1~10 的整数,较大的优先级能提高该线程被 CPU 调度的机率
- getState():获取线程状态
- Java 中线程状态是用 6 个 enum 表示,分别为:NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED
- isInterrupted():判断是否被打断。不会清除打断标记
- isAlive():线程是否存活(还没有运行完毕)
- interrupt():打断线程。
- 如果被打断线程正在sleep,wait,join会导致被打断的线程抛出InterruptedException,并清除打断标记;如果打断的正在运行的线程,则会设置打断标记;park的线程被打断,也会设置打断标记
- interrupted():判断当前线程是否被打断。会清除打断标记
- currentThread():获取当前正在执行的线程
- sleep(long n):让当前执行的线程休眠n毫秒,休眠时让出 cpu 的时间片给其它线程
yield() :提示线程调度器让出当前线程对CPU的使用。主要是为了测试和调试
六、start 与 run
```java public static void main(String[] args) {
Thread t1 = new Thread(“t1”) {
@Overridepublic void run() {log.debug(Thread.currentThread().getName());FileReader.read(Constants.MP4_FULL_PATH);}
};
t1.run(); log.debug(“do other things …”); }
输出: 19:39:14 [main] c.TestStart - main 19:39:14 [main] c.FileReader - read [1.mp4] start … 19:39:18 [main] c.FileReader - read [1.mp4] end … cost: 4227 ms 19:39:18 [main] c.TestStart - do other things …
程序仍在 main 线程运行, FileReader.read() 方法调用还是同步的
```javapublic static void main(String[] args) {Thread t1 = new Thread("t1") {@Overridepublic void run() {log.debug(Thread.currentThread().getName());FileReader.read(Constants.MP4_FULL_PATH);}};t1.start();log.debug("do other things ...");}输出:19:41:30 [main] c.TestStart - do other things ...19:41:30 [t1] c.TestStart - t119:41:30 [t1] c.FileReader - read [1.mp4] start ...19:41:35 [t1] c.FileReader - read [1.mp4] end ... cost: 4542 ms程序在 t1 线程运行, FileReader.read() 方法调用是异步的
小结:
- 直接调用 run 是在主线程中执行了 run,没有启动新的线程
使用 start 是启动新的线程,通过新的线程间接执行 run 中的代码
七、sleep 与 yield
1、sleep
- 调用 sleep 会让当前线程从 Running进入 Timed Waiting 状态(阻塞)
- 其它线程可以使用 interrupt 方法打断正在睡眠的线程,这时 sleep 方法会抛出 InterruptedException
- 睡眠结束后的线程未必会立刻得到执行
- 建议用 TimeUnit 的 sleep 代替 Thread 的 sleep 来获得更好的可读性
2、yield
- 调用 yield 会让当前线程从 Running 进入 Runnable就绪状态,然后调度执行其它线程
- 具体的实现依赖于操作系统的任务调度器
3、线程优先级
- 线程优先级会提示(hint)调度器优先调度该线程,但它仅仅是一个提示,调度器可以忽略它
- 如果 cpu 比较忙,那么优先级高的线程会获得更多的时间片,但 cpu 闲时,优先级几乎没作用
八、join方法详解
为什么需要 join ```java static int r = 0; public static void main(String[] args) throws InterruptedException { test1(); }
private static void test1() throws InterruptedException {
log.debug("开始");Thread t1 = new Thread(() -> {log.debug("开始");sleep(1);log.debug("结束");r = 10;});t1.start();log.debug("结果为:{}", r);log.debug("结束");
}
分析:1. 因为主线程和线程 t1 是并行执行的,t1 线程需要 1 秒之后才能算出 r=101. 而主线程一开始就要打印 r 的结果,所以只能打印出 r=0解决方法:1. 用 sleep 行不行?为什么?1. 用 join,加在 t1.start() 之后即可<a name="kxd9C"></a>## 九、**interrupt方法详解**<a name="PiCbh"></a>## 十、**不推荐的方法**<a name="BL8zt"></a>## 十一、**主线程与守护线程**<a name="RI8Nv"></a>## 十二、**五种状态**<a name="BfX63"></a>## 十三、**六种状态**<a name="kumO5"></a>## 十四、**习题**<a name="yKYHw"></a>### 3、**单核cpu**单核cpu下,线程实际还是串行执行的。操作系统中有一个组件叫做任务调度器,将cpu的时间片(windows<br />下时间片最小约为 15 毫秒)分给不同的程序使用,只是由于 cpu 在线程间(时间片很短)的切换非常快,人类感<br />觉是同时运行的。总结为一句话就是:** 微观串行,宏观并行**<br />一般会将这种线程轮流使用CPU的做法称为并发,concurrent<a name="dS47a"></a>### 4、**多核cpu**多核cp下,每个核(core)都可以调度运行线程,这时候线程可以是并行的。<a name="xAjyN"></a>### 5、**井行与并发**引Rob Pik的一段描述:<br />Ø 并发(concurrent)是同一时间应对(dealing with)多件事情的能力<br />Ø 并行(parallel)是同一时间动手做(doing)多件事情的能力<br />例子:<br />Ø 家庭主妇做饭、打扫卫生、给孩子喂奶,她一个人轮流交替做这多件事,这时就是并发;<br />Ø 家庭主妇雇了个保姆,她们一起这些事,这时既有并发,也有并行(这时会产生竞争,例如锅只有一口,一个人用锅时,另一个人就得等待)<br />Ø 雇了3个保姆,一个专做饭、一个专打扫卫生、一个专喂奶,互不干扰,这时是并行<a name="nkkkg"></a># 第三章 **共享模型之管程**<a name="JyPL6"></a>## 一、**共享带来的问题**<a name="pntxi"></a>## 二、**synchronized 解决方案**<a name="CoGh2"></a>## 三、**方法上的synchronized**<a name="mBvWx"></a>## 四、**变量的线程安全分析**<a name="Is416"></a>## 五、**习题**<a name="QinqG"></a>## 六、**Monitor概念**<a name="mYpLc"></a>## 七、**wait notify**<a name="H3hfU"></a>## 八、**wait notify的正确姿势**<a name="BTjc8"></a>## 九、**Park& Unpark**<a name="Dzw5z"></a>## 十、**重新理解线程状态转换**<a name="tpiXy"></a>## 十一、**多把锁**<a name="tZ3Pg"></a>## 十二、**活跃性**<a name="YGMNk"></a>## 十三、**ReentrantLock**<a name="adKI0"></a># 第四章 **共享模型内存**<a name="ZkwYv"></a>## 一、**Java内存模型**JMM 即 Java Memory Model,它定义了主存、工作内存抽象概念,底层对应着 CPU 寄存器、缓存、硬件内存、CPU 指令优化等。<br />JMM 体现在以下几个方面:1. 原子性 - 保证指令不会受到线程上下文切换的影响1. 可见性 - 保证指令不会受 cpu 缓存的影响1. 有序性 - 保证指令不会受 cpu 指令并行优化的影响<a name="jOp8V"></a>## 二、**可见性**<a name="irrSy"></a>### **1、退不出的循环**先来看一个现象,main 线程对 run 变量的修改对于 t 线程不可见,导致了 t 线程无法停止<br /><br /><a name="AVMl3"></a>### **2、原因分析**2.1 初始状态:t 线程刚开始从主内存读取了 run 的值到工作内存。1. 主内存:所有共享信息存储的位置1. 工作内存:每个线程私有信息存储的位置<br />2.2 因为 t 线程要频繁从主内存中读取 run 的值,JIT 编译器会将 run 的值缓存至自己工作内存中的高速缓存中,减少对主存中 run 的访问,提高效率<br /><br />2.3 1秒之后,main 线程修改了 run 的值,并同步至主存,而 t 是从自己工作内存中的高速缓存中读取这个变量的值,结果永远是旧值<br /><a name="U4Dy7"></a>### **3、解决方法****使用volatile(易变关键字)**:<br />它可以用**来修饰成员变量和静态成员变量**,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作volatile 变量都是直接操作主存```javapublic class Test32 {// 易变volatile static boolean run = true;// 锁对象final static Object lock = new Object();public static void main(String[] args) throws InterruptedException {Thread t = new Thread(()->{while(true){synchronized (lock){if(!run) {break;}}}});t.start();sleep(1);log.debug("停止 t");// 线程t会如预想的停下来run = false;System.out.println();}}
4、可见性 vs 原子性
前面例子体现的实际就是可见性,它保证的是在多个线程之间,一个线程对volatile 变量的修改对另一个线程可见, 不能保证原子性,仅用在一个写线程,多个读线程的情况: 上例从字节码理解是这样的:
getstatic run // 线程 t 获取 run truegetstatic run // 线程 t 获取 run truegetstatic run // 线程 t 获取 run truegetstatic run // 线程 t 获取 run trueputstatic run // 线程 main 修改 run 为 false, 仅此一次getstatic run // 线程 t 获取 run false
比较一下之前我们将线程安全时举的例子:两个线程一个i++ 、一个i— ,只能保证看到最新值,不能解决指令交错
注意:synchronized 语句块既可以保证代码块的原子性,也同时保证代码块内变量的可见性。但缺点是synchronized 是属于重量级操作,性能相对更低

5、原理之CPU缓存结构
5.1 CPU缓存结构

查看 cpu 缓存
⚡ root@yihang01 ~ lscpuArchitecture: x86_64CPU op-mode(s): 32-bit, 64-bitByte Order: Little EndianCPU(s): 1On-line CPU(s) list: 0Thread(s) per core: 1Core(s) per socket: 1Socket(s): 1NUMA node(s): 1Vendor ID: GenuineIntelCPU family: 6Model: 142Model name: Intel(R) Core(TM) i7-8565U CPU @ 1.80GHzStepping: 11CPU MHz: 1992.002BogoMIPS: 3984.00Hypervisor vendor: VMwareVirtualization type: fullL1d cache: 32KL1i cache: 32KL2 cache: 256KL3 cache: 8192KNUMA node0 CPU(s): 0
速度比较
查看 cpu 缓存行
⚡ root@yihang01 ~ cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size64
cpu 拿到的内存地址格式是这样的
[高位组标记][低位索引][偏移量]
5.2 CPU 缓存读
读取数据流程如下 :
- 根据低位,计算在缓存中的索引
判断是否有效
E、S、M 状态的缓存行都可以满足 CPU 的读请求
- E 状态的缓存行,有写请求,会将状态改为 M,这时并不触发向主存的写
- E 状态的缓存行,必须监听该缓存行的读操作,如果有,要变为 S 状态

- M 状态的缓存行,必须监听该缓存行的读操作,如果有,先将其它缓存(S 状态)中该缓存行变成 I 状态(即 6. 的流程),写入主存,自己变为 S 状态
- S 状态的缓存行,有写请求,走 4. 的流程
- S 状态的缓存行,必须监听该缓存行的失效操作,如果有,自己变为 I 状态
- I 状态的缓存行,有读请求,必须从主存读取
5.4 内存屏障
Memory Barrier(Memory Fence)
可见性:
- 写屏障(sfence)保证在该屏障之前的,对共享变量的改动,都同步到主存当中
- 而读屏障(lfence)保证在该屏障之后,对共享变量的读取,加载的是主存中最新数据
有序性:
- 写屏障会确保指令重排序时,不会将写屏障之前的代码排在写屏障之后
- 读屏障会确保指令重排序时,不会将读屏障之后的代码排在读屏障之前
6、模式之两阶段终止:P138
7、模式之Balking:P139
三、有序性
JVM 会在不影响正确性的前提下,可以调整语句的执行顺序,思考下面一段代码
static int i;static int j;// 在某个线程内执行如下赋值操作i = ...;j = ...;
可以看到,至于是先执行 i 还是 先执行 j ,对最终的结果不会产生影响。所以,上面代码真正执行时,
既可以是:i = ...;j = ...;也可以是:j = ...;i = ...;
这种特性称之为『指令重排』,多线程下『指令重排』会影响正确性。为什么要有重排指令这项优化呢?从 CPU执行指令的原理来理解一下吧
1、原理之指令级并行
1.1 名词
Clock Cycle Time
主频的概念大家接触的比较多,而 CPU 的 Clock Cycle Time(时钟周期时间),等于主频的倒数,意思是 CPU 能够识别的最小时间单位,比如说 4G 主频的 CPU 的 Clock Cycle Time 就是 0.25 ns,作为对比,我们墙上挂钟的 Cycle Time 是 1s
例如,运行一条加法指令一般需要一个时钟周期时间
CPI
有的指令需要更多的时钟周期时间,所以引出了 CPI (Cycles Per Instruction)指令平均时钟周期数
IPC
IPC(Instruction Per Clock Cycle)即 CPI 的倒数,表示每个时钟周期能够运行的指令数
CPU 执行时间
程序的 CPU 执行时间,即我们前面提到的 user + system 时间,可以用下面的公式来表示
程序 CPU 执行时间 = 指令数 * CPI * Clock Cycle Time
1.2 鱼罐头的故事
加工一条鱼需要 50 分钟,只能一条鱼、一条鱼顺序加工… 
可以将每个鱼罐头的加工流程细分为 5 个步骤:
- 去鳞清洗 10分钟
- 蒸煮沥水 10分钟
- 加注汤料 10分钟
- 杀菌出锅 10分钟
- 真空封罐 10分钟

即使只有一个工人,最理想的情况是:他能够在 10 分钟内同时做好这 5 件事,因为对第一条鱼的真空装罐,不会影响对第二条鱼的杀菌出锅…
1.3 指令重排序优化
事实上,现代处理器会设计为一个时钟周期完成一条执行时间最长的 CPU 指令。为什么这么做呢?可以想到指令还可以再划分成一个个更小的阶段,例如,每条指令都可以分为: 取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 这 5 个阶段 
术语参考:
- instruction fetch (IF)
- instruction decode (ID)
- execute (EX)
- memory access (MEM)
- register write back (WB)
在不改变程序结果的前提下,这些指令的各个阶段可以通过重排序和组合来实现指令级并行,这一技术在 80’s 中叶到 90’s 中叶占据了计算架构的重要地位。
提示:
- 分阶段,分工是提升效率的关键!
指令重排的前提是,重排指令不能影响结果,例如
// 可以重排的例子int a = 10; // 指令1int b = 20; // 指令2System.out.println( a + b );// 不能重排的例子int a = 10; // 指令1int b = a - 5; // 指令2
1.4 支持流水线的处理器
现代 CPU 支持多级指令流水线,例如支持同时执行取指令 - 指令译码 - 执行指令 - 内存访问 - 数据写回 的处理器,就可以称之为五级指令流水线。这时 CPU 可以在一个时钟周期内,同时运行五条指令的不同阶段(相当于一条执行时间最长的复杂指令),IPC = 1,本质上,流水线技术并不能缩短单条指令的执行时间,但它变相地提高了指令地吞吐率。
提示:
- 奔腾四(Pentium 4)支持高达 35 级流水线,但由于功耗太高被废弃
1.5 SuperScalar 处理器
大多数处理器包含多个执行单元,并不是所有计算功能都集中在一起,可以再细分为整数运算单元、浮点数运算单元等,这样可以把多条指令也可以做到并行获取、译码等,CPU 可以在一个时钟周期内,执行多于一条指令,IPC > 1 
2、诡异的结果
int num = 0;boolean ready = false;// 线程1:执行此方法public void actor1(I_Result r) {if(ready) {r.r1 = num + num;} else {r.r1 = 1;}}// 线程2:执行此方法public void actor2(I_Result r) {num = 2;ready = true; // 由于指令重排的存在,可能导致线程写2先执行ready = true; 再执行num = 2}
I_Result是一个对象,有一个属性 r1 用来保存结果,问,可能的结果有几种?
有同学这么分析
- 情况1:线程1 先执行,这时 ready = false,所以进入 else 分支结果为 1
- 情况2:线程2 先执行 num = 2,但没来得及执行ready=true,线程1执行,还是进入else分支,结果为1
- 情况3:线程2 执行到 ready = true,线程1 执行,这回进入if分支,结果为4(因为num已经执行过了)
但我告诉你,结果还有可能是 0,信不信吧!
- 这种情况下是:线程2 执行 ready = true,切换到线程1,进入 if 分支,相加为 0,再切回线程2 执行 num = 2相信很多人已经晕了
- 这种现象叫做指令重排,是 JIT 编译器在运行时的一些优化,这个现象需要通过大量测试才能复现
借助 java 并发压测工具 jcstress https://wiki.openjdk.java.net/display/CodeTools/jcstress
mvn archetype:generate -DinteractiveMode=false -DarchetypeGroupId=org.openjdk.jcstress -DarchetypeArtifactId=jcstress-java-test-archetype -DarchetypeVersion=0.5 -DgroupId=cn.itcast -DartifactId=ordering -Dversion=1.0
创建 maven 项目,提供如下测试类
@JCStressTest@Outcome(id = {"1", "4"}, expect = Expect.ACCEPTABLE, desc = "ok")@Outcome(id = "0", expect = Expect.ACCEPTABLE_INTERESTING, desc = "!!!!")@Statepublic class ConcurrencyTest {int num = 0;boolean ready = false;@Actorpublic void actor1(I_Result r) {if(ready) {r.r1 = num + num;} else {r.r1 = 1;}}@Actorpublic void actor2(I_Result r) {num = 2;ready = true;}}
执行:
mvn clean installjava -jar target/jcstress.jar
解决方法
volatile修饰的变量,可以禁用指令重排,只需要在ready上加volatile即可,便可以防止ready=true之前的语句被执行重排。(所以不需要在num上加volatile)
3、原理之volatile:P146
4、happens-before
happens-before 规定了对共享变量的写操作对其它线程的读操作可见,它是可见性与有序性的一套规则总结,抛开以下 happens-before 规则,JMM 并不能保证一个线程对共享变量的写,对于其它线程对该共享变量的读可见
4.1 线程解锁 m 之前对变量的写,对于接下来对 m 加锁的其它线程对该变量的读可见 ```java static int x; static Object m = new Object();
new Thread(()->{ synchronized(m) { x = 10; } },”t1”).start();
new Thread(()->{ synchronized(m) { System.out.println(x); } },”t2”).start();
**4.2 线程对 volatile 变量的写,对接下来其它线程对该变量的读可见**```javavolatile static int x;new Thread(()->{x = 10;},"t1").start();new Thread(()->{System.out.println(x);},"t2").start();
4.3 线程 start 前对变量的写,对该线程开始后对该变量的读可见
static int x;x = 10;new Thread(()->{System.out.println(x);},"t2").start();
4.4 线程结束前对变量的写,对其它线程得知它结束后的读可见(比如其它线程调用 t1.isAlive() 或 t1.join()等待它结束)
static int x;Thread t1 = new Thread(()->{x = 10;},"t1");t1.start();t1.join();System.out.println(x);
4.5 线程 t1 打断 t2(interrupt)前对变量的写,对于其他线程得知 t2 被打断后对变量的读可见(通过
t2.interrupted 或 t2.isInterrupted)
static int x;public static void main(String[] args) {Thread t2 = new Thread(()->{while(true) {if(Thread.currentThread().isInterrupted()) {System.out.println(x);break;}}},"t2");t2.start();new Thread(()->{sleep(1);x = 10;t2.interrupt();},"t1").start();while(!t2.isInterrupted()) {Thread.yield();}System.out.println(x);}
4.6 对变量默认值(0,false,null)的写,对其它线程对该变量的读可见
4.7 具有传递性,如果 x hb-> y 并且 y hb-> z 那么有 x hb-> z ,配合 volatile 的防指令重排,有下面的例子
volatile static int x;static int y;new Thread(()->{y = 10;x = 20;},"t1").start();new Thread(()->{// x=20 对 t2 可见, 同时 y=10 也对 t2 可见System.out.println(x);},"t2").start();
5、习题
5.1 balking模式习题
希望 doInit() 方法仅被调用一次,下面的实现是否有问题,为什么?(有问题)
public class TestVolatile {volatile boolean initialized = false;void init() {if (initialized) { // 读取共享变量return;}doInit();initialized = true; // 写入共享变量}private void doInit() {}}// volatile只能保证可见性,并不能保证原子性。所以多个线程执行时,会多次调用doInit();// 此时可以采用同步代码块包裹initialized,使得doInit()方法仅被调用一次
5.2 线程安全单例习题:P154
单例模式有很多实现方法,饿汉、懒汉、静态内部类、枚举类,试分析每种实现下获取单例对象(即调用 getInstance)时的线程安全,并思考注释中的问题
- 饿汉式:类加载就会导致该单实例对象被创建
- 懒汉式:类加载不会导致该单实例对象被创建,而是首次使用该对象时才会创建
实现1:
// 问题1:为什么加 final:担心子类覆盖它的方法,破坏单例// 问题2:如果实现了序列化接口, 还要做什么来防止反序列化破坏单例:public final class Singleton implements Serializable {// 问题3:为什么设置为私有? 是否能防止反射创建新的实例?// 解决问题3-1:设置为私有防止其他类来创建这个对象// 解决问题3-2:不能。即使设置为私有,也不能防止反射来创建新实例。暴力反射得到构造对象private Singleton() {}// 问题4:这样初始化是否能保证单例对象创建时的线程安全?:是线程安全的private static final Singleton INSTANCE = new Singleton();// 问题5:为什么提供静态方法而不是直接将 INSTANCE 设置为 public, 说出你知道的理由public static Singleton getInstance() {return INSTANCE;}// 解决问题2。直接使用readResolve()方法返回的对象,而不是反序列化字节码生成的对象public Object readResolve() {return INSTANCE; // 返回单例对象}}
实现2:
// 问题1:枚举单例是如何限制实例个数的:枚举类的静态成员变量// 问题2:枚举单例在创建时是否有并发问题// 问题3:枚举单例能否被反射破坏单例// 问题4:枚举单例能否被反序列化破坏单例// 问题5:枚举单例属于懒汉式还是饿汉式// 问题6:枚举单例如果希望加入一些单例创建时的初始化逻辑该如何做enum Singleton {INSTANCE;}
实现3:
public final class Singleton {private Singleton() { }private static Singleton INSTANCE = null;// 分析这里的线程安全, 并说明有什么缺点public static synchronized Singleton getInstance() {if( INSTANCE != null ){return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}}
实现4:DCL
public final class Singleton {private Singleton() { }// 问题1:解释为什么要加 volatile ?private static volatile Singleton INSTANCE = null;// 问题2:对比实现3, 说出这样做的意义public static Singleton getInstance() {if (INSTANCE != null) {return INSTANCE;}synchronized (Singleton.class) {// 问题3:为什么还要在这里加为空判断, 之前不是判断过了吗if (INSTANCE != null) { // t2return INSTANCE;}INSTANCE = new Singleton();return INSTANCE;}}}
实现5:
public final class Singleton {private Singleton() { }// 问题1:属于懒汉式还是饿汉式private static class LazyHolder {static final Singleton INSTANCE = new Singleton();}// 问题2:在创建时是否有并发问题public static Singleton getInstance() {return LazyHolder.INSTANCE;}}
本章小结
本章重点讲解了 JMM 中的
- 可见性 - 由 JVM 缓存优化引起
- 有序性 - 由 JVM 指令重排序优化引起
- happens-before 规则
- 原理方面
- CPU 指令并行
- volatile
模式方面
- 两阶段终止模式的 volatile 改进
-
第五章 共享模型之无锁
一、问题提出
有如下需求,保证 account.withdraw 取款方法的线程安全
interface Account {// 获取余额Integer getBalance();// 取款void withdraw(Integer amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(Account account) {List<Thread> ts = new ArrayList<>();long start = System.nanoTime();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(10);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(account.getBalance() + " cost: " + (end-start)/1000_000 + " ms");}}
原有实现并不是线程安全的:
class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic Integer getBalance() {return balance;}@Overridepublic void withdraw(Integer amount) { // 存在线程安全问题balance -= amount;}}
```java public static void main(String[] args) { Account.demo(new AccountUnsafe(10000)); }
某次的执行结果:330 cost: 306 ms
<a name="cfsTN"></a>### 1、**为什么不安全**1. 单核的指令交错1. 多核的指令交错```javapublic void withdraw(Integer amount) {balance -= amount;}
2、解决思路 - 加锁
首先想到的是给 Account 对象加锁synchronized
class AccountUnsafe implements Account {private Integer balance;public AccountUnsafe(Integer balance) {this.balance = balance;}@Overridepublic synchronized Integer getBalance() {return balance;}@Overridepublic synchronized void withdraw(Integer amount) {balance -= amount;}}结果为:0 cost: 399 ms
3、解决思路 - 无锁
class AccountSafe implements Account {// 原子整数:AtomicIntegerprivate AtomicInteger balance;public AccountSafe(Integer balance) {this.balance = new AtomicInteger(balance);}@Overridepublic Integer getBalance() {return balance.get();}@Overridepublic void withdraw(Integer amount) {while (true) {// 获取余额最新值int prev = balance.get();// 要修改的余额int next = prev - amount;// 真正修改if (balance.compareAndSet(prev, next)) {break;}}// 可以简化为下面的方法// balance.addAndGet(-1 * amount);}}
public static void main(String[] args) {Account.demo(new AccountSafe(10000));}某次的执行结果:0 cost: 302 ms
二、CAS与volatile
前面看到的 AtomicInteger 的解决方法,内部并没有用锁来保护共享变量的线程安全。那么它是如何实现的呢?compareAndSet:CPU指令级别实现原子性
public void withdraw(Integer amount) {while(true) {// 需要不断尝试,直到成功为止while (true) {// 比如拿到了旧值 1000int prev = balance.get();// 在这个基础上 1000-10 = 990int next = prev - amount;/*compareAndSet 正是做这个检查,在 set 前,先比较 prev 与当前值- 不一致了,next 作废,返回 false 表示失败比如,别的线程已经做了减法,当前值已经被减成了 990那么本线程的这次 990 就作废了,进入 while 下次循环重试- 一致,以 next 设置为新值,返回 true 表示成功*/if (balance.compareAndSet(prev, next)) {break;}}}}
其中的关键是compareAndSet,它的简称就是 CAS (也有 Compare And Swap 的说法),它必须是原子操作
注意:其实CAS的底层是lock cmpxchg指令(X86 架构),在单核CPU和多核CPU下都能够保证【比较-交换】的原子性。
在多核状态下,某个核执行到带lock的指令时,CPU 会让总线锁住,当这个核把此指令执行完毕,再开启总线。这个过程中不会被线程的调度机制所打断,保证了多个线程对内存操作的准确性,是原子的。
1、慢动作分析
@Slf4jpublic class SlowMotion {public static void main(String[] args) {AtomicInteger balance = new AtomicInteger(10000);int mainPrev = balance.get();log.debug("try get {}", mainPrev);new Thread(() -> {sleep(1000);int prev = balance.get();balance.compareAndSet(prev, 9000);log.debug(balance.toString());}, "t1").start();sleep(2000);log.debug("try set 8000...");boolean isSuccess = balance.compareAndSet(mainPrev, 8000);log.debug("is success ? {}", isSuccess);if(!isSuccess){mainPrev = balance.get();log.debug("try set 8000...");isSuccess = balance.compareAndSet(mainPrev, 8000);log.debug("is success ? {}", isSuccess);}}private static void sleep(int millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}}输出结果:2019-10-13 11:28:37.134 [main] try get 100002019-10-13 11:28:38.154 [t1] 90002019-10-13 11:28:39.154 [main] try set 8000...2019-10-13 11:28:39.154 [main] is success ? false2019-10-13 11:28:39.154 [main] try set 8000...2019-10-13 11:28:39.154 [main] is success ? true
2、Volatile
- 获取共享变量时,为了保证该变量的可见性,需要使用 volatile 修饰。
- 它可以用来修饰成员变量和静态成员变量,他可以避免线程从自己的工作缓存中查找变量的值,必须到主存中获取它的值,线程操作 volatile 变量都是直接操作主存。即一个线程对 volatile 变量的修改,对另一个线程可见。
- 注意:
- volatile仅仅保证了共享变量的可见性,让其它线程能够看到最新值,但不能解决指令交错问题(不能保证原子性)
CAS 必须借助 volatile 才能读取到共享变量的最新值来实现【比较并交换】的效果
3、为什么无锁效率高
无锁情况下,即使重试失败,线程始终在高速运行,没有停歇,而synchronized会让线程在没有获得锁的时候,发生上下文切换(成本较高),进入阻塞。打个比喻:
- 线程就好像高速跑道上的赛车,高速运行时,速度超快,一旦发生上下文切换,就好比赛车要减速、熄火,等被唤醒又得重新打火、启动、加速… 恢复到高速运行,代价比较大
- 但无锁情况下,因为线程要保持运行,需要额外CPU 的支持,CPU 在这里就好比高速跑道,没有额外的跑道,线程想高速运行也无从谈起,虽然不会进入阻塞,但由于没有分到时间片,仍然会进入可运行状态,还是会导致上下文切换。CAS线程数少于CPU核心数时,CAS效率较高;线程数高于CPU核心数时;CAS效率一般
4、CAS的特点
结合 CAS 和 volatile 可以实现无锁并发,适用于线程数少、多核 CPU 的场景下。
- CAS 是基于乐观锁的思想:最乐观的估计,不怕别的线程来修改共享变量,就算改了也没关系,我吃亏点再重试呗。
- synchronized 是基于悲观锁的思想:最悲观的估计,得防着其它线程来修改共享变量,我上了锁你们都别想改,我改完了解开锁,你们才有机会。
CAS 体现的是无锁并发、无阻塞并发,请仔细体会这两句话的意思
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++ System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 —i System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i— System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0) System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0) System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 // getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的 // getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0) // 其中函数中的操作能保证原子,但函数需要无副作用 System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));
```javapublic class Test34 {public static void main(String[] args) {AtomicInteger i = new AtomicInteger(5);// System.out.println(i.incrementAndGet()); // ++i 1// System.out.println(i.getAndIncrement()); // i++ 2// 当前打印的是i的值,要想获得计算后的值,需要用i.get()获取。2 , 7// System.out.println(i.getAndAdd(5));// System.out.println(i.addAndGet(5)); // 12, 12// 读取到 设置值// i.updateAndGet(value -> value * 10); 先运算,再获取// i.getAndUpdate(value -> value * 10); 先获取,再运算System.out.println(updateAndGet(i, p -> p / 2));System.out.println(i.get());}/*** 自定义方法* @param i* @param operator 操作* @return*/public static int updateAndGet(AtomicInteger i, IntUnaryOperator operator) {while (true) {// 获取当前值int prev = i.get();// 根据当前值计算,根据prev得到nextint next = operator.applyAsInt(prev);// 如果prev与当前线程中的值(共享变量)一致,即共享变量没有被修改,则共享变量更新为为nextif (i.compareAndSet(prev, next)) {return next;}}}}
四、原子引用
为什么需要原子引用类型? 保证共享变量线程安全
- AtomicReference
- AtomicMarkableReference
AtomicStampedReference
public interface DecimalAccount {// 获取余额BigDecimal getBalance();// 取款void withdraw(BigDecimal amount);/*** 方法内会启动 1000 个线程,每个线程做 -10 元 的操作* 如果初始余额为 10000 那么正确的结果应当是 0*/static void demo(DecimalAccount account) {List<Thread> ts = new ArrayList<>();for (int i = 0; i < 1000; i++) {ts.add(new Thread(() -> {account.withdraw(BigDecimal.TEN);}));}ts.forEach(Thread::start);ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(account.getBalance());}}
试着提供不同的 DecimalAccount 实现,实现安全的取款操作
1、不安全实现
class DecimalAccountUnsafe implements DecimalAccount {BigDecimal balance;public DecimalAccountUnsafe(BigDecimal balance) {this.balance = balance;}@Overridepublic BigDecimal getBalance() {return balance;}@Overridepublic void withdraw(BigDecimal amount) {BigDecimal balance = this.getBalance();this.balance = balance.subtract(amount);}}
2、安全实现 - 使用锁
class DecimalAccountSafeLock implements DecimalAccount {private final Object lock = new Object();BigDecimal balance;public DecimalAccountSafeLock(BigDecimal balance) {this.balance = balance;}@Overridepublic BigDecimal getBalance() {return balance;}@Overridepublic void withdraw(BigDecimal amount) {synchronized (lock) {BigDecimal balance = this.getBalance();this.balance = balance.subtract(amount);}}}
3、安全实现 - 使用CAS
class DecimalAccountSafeCas implements DecimalAccount {AtomicReference<BigDecimal> ref;public DecimalAccountSafeCas(BigDecimal balance) {ref = new AtomicReference<>(balance);}@Overridepublic BigDecimal getBalance() {return ref.get();}@Overridepublic void withdraw(BigDecimal amount) {while (true) {BigDecimal prev = ref.get();BigDecimal next = prev.subtract(amount);if (ref.compareAndSet(prev, next)) {break;}}}}
```java DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal(“10000”))); DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal(“10000”))); DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal(“10000”)));
运行结果: 4310 cost: 425 ms 0 cost: 285 ms 0 cost: 274 ms
<a name="jjscX"></a>### 4、**ABA问题及解决**<a name="M5YJO"></a>#### **4.1 ABA 问题**```javastatic AtomicReference<String> ref = new AtomicReference<>("A");public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 A// 这个共享变量被它线程修改过?String prev = ref.get();other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C"));}private static void other() {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.get(), "B"));}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));}, "t2").start();}输出:11:29:52.325 c.Test36 [main] - main start...11:29:52.379 c.Test36 [t1] - change A->B true11:29:52.879 c.Test36 [t2] - change B->A true11:29:53.880 c.Test36 [main] - change A->C true
主线程仅能判断出共享变量的值与最初值 A 是否相同,不能感知到这种从 A 改为 B 又改回 A 的情况,如果主线程希望:
只要有其它线程【动过了】共享变量,那么自己的 cas 就算失败,这时,仅比较值是不够的,需要再加一个版本号
4.2 解决方案一:AtomicStampedReference
AtomicStampedReference 可以给原子引用加上版本号,追踪原子引用整个的变化过程,如: A -> B -> A -> C ,通过AtomicStampedReference,我们可以知道,引用变量中途被更改了几次。
static AtomicStampedReference<String> ref = new AtomicStampedReference<>("A", 0);public static void main(String[] args) throws InterruptedException {log.debug("main start...");// 获取值 AString prev = ref.getReference();// 获取版本号int stamp = ref.getStamp();log.debug("版本 {}", stamp);// 如果中间有其它线程干扰,发生了 ABA 现象other();sleep(1);// 尝试改为 Clog.debug("change A->C {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));}private static void other() {new Thread(() -> {log.debug("change A->B {}", ref.compareAndSet(ref.getReference(), "B",ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t1").start();sleep(0.5);new Thread(() -> {log.debug("change B->A {}", ref.compareAndSet(ref.getReference(), "A",ref.getStamp(), ref.getStamp() + 1));log.debug("更新版本为 {}", ref.getStamp());}, "t2").start();}输出为:15:41:34.891 c.Test36 [main] - main start...15:41:34.894 c.Test36 [main] - 版本 015:41:34.956 c.Test36 [t1] - change A->B true15:41:34.956 c.Test36 [t1] - 更新版本为 115:41:35.457 c.Test36 [t2] - change B->A true15:41:35.457 c.Test36 [t2] - 更新版本为 215:41:36.457 c.Test36 [main] - change A->C false
4.3 解决方案二:AtomicMarkableReference
有时候并不关心引用变量更改了几次,只是单纯的关心是否更改过,所以就有了AtomicMarkableReference
class GarbageBag {String desc;public GarbageBag(String desc) {this.desc = desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return super.toString() + " " + desc;}}
@Slf4jpublic class TestABAAtomicMarkableReference {public static void main(String[] args) throws InterruptedException {GarbageBag bag = new GarbageBag("装满了垃圾");// 参数2 mark 可以看作一个标记,表示垃圾袋满了AtomicMarkableReference<GarbageBag> ref = new AtomicMarkableReference<>(bag, true);log.debug("主线程 start...");GarbageBag prev = ref.getReference();log.debug(prev.toString());new Thread(() -> {log.debug("打扫卫生的线程 start...");bag.setDesc("空垃圾袋");while (!ref.compareAndSet(bag, bag, true, false)) {}log.debug(bag.toString());}).start();Thread.sleep(1000);log.debug("主线程想换一只新垃圾袋?");boolean success = ref.compareAndSet(prev, new GarbageBag("空垃圾袋"), true, false);log.debug("换了么?" + success);log.debug(ref.getReference().toString());}}输出:2019-10-13 15:30:09.264 [main] 主线程 start...2019-10-13 15:30:09.270 [main] cn.itcast.GarbageBag@5f0fd5a0 装满了垃圾2019-10-13 15:30:09.293 [Thread-1] 打扫卫生的线程 start...2019-10-13 15:30:09.294 [Thread-1] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋2019-10-13 15:30:10.294 [main] 主线程想换一只新垃圾袋?2019-10-13 15:30:10.294 [main] 换了么?false2019-10-13 15:30:10.294 [main] cn.itcast.GarbageBag@5f0fd5a0 空垃圾袋
五、原子数组
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
有如下方法
/**参数1,提供数组、可以是线程不安全数组或线程安全数组参数2,获取数组长度的方法参数3,自增方法,回传 array, index参数4,打印数组的方法*/// supplier 提供者 无中生有 ()->结果// function 函数 一个参数一个结果 (参数)->结果 , BiFunction (参数1,参数2)->结果// consumer 消费者 一个参数没结果 (参数)->void, BiConsumer (参数1,参数2)->private static <T> void demo(Supplier<T> arraySupplier,Function<T, Integer> lengthFun,BiConsumer<T, Integer> putConsumer,Consumer<T> printConsumer ) {List<Thread> ts = new ArrayList<>();T array = arraySupplier.get();int length = lengthFun.apply(array);for (int i = 0; i < length; i++) {// 每个线程对数组作 10000 次操作ts.add(new Thread(() -> {for (int j = 0; j < 10000; j++) {putConsumer.accept(array, j%length);}}));}ts.forEach(t -> t.start()); // 启动所有线程ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}); // 等所有线程结束printConsumer.accept(array);}
1. 不安全的数组
demo(()->new int[10],(array)->array.length,(array, index) -> array[index]++,array-> System.out.println(Arrays.toString(array)));结果:[9870, 9862, 9774, 9697, 9683, 9678, 9679, 9668, 9680, 9698]
2. 安全的数组
demo(()-> new AtomicIntegerArray(10),(array) -> array.length(),(array, index) -> array.getAndIncrement(index),array -> System.out.println(array));结果:[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]
六、字段更新器
- AtomicReferenceFieldUpdater // 域字段
- AtomicIntegerFieldUpdater
- AtomicLongFieldUpdater
利用字段更新器,可以针对对象的某个域(Field)进行原子操作,只能配合 volatile 修饰的字段使用,否则会出现异常:Exception in thread “main” java.lang.IllegalArgumentException: Must be volatile type
public class Test5 {private volatile int field;public static void main(String[] args) {AtomicIntegerFieldUpdater fieldUpdater =AtomicIntegerFieldUpdater.newUpdater(Test5.class, "field");Test5 test5 = new Test5();fieldUpdater.compareAndSet(test5, 0, 10);// 修改成功 field = 10System.out.println(test5.field);// 修改成功 field = 20fieldUpdater.compareAndSet(test5, 10, 20);System.out.println(test5.field);// 修改失败 field = 20fieldUpdater.compareAndSet(test5, 10, 30);System.out.println(test5.field);}}输出:102020
七、原子累加器
1. 累加器性能比较
private static <T> void demo(Supplier<T> adderSupplier, Consumer<T> action) {T adder = adderSupplier.get();long start = System.nanoTime();List<Thread> ts = new ArrayList<>();// 4 个线程,每人累加 50 万for (int i = 0; i < 40; i++) {ts.add(new Thread(() -> {for (int j = 0; j < 500000; j++) {action.accept(adder);}}));}ts.forEach(t -> t.start());ts.forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long end = System.nanoTime();System.out.println(adder + " cost:" + (end - start)/1000_000);}
比较 AtomicLong 与 LongAdder
for (int i = 0; i < 5; i++) {demo(() -> new LongAdder(), adder -> adder.increment());}for (int i = 0; i < 5; i++) {demo(() -> new AtomicLong(), adder -> adder.getAndIncrement());}
输出:
1000000 cost:431000000 cost:91000000 cost:71000000 cost:71000000 cost:71000000 cost:311000000 cost:271000000 cost:281000000 cost:241000000 cost:22
性能提升的原因很简单,就是在有竞争时,设置多个累加单元,Therad-0 累加 Cell[0],而 Thread-1 累加 Cell[1]… 最后将结果汇总。这样它们在累加时操作的不同的 Cell 变量,因此减少了 CAS 重试失败,从而提高性能。
2. 源码之LongAdder
LongAdder 是并发大师 @author Doug Lea (大哥李)的作品,设计的非常精巧
LongAdder 类有几个关键域
// 累加单元数组, 懒惰初始化transient volatile Cell[] cells;// 基础值, 如果没有竞争, 则用 cas 累加这个域transient volatile long base;// 在 cells 创建或扩容时, 置为 1, 表示加锁transient volatile int cellsBusy;
3. cas锁
// 不要用于实践!!!public class LockCas {private AtomicInteger state = new AtomicInteger(0);public void lock() {while (true) {if (state.compareAndSet(0, 1)) {break;}}}public void unlock() {log.debug("unlock...");state.set(0);}}
测试:
LockCas lock = new LockCas();new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");sleep(1);} finally {lock.unlock();}}).start();new Thread(() -> {log.debug("begin...");lock.lock();try {log.debug("lock...");} finally {lock.unlock();}}).start();输出:18:27:07.198 c.Test42 [Thread-0] - begin...18:27:07.202 c.Test42 [Thread-0] - lock...18:27:07.198 c.Test42 [Thread-1] - begin...18:27:08.204 c.Test42 [Thread-0] - unlock...18:27:08.204 c.Test42 [Thread-1] - lock...18:27:08.204 c.Test42 [Thread-1] - unlock...
4. 原理之伪共享
其中 Cell 即为累加单元
// 防止缓存行伪共享@sun.misc.Contendedstatic final class Cell {volatile long value;Cell(long x) { value = x; }// 最重要的方法, 用来 cas 方式进行累加, prev 表示旧值, next 表示新值final boolean cas(long prev, long next) {return UNSAFE.compareAndSwapLong(this, valueOffset, prev, next);}// 省略不重要代码}
得从缓存说起
缓存与内存的速度比较

因为 CPU 与 内存的速度差异很大,需要靠预读数据至缓存来提升效率。
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是 64 byte(8 个 long)
缓存的加入会造成数据副本的产生,即同一份数据会缓存在不同核心的缓存行中
CPU 要保证数据的一致性,如果某个 CPU 核心更改了数据,其它 CPU 核心对应的整个缓存行必须失效
因为 Cell 是数组形式,在内存中是连续存储的,一个 Cell 为 24 字节(16 字节的对象头和 8 字节的 value),因
此缓存行可以存下 2 个的 Cell 对象。这样问题来了:
Core-0 要修改 Cell[0]
Core-1 要修改 Cell[1]
无论谁修改成功,都会导致对方 Core 的缓存行失效,比如 Core-0 中 Cell[0]=6000, Cell[1]=8000 要累加
Cell[0]=6001, Cell[1]=8000 ,这时会让 Core-1 的缓存行失效
@sun.misc.Contended 用来解决这个问题,它的原理是在使用此注解的对象或字段的前后各增加 128 字节大小的 padding,从而让 CPU 将对象预读至缓存时占用不同的缓存行,这样,不会造成对方缓存行的失效
累加主要调用下面的方法
public void add(long x) {// as 为累加单元数组// b 为基础值// x 为累加值Cell[] as; long b, v; int m; Cell a;// 进入 if 的两个条件// 1. as 有值, 表示已经发生过竞争, 进入 if// 2. cas 给 base 累加时失败了, 表示 base 发生了竞争, 进入 ifif ((as = cells) != null || !casBase(b = base, b + x)) {// uncontended 表示 cell 没有竞争boolean uncontended = true;if (// as 还没有创建as == null || (m = as.length - 1) < 0 ||// 当前线程对应的 cell 还没有(a = as[getProbe() & m]) == null ||// cas 给当前线程的 cell 累加失败 uncontended=false ( a 为当前线程的 cell )!(uncontended = a.cas(v = a.value, v + x))) {// 进入 cell 数组创建、cell 创建的流程longAccumulate(x, null, uncontended);}}}
add 流程图
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {int h;// 当前线程还没有对应的 cell, 需要随机生成一个 h 值用来将当前线程绑定到 cellif ((h = getProbe()) == 0) {// 初始化 probeThreadLocalRandom.current();// h 对应新的 probe 值, 用来对应 cellh = getProbe();wasUncontended = true;}// collide 为 true 表示需要扩容boolean collide = false;for (;;) {Cell[] as; Cell a; int n; long v;// 已经有了 cellsif ((as = cells) != null && (n = as.length) > 0) {// 还没有 cellif ((a = as[(n - 1) & h]) == null) {// 为 cellsBusy 加锁, 创建 cell, cell 的初始累加值为 x// 成功则 break, 否则继续 continue 循环}// 有竞争, 改变线程对应的 cell 来重试 caselse if (!wasUncontended)wasUncontended = true;// cas 尝试累加, fn 配合 LongAccumulator 不为 null, 配合 LongAdder 为 nullelse if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;// 如果 cells 长度已经超过了最大长度, 或者已经扩容, 改变线程对应的 cell 来重试 caselse if (n >= NCPU || cells != as)collide = false;// 确保 collide 为 false 进入此分支, 就不会进入下面的 else if 进行扩容了else if (!collide)collide = true;// 加锁else if (cellsBusy == 0 && casCellsBusy()) {// 加锁成功, 扩容continue;}// 改变线程对应的 cellh = advanceProbe(h);}// 还没有 cells, 尝试给 cellsBusy 加锁else if (cellsBusy == 0 && cells == as && casCellsBusy()) {// 加锁成功, 初始化 cells, 最开始长度为 2, 并填充一个 cell// 成功则 break;}// 上两种情况失败, 尝试给 base 累加else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))break;}}
longAccumulate 流程图

每个线程刚进入 longAccumulate 时,会尝试对应一个 cell 对象(找到一个坑位)
获取最终结果通过 sum 方法
public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum; }
八、Unsafe
概述
Unsafe 对象提供了非常底层的,操作内存、线程的方法,Unsafe 对象不能直接调用,只能通过反射获得
public class UnsafeAccessor {static Unsafe unsafe;static {try {Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");theUnsafe.setAccessible(true);unsafe = (Unsafe) theUnsafe.get(null);} catch (NoSuchFieldException | IllegalAccessException e) {throw new Error(e);}}static Unsafe getUnsafe() {return unsafe;}}
@Dataclass Student {volatile int id;volatile String name;}
Unsafe unsafe = UnsafeAccessor.getUnsafe();Field id = Student.class.getDeclaredField("id");Field name = Student.class.getDeclaredField("name");// 获得成员变量的偏移量long idOffset = UnsafeAccessor.unsafe.objectFieldOffset(id);long nameOffset = UnsafeAccessor.unsafe.objectFieldOffset(name);Student student = new Student();// 使用 cas 方法替换成员变量的值UnsafeAccessor.unsafe.compareAndSwapInt(student, idOffset, 0, 20); // 返回 trueUnsafeAccessor.unsafe.compareAndSwapObject(student, nameOffset, null, "张三"); // 返回 trueSystem.out.println(student);输出:Student(id=20, name=张三)
使用自定义的 AtomicData 实现之前线程安全的原子整数 Account 实现
class AtomicData {private volatile int data;static final Unsafe unsafe;static final long DATA_OFFSET;static {unsafe = UnsafeAccessor.getUnsafe();try {// data 属性在 DataContainer 对象中的偏移量,用于 Unsafe 直接访问该属性DATA_OFFSET = unsafe.objectFieldOffset(AtomicData.class.getDeclaredField("data"));} catch (NoSuchFieldException e) {throw new Error(e);}}public AtomicData(int data) {this.data = data;}public void decrease(int amount) {int oldValue;while(true) {// 获取共享变量旧值,可以在这一行加入断点,修改 data 调试来加深理解oldValue = data;// cas 尝试修改 data 为 旧值 + amount,如果期间旧值被别的线程改了,返回 falseif (unsafe.compareAndSwapInt(this, DATA_OFFSET, oldValue, oldValue - amount)) {return;}}}public int getData() {return data;}}
Account 实现
Account.demo(new Account() {AtomicData atomicData = new AtomicData(10000);@Overridepublic Integer getBalance() {return atomicData.getData();}@Overridepublic void withdraw(Integer amount) {atomicData.decrease(amount);}});
本章小结
- CAS 与 volatile
- API
- 原子整数
- 原子引用
- 原子数组
- 字段更新器
- 原子累加器
- Unsafe
* 原理方面
- LongAdder 源码
- 伪共享
第六章 共享模型之不可变
一、日期转换的问题
1、问题提出
下面的代码在运行时,由于 SimpleDateFormat 不是线程安全的
有很大几率出现 java.lang.NumberFormatException 或者出现不正确的日期解析结果,例如:SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {try {log.debug("{}", sdf.parse("1951-04-21"));} catch (Exception e) {log.error("{}", e);}}).start();}
2、思路 - 同步锁
这样虽能解决问题,但带来的是性能上的损失,并不算很好:SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");for (int i = 0; i < 50; i++) {new Thread(() -> {synchronized (sdf) {try {log.debug("{}", sdf.parse("1951-04-21"));} catch (Exception e) {log.error("{}", e);}}}).start();}
3、思路 - 不可变
如果一个对象在不能够修改其内部状态(属性),那么它就是线程安全的,因为不存在并发修改啊!这样的对象在 Java 中有很多,例如在 Java 8 后,提供了一个新的日期格式化类:
可以看 DateTimeFormatter 的文档:DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");for (int i = 0; i < 10; i++) {new Thread(() -> {LocalDate date = dtf.parse("2018-10-01", LocalDate::from);log.debug("{}", date);}).start();}
@implSpecThis class is immutable and thread-safe.

不可变对象,实际是另一种避免竞争的方式。二、不可变设计
另一个大家更为熟悉的 String 类也是不可变的,以它为例,说明一下不可变设计的要素 ```java public final class String implements java.io.Serializable, Comparable, CharSequence { / The value is used for character storage. */ private final char value[]; / Cache the hash code for the string */ private int hash; // Default to 0
// …
}
<a name="FFALO"></a>### 1、**final的使用**发现该类、类中所有属性都是 final 的1. 属性用 fifinal 修饰保证了该属性是只读的,不能修改1. 类用 fifinal 修饰保证了该类中的方法不能被覆盖,防止子类无意间破坏不可变性<a name="wU4Go"></a>### 2、**保护性拷贝**但有同学会说,使用字符串时,也有一些跟修改相关的方法啊,比如 substring 等,那么下面就看一看这些方法是如何实现的,就以 substring 为例:```javapublic String substring(int beginIndex) {if (beginIndex < 0) {throw new StringIndexOutOfBoundsException(beginIndex);}int subLen = value.length - beginIndex;if (subLen < 0) {throw new StringIndexOutOfBoundsException(subLen);}return (beginIndex == 0) ? this : new String(value, beginIndex, subLen);}
发现其内部是调用 String 的构造方法创建了一个新字符串,再进入这个构造看看,是否对 final char[] value 做出了修改:
public String(char value[], int offset, int count) {if (offset < 0) {throw new StringIndexOutOfBoundsException(offset);}if (count <= 0) {if (count < 0) {throw new StringIndexOutOfBoundsException(count);}if (offset <= value.length) {this.value = "".value;return;}}if (offset > value.length - count) {throw new StringIndexOutOfBoundsException(offset + count);}this.value = Arrays.copyOfRange(value, offset, offset+count);}
结果发现也没有,构造新字符串对象时,会生成新的 char[] value,对内容进行复制 。这种通过创建副本对象来避免共享的手段称之为【保护性拷贝(defensive copy)】
3、* 模式之享元
4、* 原理之final
三、无状态
在 web 阶段学习时,设计 Servlet 时为了保证其线程安全,都会有这样的建议,不要为 Servlet 设置成员变量,这种没有任何成员变量的类是线程安全的
- 因为成员变量保存的数据也可以称为状态信息,因此没有成员变量就称之为【无状态】
本章小结
- 不可变类使用
- 不可变类设计
- 原理方面
- final
模式方面
-
第七章 共享模型之工具
一、线程池
1、自定义线程池

步骤1:自定义拒绝策略接口@FunctionalInterface // 拒绝策略interface RejectPolicy<T> {void reject(BlockingQueue<T> queue, T task);}
步骤2:自定义任务队列
@Slf4j(topic = "c.BlockingQueue")class BlockingQueue<T> {// 1. 任务队列private Deque<T> queue = new ArrayDeque<>();// 2. 锁private ReentrantLock lock = new ReentrantLock();// 3. 生产者条件变量private Condition fullWaitSet = lock.newCondition();// 4. 消费者条件变量private Condition emptyWaitSet = lock.newCondition();// 5. 容量private int capcity;public BlockingQueue(int capcity) {this.capcity = capcity;}// 带超时阻塞获取public T poll(long timeout, TimeUnit unit) {lock.lock();try {// 将 timeout 统一转换为 纳秒long nanos = unit.toNanos(timeout);while (queue.isEmpty()) {try {if (nanos <= 0) {return null;}// 返回值是剩余时间nanos = emptyWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}T t = queue.removeFirst();fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞获取,死等public T take() {lock.lock();try {while (queue.isEmpty()) {try {emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}// 拿到并移除第一个元素T t = queue.removeFirst();// 唤醒阻塞添加fullWaitSet.signal();return t;} finally {lock.unlock();}}// 阻塞添加public void put(T task) {lock.lock();try {while (queue.size() == capcity) {try {log.debug("等待加入任务队列 {} ...", task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);// 唤醒阻塞获取emptyWaitSet.signal();} finally {lock.unlock();}}// 带超时时间阻塞添加public boolean offer(T task, long timeout, TimeUnit timeUnit) {lock.lock();try {long nanos = timeUnit.toNanos(timeout);while (queue.size() == capcity) {try {if(nanos <= 0) {return false;}log.debug("等待加入任务队列 {} ...", task);nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}// 获取大小public int size() {lock.lock();try {return queue.size();} finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy, T task) {lock.lock();try {// 判断队列是否满if(queue.size() == capcity) {rejectPolicy.reject(this, task);} else { // 有空闲log.debug("加入任务队列 {}", task);queue.addLast(task);emptyWaitSet.signal();}} finally {lock.unlock();}}}
步骤3:自定义线程池
@Slf4j(topic = "c.ThreadPool")class ThreadPool {// 任务队列private BlockingQueue<Runnable> taskQueue;// 线程集合private HashSet<Worker> workers = new HashSet<>();// 核心线程数private int coreSize;// 获取任务时的超时时间private long timeout;private TimeUnit timeUnit;private RejectPolicy<Runnable> rejectPolicy;public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity, RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}// 执行任务public void execute(Runnable task) {// 当任务数没有超过 coreSize 时,直接交给 worker 对象执行// 如果任务数超过 coreSize 时,加入任务队列暂存synchronized (workers) {if(workers.size() < coreSize) {Worker worker = new Worker(task);log.debug("新增 worker{}, {}", worker, task);workers.add(worker);worker.start();} else {// taskQueue.put(task);// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}}// 线程对象class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1) 当 task 不为空,执行任务// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行// while(task != null || (task = taskQueue.take()) != null) {while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}}
步骤4:测试
@Slf4j(topic = "c.TestPool")public class TestPool {public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等// queue.put(task);// 2) 带超时等待// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行// log.debug("放弃{}", task);// 4) 让调用者抛出异常// throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 4; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}}
2、Thread Pool Executor
2.1 线程池状态
ThreadPoolExecutor使用int的高3位来表示线程池状态,低29位表示线程数量
-
| 状态名 | 3位 | 接收新任务 | 处理阻塞队列任务 | 说明 |
|---|---|---|---|---|
| RUNNING | 111 | Y | Y | |
| SHUTDOWN | 000 | N | Y | 不会接收新任务,但会处理阻塞队列剩余任务 |
| STOP | 001 | N | N | 会中断正在执行的任务,并抛弃阻塞队列任务 |
| TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 |
| TERMINATED | 011 | - | - | 终结状态 |
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值
// c 为旧值, ctlOf 返回结果为新值ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们private static int ctlOf(int rs, int wc) { return rs | wc; }
2.2 构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
- corePoolSize 核心线程数目 (最多保留的线程数)
- maximumPoolSize 最大线程数目
- keepAliveTime 生存时间 - 针对救急线程
- unit 时间单位 - 针对救急线程
- workQueue 阻塞队列
- threadFactory 线程工厂 - 可以为线程创建时起个好名字
- handler 拒绝策略
2.3 工作方式


- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
- 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
- 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略jdk提供了4种实现,其它著名框架也提供了实现
- AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
- Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
- Netty 的实现,是创建一个新线程来执行任务
- ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
- PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
- 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。

根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池
2.4 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}
特点:
- 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
- 阻塞队列是无界的,可以放任意数量的任务
2.5 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}
特点:
- 核心线程数是 0,最大线程数是Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着:
- 全部都是救急线程(60s后可以回收)
- 救急线程可以无限创建
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货) ```java @Slf4j(topic = “c.TestSynchronousQueue”) public class TestSynchronousQueue {
public static void main(String[] args) {
SynchronousQueue<Integer> integers = new SynchronousQueue<>();new Thread(() -> {try {log.debug("putting {} ", 1);integers.put(1);log.debug("{} putted...", 1);log.debug("putting...{} ", 2);integers.put(2);log.debug("{} putted...", 2);} catch (InterruptedException e) {e.printStackTrace();}},"t1").start();sleep(1);new Thread(() -> {try {log.debug("taking {}", 1);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t2").start();sleep(1);new Thread(() -> {try {log.debug("taking {}", 2);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t3").start();
} }
输出: 11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted… 11:48:16.500 c.TestSynchronousQueue [t1] - putting…2 11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted…
评价:<br />整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲分钟后释放线程。适合任务数比较密集,但每个任务执行时间较短的情况。<a name="AqQOG"></a>#### **2.6 newSingleThreadExecutor**```javapublic static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}
使用场景:
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
区别:
- 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
- Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改
- FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法
Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改
对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改
@Slf4j(topic = "c.TestExecutors")public class TestExecutors {public static void main(String[] args) throws InterruptedException {test2();}public static void test2() {ExecutorService pool = Executors.newSingleThreadExecutor();pool.execute(() -> {log.debug("1");int i = 1 / 0;});pool.execute(() -> {log.debug("2");});pool.execute(() -> {log.debug("3");});}private static void test1() {ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {private AtomicInteger t = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "mypool_t" + t.getAndIncrement());}});pool.execute(() -> {log.debug("1");});pool.execute(() -> {log.debug("2");});pool.execute(() -> {log.debug("3");});}}
2.7 提交任务
@Slf4j(topic = "c.TestSubmit")public class TestSubmit {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(1);method3(pool);}private static void method3(ExecutorService pool) throws InterruptedException, ExecutionException {// invokeAny:提交tasks中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消String result = pool.invokeAny(Arrays.asList(() -> {log.debug("begin 1");Thread.sleep(1000);log.debug("end 1");return "1";},() -> {log.debug("begin 2");Thread.sleep(500);log.debug("end 2");return "2";},() -> {log.debug("begin 3");Thread.sleep(2000);log.debug("end 3");return "3";}));log.debug("{}", result);}private static void method2(ExecutorService pool) throws InterruptedException {// invokeAll:提交 tasks 中所有任务List<Future<String>> futures = pool.invokeAll(Arrays.asList(() -> {log.debug("begin");Thread.sleep(1000);return "1";},() -> {log.debug("begin");Thread.sleep(500);return "2";},() -> {log.debug("begin");Thread.sleep(2000);return "3";}));futures.forEach( f -> {try {log.debug("{}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}private static void method1(ExecutorService pool) throws InterruptedException, ExecutionException {// submit:提交任务task,用返回值Future获得任务执行结果Future<String> future = pool.submit(() -> {log.debug("running....");Thread.sleep(1000);return "ok";});// get():唤醒主线程log.debug(future.get());}}
2.8 关闭线程池
Shutdown: ```java /* 线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
此方法不会阻塞调用线程的执行 */ void shutdown(); public void shutdown() {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try {
checkShutdownAccess();// 修改线程池状态advanceRunState(SHUTDOWN);// 仅会打断空闲线程interruptIdleWorkers();onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
} // 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等) tryTerminate(); }
**shutdownNow:**java /* 线程池状态变为 STOP- 不会接收新任务
- 会将队列中的任务返回
并用 interrupt 的方式中断正在执行的任务 */ List
shutdownNow(); **其它方法**```java@Slf4j(topic = "c.TestShutDown")public class TestShutDown {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool = Executors.newFixedThreadPool(2);Future<Integer> result1 = pool.submit(() -> {log.debug("task 1 running...");Thread.sleep(1000);log.debug("task 1 finish...");return 1;});Future<Integer> result2 = pool.submit(() -> {log.debug("task 2 running...");Thread.sleep(1000);log.debug("task 2 finish...");return 2;});Future<Integer> result3 = pool.submit(() -> {log.debug("task 3 running...");Thread.sleep(1000);log.debug("task 3 finish...");return 3;});log.debug("shutdown");// 1、不会接收新任务result4 2、已提交任务会执行完 3、此方法不会阻塞调用线程后续的执行// pool.shutdown();// pool.awaitTermination(3, TimeUnit.SECONDS);// 1、不会接收新任务result4 2、会将队列中的任务result3返回 3、并用interrupt的方式中断正在执行的任务result1、result2List<Runnable> runnables = pool.shutdownNow();log.debug("other.... {}" , runnables);Future<Integer> result4 = pool.submit(() -> {log.debug("task 4 running...");Thread.sleep(1000);log.debug("task 4 finish...");return 4;});}}
* 模式之Worker Thread
1. 定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工2. 饥饿现象
- 固定大小线程池会有饥饿现象:
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:没啥说的,做就是了
- 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
- 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
问题描述:
@Slf4j(topic = "c.TestDeadLock")public class TestStarvation {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {method1();}// 出现饥饿现象,两个点餐线程就把线程占满private static void method1() {ExecutorService pool = Executors.newFixedThreadPool(2);// 两个任务:一个点餐、一个做菜pool.execute(() -> {log.debug("处理点餐...");Future<String> f = pool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});pool.execute(() -> {log.debug("处理点餐...");Future<String> f = pool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}}
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程池,例如:
@Slf4j(topic = "c.TestDeadLock")public class TestStarvation {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {method2();}private static void method2() {// 分别定义两个线程池,分别处理不同的线程ExecutorService waiterPool = Executors.newFixedThreadPool(1);ExecutorService cookPool = Executors.newFixedThreadPool(1);// 定义三个线程waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}}
3. 创建多少线程池合适
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿
-
3.1 CPU密集型运算
通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
3.2 I/O密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下:
线程数 = 核数 期望 CPU 利用率 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
4 100% 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
4 100% 100% / 10% = 404. 自定义线程池
2.9 任务调度线程池
在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个任务的延迟或异常都将会影响到之后的任务。
使用Timer存在的问题
@Slf4j(topic = "c.TestTimer")public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {Timer timer = new Timer();TimerTask task1 = new TimerTask() {@Overridepublic void run() {log.debug("task 1");// int i = 1/0;sleep(2);}};TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");}};// 使用 timer 添加两个任务,希望它们都在 1s 后执行// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行// 同时若任务1存在异常,线程2也不会执行log.debug("start...");timer.schedule(task1, 1000);timer.schedule(task2, 1000);}
使用ScheduledExecutorService
@Slf4j(topic = "c.TestTimer")public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {// 延时执行任务ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);// 添加两个任务,希望它们都在 1s 后执行pool.schedule(() -> {log.debug("task1");// int i = 1 / 0;sleep(2);}, 1, TimeUnit.SECONDS);pool.schedule(() -> {log.debug("task2");}, 1, TimeUnit.SECONDS);}
scheduleAtFixedRate和scheduleWithFixedDelay
@Slf4j(topic = "c.TestTimer")public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);log.debug("start...");// scheduleAtFixedRate:以固定的速率执行任务 参数:初始时间(主线程启动后间隔时间开始执行)、时间间隔、单位// delay是以上一次任务耗时计算,此处的间隔为2Spool.scheduleAtFixedRate(() -> {log.debug("running...");// 任务执行时间超过了间隔时间sleep(2);}, 1, 1, TimeUnit.SECONDS);// delay是从上一次任务结束后开始计算,此处的间隔为3Spool.scheduleWithFixedDelay(() -> {log.debug("running...");sleep(2);}, 1, 1, TimeUnit.SECONDS);}
scheduleAtFixedRate输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
scheduleWithFixedDelay输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所以间隔都是 3s
评价:
整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线程也不会被释放。用来执行延迟或反复执行的任务2.10 正确处理执行任务异常
方法1:主动捉异常
@Slf4j(topic = "c.TestTimer")public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);pool.schedule(() -> {try {log.debug("task1");int i = 1 / 0;} catch (Exception e) {log.error("error:", e);}}, 1, TimeUnit.SECONDS);}
方法2:使用Future
@Slf4j(topic = "c.TestTimer")public class TestTimer {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService pool1 = Executors.newFixedThreadPool(1);Future<Boolean> future = pool1.submit(() -> {log.debug("task1");int i = 1 / 0;return true;});log.debug("result:{}",future.get());}
* 应用之定时任务
2.11 Tomcat线程池
Tomcat用到的线程池:
LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
- Acceptor 只负责【接收新的 socket 连接】
- Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
- 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
- Executor 线程池中的工作线程最终负责【处理请求】
Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
- 如果总线程数达到 maximumPoolSize
- 这时不会立刻抛 RejectedExecutionException 异常
- 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常
源码 tomcat-7.0.42
public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();Thread.interrupted();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}
TaskQueue.java
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent.isShutdown() )throw new RejectedExecutionException("Executor not running, can't force a command into the queue");return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the taskis rejected}
3、Fork/Join线程池
3.1 概念
- Fork/Join是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
- 所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
- Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
3.2 使用
提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务: ```java @Slf4j(topic = “c.TestForkJoin2”) public class TestForkJoin2 {
public static void main(String[] args) {
// 4个线程,若不传,则以CPU核心为线程数ForkJoinPool pool = new ForkJoinPool(4);System.out.println(pool.invoke(new MyTask(5)));// 任务拆分:// new MyTask(5):5 + new MyTask(4)// new MyTask(4):4 + new MyTask(3)// new MyTask(3):3 + new MyTask(2)// new MyTask(2):2 + new MyTask(1)
} }
// 1~n 之间整数的和
@Slf4j(topic = “c.MyTask”)
class MyTask extends RecursiveTask
private int n;public MyTask(int n) {this.n = n;}@Overridepublic String toString() {return "{" + n + '}';}@Overrideprotected Integer compute() {// 如果n为 1,可以求得结果了if (n == 1) {log.debug("join() {}", n);return n;}// 将任务进行拆分(fork)
// AddTask1 t1 = new AddTask1(n - 1); MyTask t1 = new MyTask(n - 1); // 让一个线程去执行此任务 t1.fork(); log.debug(“fork() {} + {}”, n, t1);
// 合并(join)结果int result = n + t1.join();log.debug("join() {} + {} = {}", n, t1, result);return result;}
}
然后提交给 ForkJoinPool 来执行```javapublic static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);System.out.println(pool.invoke(new AddTask1(5)));}结果:[ForkJoinPool-1-worker-0] - fork() 2 + {1}[ForkJoinPool-1-worker-1] - fork() 5 + {4}[ForkJoinPool-1-worker-0] - join() 1[ForkJoinPool-1-worker-0] - join() 2 + {1} = 3[ForkJoinPool-1-worker-2] - fork() 4 + {3}[ForkJoinPool-1-worker-3] - fork() 3 + {2}[ForkJoinPool-1-worker-3] - join() 3 + {2} = 6[ForkJoinPool-1-worker-2] - join() 4 + {3} = 10[ForkJoinPool-1-worker-1] - join() 5 + {4} = 1515
3.3 改进
public class TestForkJoin {public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);// System.out.println(pool.invoke(new AddTask1(5)));System.out.println(pool.invoke(new AddTask3(1, 5)));}}@Slf4j(topic = "c.AddTask")class AddTask1 extends RecursiveTask<Integer> {int n;public AddTask1(int n) {this.n = n;}@Overridepublic String toString() {return "{" + n + '}';}@Overrideprotected Integer compute() {if (n == 1) {log.debug("join() {}", n);return n;}AddTask1 t1 = new AddTask1(n - 1);t1.fork();log.debug("fork() {} + {}", n, t1);int result = n + t1.join();log.debug("join() {} + {} = {}", n, t1, result);return result;}}@Slf4j(topic = "c.AddTask")class AddTask2 extends RecursiveTask<Integer> {int begin;int end;public AddTask2(int begin, int end) {this.begin = begin;this.end = end;}@Overridepublic String toString() {return "{" + begin + "," + end + '}';}@Overrideprotected Integer compute() {if (begin == end) {log.debug("join() {}", begin);return begin;}if (end - begin == 1) {log.debug("join() {} + {} = {}", begin, end, end + begin);return end + begin;}int mid = (end + begin) / 2;AddTask2 t1 = new AddTask2(begin, mid - 1);t1.fork();AddTask2 t2 = new AddTask2(mid + 1, end);t2.fork();log.debug("fork() {} + {} + {} = ?", mid, t1, t2);int result = mid + t1.join() + t2.join();log.debug("join() {} + {} + {} = {}", mid, t1, t2, result);return result;}}@Slf4j(topic = "c.AddTask")class AddTask3 extends RecursiveTask<Integer> {int begin;int end;public AddTask3(int begin, int end) {this.begin = begin;this.end = end;}@Overridepublic String toString() {return "{" + begin + "," + end + '}';}@Overrideprotected Integer compute() {if (begin == end) {log.debug("join() {}", begin);return begin;}if (end - begin == 1) {log.debug("join() {} + {} = {}", begin, end, end + begin);return end + begin;}int mid = (end + begin) / 2;AddTask3 t1 = new AddTask3(begin, mid);t1.fork();AddTask3 t2 = new AddTask3(mid + 1, end);t2.fork();log.debug("fork() {} + {} = ?", t1, t2);int result = t1.join() + t2.join();log.debug("join() {} + {} = {}", t1, t2, result);return result;}}
然后提交给 ForkJoinPool 来执行
public static void main(String[] args) {ForkJoinPool pool = new ForkJoinPool(4);System.out.println(pool.invoke(new AddTask3(1, 10)));}结果:[ForkJoinPool-1-worker-0] - join() 1 + 2 = 3[ForkJoinPool-1-worker-3] - join() 4 + 5 = 9[ForkJoinPool-1-worker-0] - join() 3[ForkJoinPool-1-worker-1] - fork() {1,3} + {4,5} = ?[ForkJoinPool-1-worker-2] - fork() {1,2} + {3,3} = ?[ForkJoinPool-1-worker-2] - join() {1,2} + {3,3} = 6[ForkJoinPool-1-worker-1] - join() {1,3} + {4,5} = 1515
二、J.U.C
1、AQS原理
1.1 概述
全称是AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
- 用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取锁和释放锁
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - cas 机制设置 state 状态
- 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
- 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList
- 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet
子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
获取锁的姿势
1.2 实现不可重入锁
@Slf4j(topic = "c.TestAqs")public class TestAqs {public static void main(String[] args) {MyLock lock = new MyLock();new Thread(() -> {lock.lock();try {log.debug("locking...");sleep(1);} finally {log.debug("unlocking...");lock.unlock();}},"t1").start();new Thread(() -> {lock.lock();try {log.debug("locking...");} finally {log.debug("unlocking...");lock.unlock();}},"t2").start();}}// 自定义锁(不可重入锁,可以挡住自己)class MyLock implements Lock {// 独占锁 同步器类class MySync extends AbstractQueuedSynchronizer {@Override // 尝试获取锁protected boolean tryAcquire(int arg) {if(compareAndSetState(0, 1)) {// 加上了锁,并设置 owner 为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Override // 尝试释放锁protected boolean tryRelease(int arg) {setExclusiveOwnerThread(null);setState(0); // 解锁return true;}@Override // 是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}// 创建条件变量public Condition newCondition() {return new ConditionObject();}}private MySync sync = new MySync();@Override // 加锁(不成功会进入等待队列)public void lock() {sync.acquire(1);}@Override // 加锁,可打断public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}@Override // 尝试加锁(一次)public boolean tryLock() {return sync.tryAcquire(1);}@Override // 尝试加锁,带超时public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(time));}@Override // 解锁public void unlock() {sync.release(1);}@Override // 创建条件变量public Condition newCondition() {return sync.newCondition();}}
1.3 不可重入测试
如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)
2、ReentrantLock 原理
3、读写锁
3.1 ReentrantReadWriteLock
当读操作远远高于写操作时,这时候使用 读写锁 让 读-读 可以并发,提高性能。 类似于数据库中的 select … from … lock in share mode
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
class DataContainer {private Object data;private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();private ReentrantReadWriteLock.ReadLock r = rw.readLock();private ReentrantReadWriteLock.WriteLock w = rw.writeLock();public Object read() {log.debug("获取读锁...");r.lock();try {log.debug("读取");sleep(1);return data;} finally {log.debug("释放读锁...");r.unlock();}}public void write() {log.debug("获取写锁...");w.lock();try {log.debug("写入");sleep(1);} finally {log.debug("释放写锁...");w.unlock();}}}
测试 读锁-读锁 可以并发
DataContainer dataContainer = new DataContainer();new Thread(() -> {dataContainer.read();}, "t1").start();new Thread(() -> {dataContainer.read();}, "t2").start();
输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响
14:05:14.341 c.DataContainer [t2] - 获取读锁...14:05:14.341 c.DataContainer [t1] - 获取读锁...14:05:14.345 c.DataContainer [t1] - 读取14:05:14.345 c.DataContainer [t2] - 读取14:05:15.365 c.DataContainer [t2] - 释放读锁...14:05:15.386 c.DataContainer [t1] - 释放读锁...
测试 读锁-写锁 相互阻塞
DataContainer dataContainer = new DataContainer();new Thread(() -> {dataContainer.read();}, "t1").start();Thread.sleep(100);new Thread(() -> {dataContainer.write();}, "t2").start();输出结果:14:04:21.838 c.DataContainer [t1] - 获取读锁...14:04:21.838 c.DataContainer [t2] - 获取写锁...14:04:21.841 c.DataContainer [t2] - 写入14:04:22.843 c.DataContainer [t2] - 释放写锁...14:04:22.843 c.DataContainer [t1] - 读取14:04:23.843 c.DataContainer [t1] - 释放读锁...写锁-写锁 也是相互阻塞的,这里就不测试了
注意事项:
- 读锁不支持条件变量
重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
r.lock();try {// ...w.lock();try {// ...} finally{w.unlock();}} finally{r.unlock();}
重入时降级支持:即持有写锁的情况下去获取读锁
class CachedData {Object data;// 是否有效,如果失效,需要重新计算 datavolatile boolean cacheValid;final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();void processCachedData() {rwl.readLock().lock();if (!cacheValid) {// 获取写锁前必须释放读锁rwl.readLock().unlock();rwl.writeLock().lock();try {// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新if (!cacheValid) {data = ...cacheValid = true;}// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存rwl.readLock().lock();} finally {rwl.writeLock().unlock();}}// 自己用完数据, 释放读锁try {use(data);} finally {rwl.readLock().unlock();}}}
* 应用之缓存
* 读写锁原理
3.2 StampedLock
该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 ```java // 加解读锁 long stamp = lock.readLock(); lock.unlockRead(stamp);
// 加解写锁 long stamp = lock.writeLock(); lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。```javalong stamp = lock.tryOptimisticRead();// 验戳if(!lock.validate(stamp)){// 锁升级}
提供一个 数据容器类 内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
class DataContainerStamped {private int data;private final StampedLock lock = new StampedLock();public DataContainerStamped(int data) {this.data = data;}public int read(int readTime) {long stamp = lock.tryOptimisticRead();log.debug("optimistic read locking...{}", stamp);sleep(readTime);if (lock.validate(stamp)) {log.debug("read finish...{}, data:{}", stamp, data);return data;}// 锁升级 - 读锁log.debug("updating to read lock... {}", stamp);try {stamp = lock.readLock();log.debug("read lock {}", stamp);sleep(readTime);log.debug("read finish...{}, data:{}", stamp, data);return data;} finally {log.debug("read unlock {}", stamp);lock.unlockRead(stamp);}}public void write(int newData) {long stamp = lock.writeLock();log.debug("write lock {}", stamp);try {sleep(2);this.data = newData;} finally {log.debug("write unlock {}", stamp);lock.unlockWrite(stamp);}}}
测试 读-读 可以优化
public static void main(String[] args) {DataContainerStamped dataContainer = new DataContainerStamped(1);new Thread(() -> {dataContainer.read(1);}, "t1").start();sleep(0.5);new Thread(() -> {dataContainer.read(0);}, "t2").start();}
输出结果,可以看到实际没有加读锁
15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...25615:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...25615:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:115:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
测试 读-写 时优化读补加读锁
public static void main(String[] args) {DataContainerStamped dataContainer = new DataContainerStamped(1);new Thread(() -> {dataContainer.read(1);}, "t1").start();sleep(0.5);new Thread(() -> {dataContainer.write(100);}, "t2").start();}输出结果:15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...25615:57:00.717 c.DataContainerStamped [t2] - write lock 38415:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 25615:57:02.719 c.DataContainerStamped [t2] - write unlock 38415:57:02.719 c.DataContainerStamped [t1] - read lock 51315:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:100015:57:03.719 c.DataContainerStamped [t1] - read unlock 513
注意:
- StampedLock 不支持条件变量
- StampedLock 不支持可重入
4、Semaphore
基本使用
[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限。 ```java public static void main(String[] args) { // 1. 创建 semaphore 对象 Semaphore semaphore = new Semaphore(3); // 2. 10个线程同时运行 for (int i = 0; i < 10; i++) { new Thread(() -> {
}).start(); } }// 3. 获取许可try {semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}try {log.debug("running...");sleep(1);log.debug("end...");} finally {// 4. 释放许可semaphore.release();}
输出: 07:35:15.485 c.TestSemaphore [Thread-2] - running… 07:35:15.485 c.TestSemaphore [Thread-1] - running… 07:35:15.485 c.TestSemaphore [Thread-0] - running… 07:35:16.490 c.TestSemaphore [Thread-2] - end… 07:35:16.490 c.TestSemaphore [Thread-0] - end… 07:35:16.490 c.TestSemaphore [Thread-1] - end… 07:35:16.490 c.TestSemaphore [Thread-3] - running… 07:35:16.490 c.TestSemaphore [Thread-5] - running… 07:35:16.490 c.TestSemaphore [Thread-4] - running… 07:35:17.490 c.TestSemaphore [Thread-5] - end… 07:35:17.490 c.TestSemaphore [Thread-4] - end… 07:35:17.490 c.TestSemaphore [Thread-3] - end… 07:35:17.490 c.TestSemaphore [Thread-6] - running… 07:35:17.490 c.TestSemaphore [Thread-7] - running… 07:35:17.490 c.TestSemaphore [Thread-9] - running… 07:35:18.491 c.TestSemaphore [Thread-6] - end… 07:35:18.491 c.TestSemaphore [Thread-7] - end… 07:35:18.491 c.TestSemaphore [Thread-9] - end… 07:35:18.491 c.TestSemaphore [Thread-8] - running… 07:35:19.492 c.TestSemaphore [Thread-8] - end…
<a name="FJxne"></a>#### * Semaphore 应用<a name="sel7C"></a>#### * Semaphore 原理<a name="VU5Fp"></a>### 5、**CountdownLatch**用来进行线程同步协作,等待所有线程完成倒计时。 <br />其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一```javapublic static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);new Thread(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());}).start();new Thread(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());}).start();log.debug("waiting...");latch.await();log.debug("wait end...");}输出:18:44:00.778 c.TestCountDownLatch [main] - waiting...18:44:00.778 c.TestCountDownLatch [Thread-2] - begin...18:44:00.778 c.TestCountDownLatch [Thread-0] - begin...18:44:00.778 c.TestCountDownLatch [Thread-1] - begin...18:44:01.782 c.TestCountDownLatch [Thread-0] - end...218:44:02.283 c.TestCountDownLatch [Thread-2] - end...118:44:02.782 c.TestCountDownLatch [Thread-1] - end...018:44:02.782 c.TestCountDownLatch [main] - wait end...
可以配合线程池使用,改进如下:
public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(3);ExecutorService service = Executors.newFixedThreadPool(4);service.submit(() -> {log.debug("begin...");sleep(1);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(1.5);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(() -> {log.debug("begin...");sleep(2);latch.countDown();log.debug("end...{}", latch.getCount());});service.submit(()->{try {log.debug("waiting...");latch.await();log.debug("wait end...");} catch (InterruptedException e) {e.printStackTrace();}});}输出:18:52:25.831 c.TestCountDownLatch [pool-1-thread-3] - begin...18:52:25.831 c.TestCountDownLatch [pool-1-thread-1] - begin...18:52:25.831 c.TestCountDownLatch [pool-1-thread-2] - begin...18:52:25.831 c.TestCountDownLatch [pool-1-thread-4] - waiting...18:52:26.835 c.TestCountDownLatch [pool-1-thread-1] - end...218:52:27.335 c.TestCountDownLatch [pool-1-thread-2] - end...118:52:27.835 c.TestCountDownLatch [pool-1-thread-3] - end...018:52:27.835 c.TestCountDownLatch [pool-1-thread-4] - wait end...
* 应用之同步等待多线程准备完毕
AtomicInteger num = new AtomicInteger(0);ExecutorService service = Executors.newFixedThreadPool(10, (r) -> {return new Thread(r, "t" + num.getAndIncrement());});CountDownLatch latch = new CountDownLatch(10);String[] all = new String[10];Random r = new Random();for (int j = 0; j < 10; j++) {int x = j;service.submit(() -> {for (int i = 0; i <= 100; i++) {try {Thread.sleep(r.nextInt(100));} catch (InterruptedException e) {}all[x] = Thread.currentThread().getName() + "(" + (i + "%") + ")";System.out.print("\r" + Arrays.toString(all));}latch.countDown();});}latch.await();System.out.println("\n游戏开始...");service.shutdown();中间输出:[t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)]最后输出:[t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%),t9(100%)]游戏开始...
* 应用之同步等待多个远程调用结束
@RestControllerpublic class TestCountDownlatchController {@GetMapping("/order/{id}")public Map<String, Object> order(@PathVariable int id) {HashMap<String, Object> map = new HashMap<>();map.put("id", id);map.put("total", "2300.00");sleep(2000);return map;}@GetMapping("/product/{id}")public Map<String, Object> product(@PathVariable int id) {HashMap<String, Object> map = new HashMap<>();if (id == 1) {map.put("name", "小爱音箱");map.put("price", 300);} else if (id == 2) {map.put("name", "小米手机");map.put("price", 2000);}map.put("id", id);sleep(1000);return map;}@GetMapping("/logistics/{id}")public Map<String, Object> logistics(@PathVariable int id) {HashMap<String, Object> map = new HashMap<>();map.put("id", id);map.put("name", "中通快递");sleep(2500);return map;}private void sleep(int millis) {try {Thread.sleep(millis);} catch (InterruptedException e) {e.printStackTrace();}}}
rest 远程调用
RestTemplate restTemplate = new RestTemplate();log.debug("begin");ExecutorService service = Executors.newCachedThreadPool();CountDownLatch latch = new CountDownLatch(4);Future<Map<String,Object>> f1 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/order/{1}", Map.class, 1);return r;});Future<Map<String, Object>> f2 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 1);return r;});Future<Map<String, Object>> f3 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/product/{1}", Map.class, 2);return r;});Future<Map<String, Object>> f4 = service.submit(() -> {Map<String, Object> r =restTemplate.getForObject("http://localhost:8080/logistics/{1}", Map.class, 1);return r;});System.out.println(f1.get());System.out.println(f2.get());System.out.println(f3.get());System.out.println(f4.get());log.debug("执行完毕");service.shutdown();执行结果:19:51:39.711 c.TestCountDownLatch [main] - begin{total=2300.00, id=1}{price=300, name=小爱音箱, id=1}{price=2000, name=小米手机, id=2}{name=中通快递, id=1}19:51:42.407 c.TestCountDownLatch [main] - 执行完毕
6、CyclicBarrier
[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
CyclicBarrier cb = new CyclicBarrier(2); // 个数为2时才会继续执行new Thread(()->{System.out.println("线程1开始.."+new Date());try {cb.await(); // 当个数不足时,等待} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程1继续向下运行..."+new Date());}).start();new Thread(()->{System.out.println("线程2开始.."+new Date());try { Thread.sleep(2000); } catch (InterruptedException e) { }try {cb.await(); // 2 秒后,线程个数够2,继续运行} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}System.out.println("线程2继续向下运行..."+new Date());}).start();
注意:CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比喻为『人满发车』
7、线程安全集合类概述

线程安全集合类可以分为三大类:
- 遗留的线程安全集合如 Hashtable , Vector
- 使用 Collections 装饰的线程安全集合,如:
- Collections.synchronizedCollection
- Collections.synchronizedList
- Collections.synchronizedMap
- Collections.synchronizedSet
- Collections.synchronizedNavigableMap
- Collections.synchronizedNavigableSet
- Collections.synchronizedSortedMap
- Collections.synchronizedSortedSet
- java.util.concurrent. :重点介绍 java.util.concurrent. 下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent
- Blocking 大部分实现基于锁,并提供用来阻塞的方法
- CopyOnWrite 之类容器修改开销相对较重
- Concurrent 类型的容器
- 内部很多操作使用 cas 优化,一般可以提供较高吞吐量
- 弱一致性
- 遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍历,这时内容是旧的
- 求大小弱一致性,size 操作未必是 100% 准确
- 读取弱一致性
遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出
ConcurrentModifificationException,不再继续遍历
8、ConcurrentHashMap
练习:单词计数
static final String ALPHA = "abcedfghijklmnopqrstuvwxyz";public static void main(String[] args) {int length = ALPHA.length();int count = 200;List<String> list = new ArrayList<>(length * count);for (int i = 0; i < length; i++) {char ch = ALPHA.charAt(i);for (int j = 0; j < count; j++) {list.add(String.valueOf(ch));}}Collections.shuffle(list);for (int i = 0; i < 26; i++) {try (PrintWriter out = new PrintWriter(new OutputStreamWriter(new FileOutputStream("tmp/" + (i+1) + ".txt")))) {String collect = list.subList(i * count, (i + 1) * count).stream().collect(Collectors.joining("\n"));out.print(collect);} catch (IOException e) {}}}
模版代码,模版代码中封装了多线程读取文件的代码
private static <V> void demo(Supplier<Map<String,V>> supplier,BiConsumer<Map<String,V>,List<String>> consumer) {Map<String, V> counterMap = supplier.get();List<Thread> ts = new ArrayList<>();for (int i = 1; i <= 26; i++) {int idx = i;Thread thread = new Thread(() -> {List<String> words = readFromFile(idx);consumer.accept(counterMap, words);});ts.add(thread);}ts.forEach(t->t.start());ts.forEach(t-> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});System.out.println(counterMap);}public static List<String> readFromFile(int i) {ArrayList<String> words = new ArrayList<>();try (BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream("tmp/"+ i +".txt")))) {while(true) {String word = in.readLine();if(word == null) {break;}words.add(word);}return words;} catch (IOException e) {throw new RuntimeException(e);}}
你要做的是实现两个参数
一是提供一个 map 集合,用来存放每个单词的计数结果,key 为单词,value 为计数
二是提供一组操作,保证计数的安全性,会传递 map 集合以及 单词 List
正确结果输出应该是每个单词出现 200 次
{a=200, b=200, c=200, d=200, e=200, f=200, g=200, h=200, i=200, j=200, k=200, l=200, m=200,n=200, o=200, p=200, q=200, r=200, s=200, t=200, u=200, v=200, w=200, x=200, y=200, z=200}
下面的实现为:
demo(// 创建 map 集合// 创建 ConcurrentHashMap 对不对?() -> new HashMap<String, Integer>(),// 进行计数(map, words) -> {for (String word : words) {Integer counter = map.get(word);int newValue = counter == null ? 1 : counter + 1;map.put(word, newValue);}});
有没有问题?请改进
demo(() -> new ConcurrentHashMap<String, LongAdder>(),(map, words) -> {for (String word : words) {// 注意不能使用 putIfAbsent,此方法返回的是上一次的 value,首次调用返回 nullmap.computeIfAbsent(word, (key) -> new LongAdder()).increment();}});
demo(() -> new ConcurrentHashMap<String, Integer>(),(map, words) -> {for (String word : words) {// 函数式编程,无需原子变量map.merge(word, 1, Integer::sum);}});
* ConcurrentHashMap 原理
9、BlockingQueue
* BlockingQueue 原理
10、ConcurrentLinkedQueue
ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是
- 两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行
- dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争
- 只是这【锁】使用了 cas 来实现
事实上,ConcurrentLinkedQueue 应用还是非常广泛的
例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用
11、CopyOnWriteArrayList
CopyOnWriteArraySet 是它的马甲 底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的并发读,读写分离。 以新增为例:
public boolean add(E e) {synchronized (lock) {// 获取旧的数组Object[] es = getArray();int len = es.length;// 拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)es = Arrays.copyOf(es, len + 1);// 添加新元素es[len] = e;// 替换旧的数组setArray(es);return true;}}
这里的源码版本是 Java 11,在 Java 1.8 中使用的是可重入锁而不是 synchronized
public void forEach(Consumer<? super E> action) {Objects.requireNonNull(action);for (Object x : getArray()) {@SuppressWarnings("unchecked") E e = (E) x;action.accept(e);}}
适合『读多写少』的应用场景
get 弱一致性

不容易测试,但问题确实存在
迭代器弱一致性
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();list.add(1);list.add(2);list.add(3);Iterator<Integer> iter = list.iterator();new Thread(() -> {list.remove(0);System.out.println(list);}).start();sleep1s();while (iter.hasNext()) {System.out.println(iter.next());}
不要觉得弱一致性就不好
- 数据库的 MVCC 都是弱一致性的表现
- 并发高和一致性是矛盾的,需要权衡




