在开始介绍AbstractFuture之前先让我们手动实现一个回调.
回调
手写Future类
public class Future {private Consumer consumer;public void addListener(Consumer<T> consumer) {this.consumer = consumer;}public void set(T value) {consumer.accept(value);}}
测试回调
public static void main(String[] args) throws InterruptedException {Future future = new Future();//监听future设置了值future.addListener(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println("---------"+s);}});TimeUnit.SECONDS.sleep(5);future.set("hh");TimeUnit.SECONDS.sleep(1);}
执行mian方法,可以发现睡眠5秒后,future设置了值,这时addListener方法可以监听到future设置了值.
以上回调有个问题,假设Consumer接口中的方法执行需要大量的时间,那这样future.set(“hh”)的时候就会出现阻塞,其实对于这种情况,我们完全可以启用一个线程去实现,但是AbstractFuture已经为我们很好的解决方法.
AbstractFuture使用
先通过一个例子,感受下AbstractFuture的好处
static class AbstractFutureImpl extends AbstractFuture {public boolean set(T value) {return super.set(value);}}// 创建线程池final static ExecutorService executors = Executors.newCachedThreadPool();public static void main(String[] args) throws InterruptedException {AbstractFutureImpl<String> future = new AbstractFutureImpl();//监听future设置了值future.addListener(new Runnable() {@Overridepublic void run() {try {System.out.println("进入回调函数");TimeUnit.SECONDS.sleep(3);System.out.println("收到set的值: " + future.get());} catch (Exception e) {e.printStackTrace();}}}, executors);TimeUnit.SECONDS.sleep(5);future.set("hh");System.out.println("set完值");TimeUnit.SECONDS.sleep(100);}/**set完值进入回调函数收到set的值: hh*/
由打印结果可以看出,future.set(“hh”)方法未阻塞,回调函数完全由传入线程池去执行. 当set完值时,就会主动回调addListener方法,通过future.get()(这时已经有值,故get()方法不会阻塞)获取到值.
AbstractFuture解析
AbstractFuture实现了ListenableFuture,Future接口,实现了接口中所有方法.无需我们重复造轮子.
那addListener是如何实现的呢?具体实现细节看源码,这里说下大概思路.
addListener核心源码:
public void add(Runnable runnable, Executor executor) {// Fail fast on a null. We throw NPE here because the contract of// Executor states that it throws NPE on null listener, so we propagate// that contract up into the add method as well.Preconditions.checkNotNull(runnable, "Runnable was null.");Preconditions.checkNotNull(executor, "Executor was null.");// Lock while we check state. We must maintain the lock while adding the// new pair so that another thread can't run the list out from under us.// We only add to the list if we have not yet started execution.synchronized (this) {if (!executed) {runnables = new RunnableExecutorPair(runnable, executor, runnables);return;}}// Execute the runnable immediately. Because of scheduling this may end up// getting called before some of the previously added runnables, but we're// OK with that. If we want to change the contract to guarantee ordering// among runnables we'd have to modify the logic here to allow it.executeListener(runnable, executor);}
源码里有个executed判断,如何走if()判断,会把runnable和executor存储起来,否则直接通过executor执行runnable方法.
为什么会有executed呢? 看set(T value)方法源码就会知道,:
- 如果addListener方法比set方法先执行,这时executed就是false, 执行set方法时会主动通过executor执行runnable方法.
- 如果addListener方法比set方法后执行,这时executed就是ture.直接通过executor执行runnable方法.
set核心源码:
public void execute() {// Lock while we update our state so the add method above will finish adding// any listeners before we start to run them.RunnableExecutorPair list;synchronized (this) {if (executed) {return;}executed = true;list = runnables;runnables = null; // allow GC to free listeners even if this stays around for a while.}// If we succeeded then list holds all the runnables we to execute. The pairs in the stack are// in the opposite order from how they were added so we need to reverse the list to fulfill our// contract.// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we// could drop the contract on the method that enforces this queue like behavior since depending// on it is likely to be a bug anyway.// N.B. All writes to the list and the next pointers must have happened before the above// synchronized block, so we can iterate the list without the lock held here.RunnableExecutorPair reversedList = null;while (list != null) {RunnableExecutorPair tmp = list;list = list.next;tmp.next = reversedList;reversedList = tmp;}while (reversedList != null) {executeListener(reversedList.runnable, reversedList.executor);reversedList = reversedList.next;}}
注: 思想是灵魂,实现是形式
