1. 线程池创建
线程池创建有三种:
- 自定义创建线程池
- 如果使用spring,可以使用spring自带的线程池
- 可以使用Executors框架体系(不推介使用,because 它使用的都是无界限队列,容易发生OOM)
1.1 自定义创建线程池
@Beanpublic ThreadPoolExecutor echartExecutor() {log.info("ThreadPoolExecutor echartExecutor init ... ");ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(9, 27, 30, TimeUnit.MINUTES, new ArrayBlockingQueue<>(10000),Executors.defaultThreadFactory(), new AbortPolicy());threadPoolExecutor.prestartAllCoreThreads(); // 预启动所有核心线程log.info("ThreadPoolExecutor echartExecutor init completed !!! ");return threadPoolExecutor;
线程创建工厂也可以自己定义
static class NameTreadFactory implements ThreadFactory {private final AtomicInteger mThreadNum = new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());System.out.println(t.getName() + " has been created");return t;}}
拒绝执行处理器也可以自己定义
public static class MyIgnorePolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {doLog(r, e);}private void doLog(Runnable r, ThreadPoolExecutor e) {// 可做日志记录等System.err.println( r.toString() + " rejected");// System.out.println("completedTaskCount: " + e.getCompletedTaskCount());}}
- corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
- maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
- keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
- workQueue:工作队列,和上面示例代码的工作队列同义。
- threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
- handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走。
1.2 spring自己的线程池
@Bean(name = "threadPoolTaskExecutor")public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();threadPool.setCorePoolSize(10);threadPool.setMaxPoolSize(60);threadPool.setKeepAliveSeconds(60);threadPool.setQueueCapacity(10000);threadPool.setWaitForTasksToCompleteOnShutdown(true);threadPool.setAwaitTerminationSeconds(60);threadPool.initialize();return threadPool;}
1.3 Executors工具框架体系创建线程池
//创建固定线程数的线程池ExecutorService pool = Executors.newFixedThreadPool(taskSize);//创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)ExecutorService pool = new CachedThreadPool()//创建一个单线程化的ExecutorExecutorService pool = new SingleThreadExecutor()//创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。ExecutorService pool = new ScheduledThreadPool(int corePoolSize)
2. 使用线程池的注意事项
- 强烈建议使用有界队列,这也是不推荐使用
Executors的原因(Executors使用的都是无界队列容易发生OOM) - 使用有界队列,任务过多时候,容易发生拒绝策略,默认的拒绝策略是:throw RuntimeException. 默认拒绝策略要慎重使用,建议自定义自己的拒绝策略;
并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
3. 获取线程返回结果
3.1 execute(Runnable r)方法没有返回值
ThreadPoolExecutor 的 void execute(Runnable command) 方法,利用这个方法虽然可以提交任务,但是却没有办法获取任务的执行结果(execute() 方法没有返回值)。
3.2 3个submit()方法
Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。
// 提交 Runnable 任务Future<?> submit(Runnable task);// 提交 Callable 任务<T> Future<T> submit(Callable<T> task);// 提交 Runnable 任务及结果引用<T> Future<T> submit(Runnable task, T result);
你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。
// 取消任务boolean cancel(boolean mayInterruptIfRunning);// 判断任务是否已取消boolean isCancelled();// 判断任务是否已结束boolean isDone();// 获得任务执行结果get();// 获得任务执行结果,支持超时get(long timeout, TimeUnit unit);
这 3 个 submit() 方法之间的区别在于方法参数不同,下面我们简要介绍一下。
- 提交 Runnable 任务
submit(Runnable task):这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以submit(Runnable task)这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。 - 提交 Callable 任务
submit(Callable<T> task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果。 - 提交 Runnable 任务及结果引用
submit(Runnable task, T result):这个方法很有意思,假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result。这个方法该怎么用呢?下面这段示例代码展示了它的经典用法。需要你注意的是 Runnable 接口的实现类 Task 声明了一个有参构造函数Task(Result r),创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据。
3.3 FutureTask工具类
下面我们再来介绍 FutureTask 工具类。前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。
FutureTask(Callable<V> callable);FutureTask(Runnable runnable, V result);
那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
// 创建 FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);// 创建线程池ExecutorService es = Executors.newCachedThreadPool();// 提交 FutureTaskes.submit(futureTask);// 获取计算结果Integer result = futureTask.get();
FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。
// 创建 FutureTaskFutureTask<Integer> futureTask = new FutureTask<>(()-> 1+2);// 创建并启动线程Thread T1 = new Thread(futureTask);T1.start();// 获取计算结果Integer result = futureTask.get();

下面的示例代码就是用这一章提到的 Future 特性来实现的。首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待
// 创建任务 T2 的 FutureTaskFutureTask<String> ft2= new FutureTask<>(new T2Task());// 创建任务 T1 的 FutureTaskFutureTask<String> ft1= new FutureTask<>(new T1Task(ft2));// 线程 T1 执行任务 ft1Thread T1 = new Thread(ft1);T1.start();// 线程 T2 执行任务 ft2Thread T2 = new Thread(ft2);T2.start();// 等待线程 T1 执行结果System.out.println(ft1.get());// T1Task 需要执行的任务:// 洗水壶、烧开水、泡茶class T1Task implements Callable<String>{FutureTask<String> ft2;// T1 任务需要 T2 任务的 FutureTaskT1Task(FutureTask<String> ft2){this.ft2 = ft2;}@OverrideString call() throws Exception {System.out.println("T1: 洗水壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T1: 烧开水...");TimeUnit.SECONDS.sleep(15);// 获取 T2 线程的茶叶String tf = ft2.get();System.out.println("T1: 拿到茶叶:"+tf);System.out.println("T1: 泡茶...");return " 上茶:" + tf;}}// T2Task 需要执行的任务:// 洗茶壶、洗茶杯、拿茶叶class T2Task implements Callable<String> {@OverrideString call() throws Exception {System.out.println("T2: 洗茶壶...");TimeUnit.SECONDS.sleep(1);System.out.println("T2: 洗茶杯...");TimeUnit.SECONDS.sleep(2);System.out.println("T2: 拿茶叶...");TimeUnit.SECONDS.sleep(1);return " 龙井 ";}}// 一次执行结果:T1: 洗水壶...T2: 洗茶壶...T1: 烧开水...T2: 洗茶杯...T2: 拿茶叶...T1: 拿到茶叶: 龙井T1: 泡茶...上茶: 龙井
