一、前言
本文参考:https://blog.csdn.net/qq_31865983/article/details/106137777
CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。
多线程的基本编程参考:
7-多线程
二、创建异步任务
1、Future
通常的线程池接口类ExecutorService,其中execute方法的返回值是void,即无法获取异步任务的执行状态,3个重载的submit方法的返回值是Future,可以据此获取任务执行的状态和结果,示例如下:
public class callableTest {public static void main(String[] args) throws ExecutionException, InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(3);Future<String> future = executorService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {Thread.sleep(2000);int a = 1 / 0;return Thread.currentThread().getName();}});// 每次执行一个线程System.out.println("main thread start,time->" + System.currentTimeMillis());//等待子任务执行完成,如果已完成则直接返回结果//如果执行任务异常,则get方法会把之前捕获的异常重新抛出System.out.println(future.get());System.out.println("main thread end,time->" + System.currentTimeMillis());executorService.shutdown();}}
执行结果如下(get方法抛出异常导致主线程异常终止),若无异常则正常输出
子线程是异步执行的,主线程休眠等待子线程执行完成,子线程执行完成后唤醒主线程,主线程获取任务执行结果后退出。
2、supplyAsync/runAsync
supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable
(1)runAsync
public class Test01 {public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {System.out.println("当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);}, executorService);executorService.shutdown();}}main....start.....当前线程 :13运行结果:5
(2)supplyAsync
CompletableFuture<Object> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, executorService);System.out.println(supplyAsyncFuture.get());main....start.....当前线程 :13运行结果:55
(3)默认线程池
2-ForkJoinPool
上述两方法各有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每次执行execute都会创建一个新线程。测试用例如下:
CompletableFuture<Object> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;});System.out.println(supplyAsyncFuture.get());
三、异步回调函数
1、thenApply / thenApplyAsync
thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中,当使用 ExecutorService 线程池的时候,这两个方法没有什么区别,当使用 ForkJoinPool 线程池的使用,thenApply的方法使用的是和上一个方法相同的线程,而 thenApplyAsync 使用的是新的线程。
(1)thenApply
ExecutorService
public class Test03 {public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {System.out.println("a、当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, executorService).thenApply(res -> {System.out.println("b、当前线程 :" + Thread.currentThread().getId());System.out.println("任务四启动了。。。" + res);return "hello" + res;});System.out.println("main....end...." + feature.get());}}main....start.....a、当前线程 :13运行结果:5b、当前线程 :14任务四启动了。。。5main....end....hello5
ForkJoinPool
public class Test03 {private static final ForkJoinPool pool = new ForkJoinPool();public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {System.out.println("a、当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, executorService).thenApply(res -> {System.out.println("b、当前线程 :" + Thread.currentThread().getId());System.out.println("任务四启动了。。。" + res);return "hello" + res;});System.out.println("main....end...." + feature.get());}}main....start.....a、当前线程 :13运行结果:5b、当前线程 :13任务四启动了。。。5main....end....hello5
(2)thenApplyAsync
ExecutorService
public class Test03 {public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {System.out.println("a、当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, executorService).thenApplyAsync(res -> {System.out.println("b、当前线程 :" + Thread.currentThread().getId());System.out.println("任务四启动了。。。" + res);return "hello" + res;}, executorService);System.out.println("main....end...." + feature.get());}}main....start.....a、当前线程 :13运行结果:5b、当前线程 :14任务四启动了。。。5main....end....hello5
ForkJoinPool
public class Test03 {private static final ForkJoinPool pool = new ForkJoinPool();public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");CompletableFuture<String> feature = CompletableFuture.supplyAsync(() -> {System.out.println("a、当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, executorService).thenApplyAsync(res -> {System.out.println("b、当前线程 :" + Thread.currentThread().getId());System.out.println("任务四启动了。。。" + res);return "hello" + res;}, executorService);System.out.println("main....end...." + feature.get());}}main....start.....a、当前线程 :13运行结果:5b、当前线程 :14任务四启动了。。。5main....end....hello5
2、thenAccept/thenApplyAsync
和 �thenApply / thenApplyAsync 的用法一致,只是,没有返回值,下面只举一个例子。
public class Test03 {public static ExecutorService executorService = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");CompletableFuture<Void> feature = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, executorService).thenAcceptAsync(res -> {System.out.println("任务三执行。。。。" + res);}, executorService);System.out.println("main....end...." + feature.get());}}main....start.....当前线程 :13运行结果:5任务三执行。。。。5main....end....null
3、exceptionally
exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果,测试用例如下:
public class Test03 {private static final ForkJoinPool pool = new ForkJoinPool();public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");CompletableFuture<Integer> supplyAsyncFuture = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程 :" + Thread.currentThread().getId());int i = 10 / 0;System.out.println("运行结果:" + i);return i;}).exceptionally(throwable -> {throwable.printStackTrace();// 抛出异常throw new RuntimeException(throwable);// 异常时候的处理// return Integer.parseInt("1");});System.out.println(supplyAsyncFuture.get());}}main....start.....当前线程 :13java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1692)at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)Caused by: java.lang.ArithmeticException: / by zeroat demo08_completableFuture.Test01.lambda$main$0(Test01.java:26)at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)... 6 more。。。。。。。。
4、whenComplete
whenComplete是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。测试用例如下:
public class Test03 {private static final ForkJoinPool pool = new ForkJoinPool();public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");/*** 方法完成后的感知*/CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程 :" + Thread.currentThread().getId());int i = 10 / 0;System.out.println("运行结果:" + i);return i;}, executorService).whenComplete((res, exception) -> {// 虽然能得到异常信息,但是没法修改结果System.out.println("异步任务成功完成了....结果是:" + res + "异常是:" + exception);}).exceptionally(throwable -> {// 感知异常,同时返回默认值return 10;});Integer integer = future.get();System.out.println("main....end...." + integer);}}结果:main....start.....当前线程 :13异步任务成功完成了....结果是:null异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeromain....end....10
5、handle
跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。测试用例如下:
public class Test03 {private static final ForkJoinPool pool = new ForkJoinPool();public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("main....start.....");ExecutorService executorService = Executors.newFixedThreadPool(3);CompletableFuture<Integer> feature = CompletableFuture.supplyAsync(() -> {System.out.println("当前线程 :" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, executorService).handle((a, b) -> {if (b != null) {// 删除异常b.printStackTrace();}return a;});System.out.println(feature.get());}}
四、组合处理
1、thenCombine/thenAcceptBoth/runAfterBoth
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:
public class Test07 {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + "start cf time->" + System.currentTimeMillis());try {Thread.sleep(20000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + "end cf time->" + System.currentTimeMillis());return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());try {Thread.sleep(15000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());return 3.2;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值CompletableFuture<Double> cf3 = cf.thenCombine(cf2, (a, b) -> {System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());System.out.println("job3 param a->" + a + ",b->" + b);try {Thread.sleep(20000);} catch (InterruptedException e) {}System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());return a + b;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值CompletableFuture cf4 = cf.thenAcceptBoth(cf2, (a, b) -> {System.out.println("job4 param a->" + a + ",b->" + b);try {Thread.sleep(15000);} catch (InterruptedException e) {}});//cf4和cf3都执行完成后,执行cf5,无入参,无返回值CompletableFuture cf5 = cf4.runAfterBoth(cf3, () -> {try {Thread.sleep(10000);} catch (InterruptedException e) {}System.out.println("cf5 do something");});//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("cf3 run result->" + cf3.get());System.out.println("cf5 run result->" + cf5.get());System.out.println("main thread exit,time->" + System.currentTimeMillis());}}执行结果:Thread[ForkJoinPool.commonPool-worker-5,5,main] start job2,time->1639575226010Thread[ForkJoinPool.commonPool-worker-19,5,main]start cf time->1639575226010Thread[ForkJoinPool.commonPool-worker-5,5,main] exit job2,time->1639575241012Thread[ForkJoinPool.commonPool-worker-19,5,main]end cf time->1639575246012Thread[main,5,main] start job3,time->1639575246013job4 param a->1.2,b->3.2job3 param a->1.2,b->3.2Thread[main,5,main] exit job3,time->1639575266020cf5 do somethingcf run result->1.2cf3 run result->4.4cf5 run result->nullmain thread exit,time->1639575276022
2、applyToEither / acceptEither / runAfterEither
这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。测试用例如下:
public class Test08 {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {}return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1500);} catch (InterruptedException e) {}return 3.2;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值CompletableFuture<Double> cf3 = cf.applyToEither(cf2, (result) -> {System.out.println("job3 param result->" + result);return result;});//cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值CompletableFuture cf4 = cf.acceptEither(cf2, (result) -> {System.out.println("job4 param result->"+result);});//cf4和cf3都执行完成后,执行cf5,无入参,无返回值CompletableFuture cf5 = cf4.runAfterEither(cf3, () -> {System.out.println("cf5 do something");});//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("cf3 run result->" + cf3.get());System.out.println("cf5 run result->" + cf5.get());}}// 结果job4 param result->3.2cf5 do somethingjob3 param result->3.2cf run result->1.2cf3 run result->3.2cf5 run result->null
3、thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该result的新的CompletableFuture实例;如果该CompletableFuture实例为null,则,然后执行这个新任务,测试用例如下:
public class Test09 {public static void main(String[] args) throws ExecutionException, InterruptedException {// 创建异步执行任务:CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {}return 1.2;});CompletableFuture<String> cf2 = cf.thenCompose((param) -> {System.out.println("job3 .....param:" + param);try {Thread.sleep(2000);} catch (InterruptedException e) {}return CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {}return "job3 test";});});//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());System.out.println("cf2 run result->" + cf2.get());}}// 结果cf run result->1.2job3 .....param:1.2main thread start cf2.get(),time->1640015940253cf2 run result->job3 test
4、allOf
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只要有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
public class Test10 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {}return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1500);} catch (InterruptedException e) {}return 2.2;});CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {int a = 1/0;try {Thread.sleep(1500);} catch (InterruptedException e) {}return 3.2;});// 当三个线程都执行成功之后才执行 ,a 一般都是 null,b是输出异常CompletableFuture<Void> cf4 = CompletableFuture.allOf(cf, cf2, cf3).whenComplete((a, b) -> {System.out.println("cf4 a-->" + a);System.out.println("cf4 b-->" + b);});//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("cf2 run result->" + cf2.get());System.out.println("cf3 run result->" + cf3.get());System.out.println("cf4 run result->" + cf4.get());}}// 结果cf4 a-->nullcf4 b-->java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zerocf run result->1.2cf2 run result->2.2Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zeroat java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)at demo08_completableFuture.Test10.main(Test10.java:45)Caused by: java.lang.ArithmeticException: / by zeroat demo08_completableFuture.Test10.lambda$main$2(Test10.java:28)...........
5、anyof
anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任务的执行结果,如果该任务执行异常,则抛出异常。
public class Test11 {public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {}return 1.2;});CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1500);} catch (InterruptedException e) {}return 2.2;});CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1500);} catch (InterruptedException e) {}return 3.2;});// 当三个线程都执行成功之后才执行 ,a 一般都是 null,b是输出异常CompletableFuture<Object> cf4 = CompletableFuture.anyOf(cf, cf2, cf3).whenComplete((a, b) -> {System.out.println("cf4 a-->" + a);System.out.println("cf4 b-->" + b);});//等待子任务执行完成System.out.println("cf run result->" + cf.get());System.out.println("cf2 run result->" + cf2.get());System.out.println("cf3 run result->" + cf3.get());System.out.println("cf4 run result->" + cf4.get());}}结果cf4 a-->3.2cf4 b-->nullcf run result->1.2cf2 run result->2.2cf3 run result->3.2cf4 run result->3.2
