- Callable
- Future
- CompletionService
- CompletableFuture,重要
- 重要API
- whenComplete&exceptionally
- thenApply,上个任务的结果作为入参
- thenCompose 返回的函数实现计算步骤的记过看实例d
- thenAccept,没有返回值了
- thenAcceptBoth,前面的两个都执行玩,执行action
- thenRun,再执行一段另一个逻辑,与上一个CompletabelFuture无关了
- thenCombine 合并连个任务的结果,下一步处理
- applyToEither
- acceptEither,获取执行最快的任务的结果,并有下一步的处理
- runAfterEither,两个任务,有一个完成,就执行下一步
- runAfterBoth 两个都完成菜执行下一步
- anyOf 多个completableFuture ,有一个完成就返回这个的Future结果
Callable
类比 Runnable,都可以创建线程,但是区别是有返回值
需要搭配Future FutureTask
//无返回值//不能抛出异常@FunctionalInterfacepublic interface Runnable {public abstract void run();}@FunctionalInterfacepublic interface Callable<V> {V call() throws Exception;}
使用示例
FutureTask task = new FutureTask(new Callable() {@Overridepublic Object call() throws Exception {System.out.println("通过Callable方式执行任务");Thread.sleep(3000);return "返回任务结果";}});new Thread(task).start();
Future
接受Callable的任务,对这个返回的任务的操作
- boolean cancel (boolean mayInterruptIfRunning) 取消任务,是立即取消还是等执行完取消
- boolean isCancelled () 判断是否已取消
- boolean isDone () 判断是否已完成
- V get () 获取任务,阻塞式的获取结果
- V get (long timeout, TimeUnit unit) ,同上,制定了阻塞的时间限制
FutureTask与Future,
实现关系
FutureTask既可以当做Runnable执行,也可以当做Future接受Callable的返回结果
使用示例
public class FutureTaskDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {Task task = new Task();//构建futureTaskFutureTask<Integer> futureTask = new FutureTask<>(task);//作为Runnable入参new Thread(futureTask).start();System.out.println("task运行结果:"+futureTask.get());}static class Task implements Callable<Integer> {@Overridepublic Integer call() throws Exception {System.out.println("子线程正在计算");int sum = 0;for (int i = 0; i < 100; i++) {sum += i;}return sum;}}}
使用案例:促销活动中的尚宁信息查询

public class FutureTaskDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> ft1 = new FutureTask<>(new T1Task());FutureTask<String> ft2 = new FutureTask<>(new T2Task());FutureTask<String> ft3 = new FutureTask<>(new T3Task());FutureTask<String> ft4 = new FutureTask<>(new T4Task());FutureTask<String> ft5 = new FutureTask<>(new T5Task());//构建线程池ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.submit(ft1);executorService.submit(ft2);executorService.submit(ft3);executorService.submit(ft4);executorService.submit(ft5);//获取执行结果System.out.println(ft1.get());System.out.println(ft2.get());System.out.println(ft3.get());System.out.println(ft4.get());System.out.println(ft5.get());executorService.shutdown();}static class T1Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T1:查询商品基本信息...");TimeUnit.MILLISECONDS.sleep(50);return "商品基本信息查询成功";}}static class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:查询商品价格...");TimeUnit.MILLISECONDS.sleep(50);return "商品价格查询成功";}}static class T3Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T3:查询商品库存...");TimeUnit.MILLISECONDS.sleep(50);return "商品库存查询成功";}}static class T4Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T4:查询商品图片...");TimeUnit.MILLISECONDS.sleep(50);return "商品图片查询成功";}}static class T5Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T5:查询商品销售状态...");TimeUnit.MILLISECONDS.sleep(50);return "商品销售状态查询成功";}}}
Future注意事项
建议使用get加timeout
思考: 使用Callable 和Future 产生新的线程了吗? 单独使用时并没有
局限性:
- 并发执行多任务时:get是阻塞等待获取的
- 无法对多个任务进行链式调用,下面有方法可以实现
- 无法组合任务,下面的也可以实现
- 无异常处理,
CompletionService

主要功能就是:一边生成任务,一边获取任务,让两个事分开执行,任务直接不会相互阻塞,不再依赖任务顺序
CompletionService原理
内部通过阻塞队列+FutureTask,先完成的可以先获取,
内部队列保存执行完成的Future,
take, poll获取一个已经执行完成的Future,再get结果
使用案
向不同平台询价,再保存
// 创建线程池ExecutorService executor = Executors.newFixedThreadPool(3);// 异步向电商S1询价Future<Integer> f1 = executor.submit(()->getPriceByS1());// 异步向电商S2询价Future<Integer> f2= executor.submit(()->getPriceByS2());// 获取电商S1报价并异步保存executor.execute(()->save(f1.get()));// 获取电商S2报价并异步保存executor.execute(()->save(f2.get())//问题,就是f1会阻挡f2
//创建线程池ExecutorService executor = Executors.newFixedThreadPool(10);//创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);//异步向电商S1询价cs.submit(() -> getPriceByS1());//异步向电商S2询价cs.submit(() -> getPriceByS2());//异步向电商S3询价cs.submit(() -> getPriceByS3());//将询价结果异步保存到数据库for (int i = 0; i < 3; i++) {Integer r = cs.take().get();executor.execute(() -> save(r));}
实现类似 Dubbo 的 Forking Cluster场景
Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个服务实例,只要有一个成功就返回结果。
// 创建线程池ExecutorService executor = Executors.newFixedThreadPool(3);// 创建CompletionServiceCompletionService<Integer> cs = new ExecutorCompletionService<>(executor);// 用于保存Future对象List<Future<Integer>> futures = new ArrayList<>(3);//提交异步任务,并保存future到futuresfutures.add(cs.submit(()->geocoderByS1()));futures.add(cs.submit(()->geocoderByS2()));futures.add(cs.submit(()->geocoderByS3()));// 获取最快返回的任务执行结果Integer r = 0;try {// 只要有一个成功返回,则breakfor (int i = 0; i < 3; ++i) {r = cs.take().get();//简单地通过判空来检查是否成功返回if (r != null) {break;}}} finally {//取消所有任务for(Future<Integer> f : futures)f.cancel(true);}// 返回结果
应用场景
- 批量提交异步任务时,有效各个
- 让异步的执行结果有序化,先执行完的先入阻塞队列
- 线程池隔离,各个业务不干扰
CompletableFuture,重要
让业务逻辑处理并行啊,聚合,依赖的,扩展,增强,
对Future扩展,增强,还有任务的编排能力
CompletionStage: 默认使用时ForkJoinPool.commonPool线程池
重要API
依赖场景
- thenApply(),前面执行完,执行后面
- thenCompose(),链接有依赖关系的任务,
描述and聚合关系
- thenCombine: 任务合并
- thenAccepetBoth: 两个如未能执行完成交给后面的任务,无返回
- runAfterBoth,:l两个任务完成,执行下一个操作
描述or的关系
- applyToEither:两个任务谁快,执行谁
- acceptEither: 谁快消耗那个的结果
- runAfterEither::谁先完成,就下一步
并行
- anyOf
- allOf
创建异步操作
CompletableFuture 提供了四个静态方法来创建一个异步操作:
//public static CompletableFuture<Void> runAsync(Runnable runnable)//不指定就用ForkJoinPool.commonPool() 指定就用executor//强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)//Supplier 接口的 get() 方法是有返回值的(会阻塞)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync&supplyAsync
Runnable runnable = () -> System.out.println("执行无返回结果的异步任务");CompletableFuture.runAsync(runnable);CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {System.out.println("执行有返回值的异步任务");try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "Hello World";});String result = future.get();System.out.println(result);
获取结果
join&get,都是获取异步的结果,抛的异常不一样
whenComplete&exceptionally
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
- 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
- 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture的计算结果或者返回异常
whenComplete&exceptionally
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {}if (new Random().nextInt(10) % 2 == 0) {int i = 12 / 0;}System.out.println("执行结束!");return "test";});future.whenComplete(new BiConsumer<String, Throwable>() {@Overridepublic void accept(String t, Throwable action) {System.out.println(t+" 执行完成!");}});//失败之后的处理future.exceptionally(new Function<Throwable, String>() {@Overridepublic String apply(Throwable t) {System.out.println("执行失败:" + t.getMessage());return "异常xxxx";}}
thenApply,上个任务的结果作为入参
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int result = 100;System.out.println("一阶段:" + result);return result;}).thenApply(number -> {// number是上任务的结果int result = number * 3;System.out.println("二阶段:" + result);return result;});
thenCompose 返回的函数实现计算步骤的记过看实例d
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(30);System.out.println("第一阶段:" + number);return number;}}).thenCompose(new Function<Integer, CompletionStage<Integer>>() {@Override//入参param 技术是上个任务返回的numberpublic CompletionStage<Integer> apply(Integer param) {return CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = param * 2;System.out.println("第二阶段:" + number);return number;}});}});}System.out.println("最终结果: " + future.get());
thenApply 和 thenCompose的区别,有点类似
- thenApply 转换的是泛型中的类型,返回的是同一个CompletableFuture;
- thenCompose 将内部的 CompletableFuture 调用展开来并使用上一个
CompletableFutre 调用的结果在下一步的 CompletableFuture 调用中进行运算,是生
成一个新的CompletableFuture。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<String> result1 = future.thenApply(param -> param + " World");CompletableFuture<String> result2 = future.thenCompose(param -> CompletableFuture.supplyAsync(() -> param + " World"));System.out.println(result1.get());System.out.println(result2.get());
下面的是结果消费,只是执行action,不返回CompletableFuture
- thenAccept系列:对单个结果进行消费
- thenAcceptBoth系列:对两个结果进行消费
- thenRun系列:不关心结果,只对结果执行Action
thenAccept,没有返回值了
```java public CompletionStagethenAccept(Consumer<? super T> action); public CompletionStage thenAcceptAsync(Consumer<? super T> action);
```javaCompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenAccept(number ->System.out.println("第二阶段:" + number * 5));System.out.println("最终结果:" + future.get());//fuure 泛型是void
thenAcceptBoth,前面的两个都执行玩,执行action
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
CompletableFuture<Integer> futrue1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(3) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});//就时连个future 结束时在处理futrue1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {@Overridepublic void accept(Integer x, Integer y) {System.out.println("最终结果:" + (x + y));}}).join();
thenRun,再执行一段另一个逻辑,与上一个CompletabelFuture无关了
public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action);
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}).thenRun(() ->System.out.println("thenRun 执行"));System.out.println("最终结果:" + future.get());
thenCombine 合并连个任务的结果,下一步处理
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段:" + number);return number;}});//跟thenApply没啥区别CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {@Overridepublic Integer apply(Integer x, Integer y) {return x + y;}});System.out.println("最终结果:" + result.get());
applyToEither
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第一阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段end:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10);System.out.println("第二阶段start:" + number);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段end:" + number);return number;}});就是获取运行比较快的那个任务的结果future1.applyToEither(future2, new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer number) {System.out.println("最快结果:" + number);return number * 2;}}).join();
acceptEither,获取执行最快的任务的结果,并有下一步的处理
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(10) + 1;try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.acceptEither(future2, new Consumer<Integer>() {@Overridepublic void accept(Integer number) {System.out.println("最快结果:" + number);}}).join();
runAfterEither,两个任务,有一个完成,就执行下一步
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:" + number);return number;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {int number = new Random().nextInt(5);try {TimeUnit.SECONDS.sleep(number);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:" + number);return number;}});future1.runAfterEither(future2, new Runnable() {@Overridepublic void run() {System.out.println("已经有一个任务完成了");}}).join();
runAfterBoth 两个都完成菜执行下一步
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第一阶段:1");return 1;}});CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {@Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("第二阶段:2");return 2;}});future1.runAfterBoth(future2, new Runnable() {@Overridepublic void run() {System.out.println("上面两个任务都执行完成了。");}}).get();
anyOf 多个completableFuture ,有一个完成就返回这个的Future结果
Random random = new Random();CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(5));} catch (InterruptedException e) {e.printStackTrace();}return "hello";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(random.nextInt(1));} catch (InterruptedException e) {e.printStackTrace();}return "world";});CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);System.out.println(result.get());
allOf
allOf方法用来实现多 CompletableFuture 的同时返回。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("future1完成!");return "future1完成!";});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {System.out.println("future2完成!");return "future2完成!";});CompletableFuture<Void> combindFuture = CompletableFuture.allOf(future1, future2);try {combindFuture.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}System.out.println("future1: " + future1.isDone() + ",future2: " +future2.isDone());
CompletableFuture常用方法总结
使用案例:实现最优的“烧水泡茶”程序
public class FutureTaskDemo3{public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建任务T2的FutureTaskFutureTask<String> ft2 = new FutureTask<>(new T2Task());// 创建任务T1的FutureTaskFutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));// 线程T1执行任务ft1Thread T1 = new Thread(ft1);T1.start();// 线程T2执行任务ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程T1执行结果System.out.println(ft1.get());}}// T1Task需要执行的任务:// 洗水壶、烧开水、泡茶class T1Task implements Callable<String> {FutureTask<String> ft2;// T1任务需要T2任务的FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@Overridepublic String call() throws Exception {System.out.println("T1:洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1:烧开水...");TimeUnit.SECONDS.sleep(15);// 获取T2线程的茶叶String tf = ft2.get();System.out.println("T1:拿到茶叶:"+tf);System.out.println("T1:泡茶...");return "上茶:" + tf;}}// T2Task需要执行的任务:// 洗茶壶、洗茶杯、拿茶叶class T2Task implements Callable<String> {@Overridepublic String call() throws Exception {System.out.println("T2:洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2:洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2:拿茶叶...");TimeUnit.SECONDS.sleep(1);return "龙井";}}
public class CompletableFutureDemo2 {public static void main(String[] args) {//任务1:洗水壶->烧开水CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {System.out.println("T1:洗水壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T1:烧开水...");sleep(15, TimeUnit.SECONDS);});//任务2:洗茶壶->洗茶杯->拿茶叶CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {System.out.println("T2:洗茶壶...");sleep(1, TimeUnit.SECONDS);System.out.println("T2:洗茶杯...");sleep(2, TimeUnit.SECONDS);System.out.println("T2:拿茶叶...");sleep(1, TimeUnit.SECONDS);return "龙井";});//任务3:任务1和任务2完成后执行:泡茶CompletableFuture<String> f3 = f1.thenCombine(f2, (__, tf) -> {System.out.println("T1:拿到茶叶:" + tf);System.out.println("T1:泡茶...");return "上茶:" + tf;});//等待任务3执行结果System.out.println(f3.join());}static void sleep(int t, TimeUnit u){try {u.sleep(t);} catch (InterruptedException e) {}}}
