一、介绍
1.1 简介
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
1.2 应用场景
- Exchanger可以用于遗传算法
- Exchanger也可以用于校对工作
1.3 Exchange api
//构造函数public Exchanger()//等待其他线程到达交换点(除非线程被打断,或者交换超时),//传输给定对象到exchanger,接收exchanger 返回的对象public V exchange(V x);//等待其他线程到达交换点(除非线程被打断,或者交换超时),//传输给定对象到exchanger,接收exchanger 返回的对象public V exchange(V x, long timeout, TimeUnit unit);
1.4 简单的使用例子
public class ExchangerTest {private static final Exchanger<String> exgr = new Exchanger<String>();private static ExecutorService threadPool = Executors.newFixedThreadPool(2);public static void main(String[] args) {threadPool.execute(() -> {String A = "银行流水A";// A录入银行流水数据try {String result = exgr.exchange(A);System.out.println("receive " + result);} catch (InterruptedException e) {e.printStackTrace();}});threadPool.execute(() -> {String B = "银行流水B";// B录入银行流水数据String A = null;try {//交换数据A = exgr.exchange("B");} catch (InterruptedException e) {e.printStackTrace();}System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:" + A + ",B录入是:" + B);});threadPool.shutdown();}}//输出A和B数据是否一致:false,A录入的是:银行流水A,B录入是:银行流水Breceive B
1.5 类UML 图
二、源码分析
2.1 Exchanger 的成员变量
static final class Participant extends ThreadLocal<Node> {public Node initialValue() { return new Node(); }}// @sun.misc.Contended 消除伪共享@sun.misc.Contendedstatic final class Node {int index; // 当前节点 在数组 arena 的下标int bound; // 交换器的最后记录值int collides; // 在当前 bound 下 CAS 的失败次数int hash; // 伪随机的自旋数Object item; // A线程交换的数据项 如 Object match = A.exchange(item)volatile Object match; // 与该A线程交换数据对象的目标B线程 的数据项volatile Thread parked; // 当阻塞时,设置此线程,不阻塞的话就不必了(因为会自旋)}//用于保存每个线程的状态, 实现于ThreadLocalprivate final Participant participant;//交换器隔离区域,启动时为空(在slotExchange 方法内初始化)private volatile Node[] arena;//在同步点交换数据时,插槽用于保存线程的数据节点private volatile Node slot;//记录arena 最后合法的位置索引private volatile int bound;
2.2 构造方法
public Exchanger() {participant = new Participant();}
在实例化时,创建了一个 ThreadLocal 对象,并设置了初始值,一个 Node 对象。
2.3 exchange 方法
public V exchange(V x) throws InterruptedException {Object v;Object item = (x == null) ? NULL_ITEM : x; // translate null argsif ((//1. 如果 arena 为空,则执行slotExchange方法;//2. slotExchange返回值 不为空 ,则返回arena != null || (v = slotExchange(item, false, 0L)) == null)&&(//3. arena 不为空 或者 slotExchange返回值 == null, 执行以下语句(//4. 线程被打断,返回//5. 线程没有被打断, 执行arenaExchange方法;Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null)))throw new InterruptedException();return (v == NULL_ITEM) ? null : (V)v;}
2.4 slotExchange 的源码分析
//item: 线程交换的数据项//timed 是否有超时//ns 超时毫秒private final Object slotExchange(Object item, boolean timed, long ns) {//拿出存在ThreadLocal 中的 nodeNode p = participant.get();//获取调用线程Thread t = Thread.currentThread();//判断是否被打断if (t.isInterrupted()) // preserve interrupt status so caller can recheckreturn null;//自旋for (Node q;;) {//判断 slot 节点是否为空if ((q = slot) != null) {//第二个线程执行 slotExchange 进入, 假设slot 不为空//设置插槽slot 为null, 唤醒等待线程, 返回数据项if (U.compareAndSwapObject(this, SLOT, q, null)) {//获取 q.itemObject v = q.item;//自己的 item 赋值给 match,以让对方线程获取q.match = item;//唤醒与之匹配的线程,进行交换数据Thread w = q.parked;if (w != null)U.unpark(w);return v;}// cas 修改slot 失败,证明有线程在占用slot//如果arena 还没有初始化,则创建arena数组//直到slot 使用完置空后,其他线程执行exchange 方法后,就使用arena数组if (NCPU > 1 && bound == 0 &&U.compareAndSwapInt(this, BOUND, 0, SEQ)) // SEQ == 256; 默认 BOUND == 0arena = new Node[(FULL + 2) << ASHIFT];// length = (2 + 2) << 7 == 512}else if (arena != null)//多线程占用slot,使用arenaExchange方法return null; // caller must reroute to arenaExchangeelse {//第一个线程执行exchange 方法时//会创建一个Node 作为 slotp.item = item;if (U.compareAndSwapObject(this, SLOT, null, p))//跳出循环,等待释放break;// 如果 CAS 失败,将 p 的值清空,重来p.item = null;}}//执行这里,说明线程已经把需要交换的数据放到了slot中,当前线程阻塞自己//等待另一个线程在同一同步点时,唤醒当前线程交换数据// 伪随机数int h = p.hash;//超时时间long end = timed ? System.nanoTime() + ns : 0L;//自旋次数 SPINS: 1024int spins = (NCPU > 1) ? SPINS : 1;Object v;//获取 p.match 数据项,是否为空//match 使用了volatile 修饰,具有线程间可见性while ((v = p.match) == null) {//如果为空,仍需阻塞等待//判断自旋次数 是否大于0if (spins > 0) {// 计算伪随机数h ^= h << 1; h ^= h >>> 3; h ^= h << 10;if (h == 0)h = SPINS | (int)t.getId();// 如果不是0,就将自旋数减一//自旋次数减为0,则让出 CPU 时间片else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)Thread.yield();}//如果自旋数不够了,且 slot 还没有得到,就重置自旋数else if (slot != p)spins = SPINS;// 如果线程没有中断 && arena数组不是 null && 没有超时限制else if (!t.isInterrupted() && arena == null &&(!timed || (ns = end - System.nanoTime()) > 0L)) {//自旋次数用完了//采用阻塞的手段U.putObject(t, BLOCKER, this);p.parked = t; //记录阻塞的线程// 如果这个数据还没有被拿走,阻塞自己if (slot == p)U.park(false, ns);//线程被唤醒,清空阻塞标识p.parked = null;// 将当前线程的 parkBlocker 属性设置成 nullU.putObject(t, BLOCKER, null);}// 如果有超时限制,使用 CAS 将 slot 从 p 变成 null,取消这次交换else if (U.compareAndSwapObject(this, SLOT, p, null)) {v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;break;}}//将节点p 的match 属性设置为空//表示初始化状态,没有任何匹配 >>> putOrderedObject是putObjectVolatile的内存非立即可见版本.U.putOrderedObject(p, MATCH, null);// 重置 itemp.item = null;// 保留伪随机数,供下次种子数字p.hash = h;//返回return v;}
slotExchage 方法逻辑
- 当第一个线程进入slotExchage 方法,会从ThreadLocal 中 获取p (Node)节点 ;没有则创建并保存在ThreadLocal中
- 线程把数据项item 保存到节点p 的item 中, 并把Exchanger 中的slot 指向 节点p
- 为了等待与之匹配交换数据的线程,需要进行等待阻塞 (为了性能考虑,按照优先级 自旋 > Thread.yield > 阻塞挂起)
- 当第二个线程进入的时候,会拿出存储在 slot item 中的值, 然后对 slot 的 match 赋值,并唤醒上次阻塞的线程.
- 当第一个线程阻塞被唤醒后,说明对方取到值了,就获取 slot 的 match (因为属性被volatile修饰,可以立即感知)值, 并重置 slot 的数据和ThreadLocal的数据,并返回自己的数据.
- 最后,如果超时了,就返回 Time_out 对象。如果线程中断了,就返回 null。在该方法中,会返回 2 种结果,一是有效的 item, 二是 null—- 要么是线程竞争使用 slot 了,创建了 arena 数组,要么是线程中断了.
2.5 使用slot 插槽节点交换数据图示
2.6 arenaExchange 方法 源码分析
private final Object arenaExchange(Object item, boolean timed, long ns) {//获取arena 数组Node[] a = arena;//拿出存在ThreadLocal 中的 nodeNode p = participant.get();for (int i = p.index;;) { p.index 初始化时为0int b, m, c; long j;//(i << ASHIFT) + ABASE) 计算第i个元素的在数组arena 的下标//通过下标 j 获取数组a 中的元素节点 qNode q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);////如果q不为null,则将对应的数组元素置为null,表示当前线程和该元素对应的线程匹配了if (q != null && U.compareAndSwapObject(a, j, q, null)) {//获取 q.itemObject v = q.item;//自己的 item 赋值给 match,以让对方线程获取q.match = item;//唤醒与之匹配的线程,进行交换数据if (w != null)U.unpark(w);return v;}//SEQ = 256//MMASK = 255//q为null 或者q不为null,cas抢占q失败了//bound初始化时是SEQ,SEQ & MMASK就是0,即m的初始值就是0,m为0时,i肯定为0else if (i <= (m = (b = bound) & MMASK) && q == null) {//把当前线程的数据放到p节点的item 中p.item = item; // offer//对应的数组元素修改为pif (U.compareAndSwapObject(a, j, null, p)) {//计算超时时间long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;//获取当前线程Thread t = Thread.currentThread(); // wait//自旋(等待匹配)for (int h = p.hash, spins = SPINS;;) {Object v = p.match;if (v != null) { //已经跟某个线程交换成功//把p.match 置空U.putOrderedObject(p, MATCH, null);//把p.item 置空p.item = null; // clear for next use//保存hash 随机数,下次自旋使用p.hash = h;return v;}//还没匹配, 判断自旋次数是否大于0else if (spins > 0) {h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshiftif (h == 0) // initialize hashh = SPINS | (int)t.getId();else if (h < 0 && // approx 50% true(--spins & ((SPINS >>> 1) - 1)) == 0)//当自旋次数等于0, 采用 yield 让出CPU 时间片Thread.yield(); // two yields per wait}else if (U.getObjectVolatile(a, j) != p)//索引j处的数组元素发生变更//重置自旋次数spins = SPINS; // releaser hasn't set match yetelse if (!t.isInterrupted() && m == 0 &&(!timed ||(ns = end - System.nanoTime()) > 0L)) {//1. 线程没有被打断//2. 没有超时//3. m == 0// 阻塞当前线程U.putObject(t, BLOCKER, this); // emulate LockSupportp.parked = t; // minimize windowif (U.getObjectVolatile(a, j) == p)U.park(false, ns);//线程被唤醒,因为中断或者等待超时p.parked = null;U.putObject(t, BLOCKER, null);}else if (U.getObjectVolatile(a, j) == p &&U.compareAndSwapObject(a, j, p, null)) {//线程被中断了或者等待超时或者m不等于0,进入此分支//m !=0if (m != 0) // try to shrink//修改bound,下一次跟MMASK求且的结果会减1,此时可能导致i大于mU.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);p.item = null;p.hash = h;//索引无符号右移(除以2)i = p.index >>>= 1; // descend//如果线程被打断,返回nullif (Thread.interrupted())return null;//如果超时,返回超时对象if (timed && m == 0 && ns <= 0L)return TIMED_OUT;//m不等于0的情形,下一次for循环继续抢占//终止内层for循环,继续外层的for循环,会尝试抢占其他的数组元素,然后自旋等待break; // expired; restart}//内层for循环结束}}else //数组元素修改失败,下一次for循环重试,q不为null,与该元素对应的线程匹配成功p.item = null; // clear offer}else {//i>m 或者q不为null,cas抢占q失败了 会进入此分支//bound的初始值也是0if (p.bound != b) { // stale; resetp.bound = b;p.collides = 0;//如果i等于m且m不等于0,则i=m-1,否则i=mi = (i != m || m == 0) ? m : m - 1;}//修改bound,下一次跟MMASK求且的结果即m会加1,下一次进入此else分支p.bound != b//如果m小于FULL,会尝试最多m次,即进入下面的逻辑最多m次,//如果还失败则增加m,然后继续尝试直到m等于FUll为止else if ((c = p.collides) < m || m == FULL ||!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {//计数加1p.collides = c + 1;//i不等于0的时候,i等于i减1,等于0则i等于m,即循环的从m往后遍历arena数组的元素了i = (i == 0) ? m : i - 1; // cyclically traverse}//上一个else if三个条件都为false,即p.collides >m 且m不等于FULL且cas 修改bound成功else//修改bound成功会导致下一次计算m时,m加1,//此处i等于m+1,下一次for循环i等于m,会抢占m+1对应的rena数组的元素i = m + 1; // grow//重置ip.index = i;}}}
arenaExchange 方法的执行逻辑
- 当第一个线程进入arenaExchange 方法,会从ThreadLocal 中 获取p (Node)节点 ;没有则创建并保存在ThreadLocal中
- 遍历arena 数组,m的初始值就是0,index的初始值也是0,两个都是大于等于0且i不大于m。 如果下标j对应的元素为空,把当前线程的数据放到p节点的item 中,并cas 设置下标j对应 元素值为 p节点。
- 为了等待与之匹配交换数据的线程,需要进行等待阻塞 (为了性能考虑,按照优先级 自旋 > Thread.yield > 阻塞挂起)
- 当第二个线程进入的时候, 也会从0开始遍历 arena, 如果当前索引j 对应的元素不为空,cas 置空成功后,进行元素交换;会拿出存储在 q.item 中的值, 然后对 q.match 赋值,并唤醒与之匹配阻塞的线程.
- 如果当某个线程多次尝试抢占index对应数组元素的Node都失败的情形下则尝试将m加1,然后抢占m加1对应的新数组元素,将其由null修改成当前线程关联的Node,然后自旋等待匹配;如果自旋结束,没有匹配的线程,则将m加1对应的新数组元素重新置为null,将m减1,然后再次for循环抢占其他为null的数组元素。极端并发下m会一直增加直到达到最大值FULL为止,达到FULL后只能通过for循环不断尝试与其他线程匹配或者抢占为null的数组元素,然后随着并发减少,m会一直减少到0。
