来自《Java并发编程的艺术》
一个简易的线程池的实现。三个类:
ThreadPool线程池接口
DefaultThreadPool线程池接口实现
Worker工作线程
线程池的工作逻辑:

全部代码
package ConcurrencyArt;import java.util.ArrayList;import java.util.Collections;import java.util.LinkedList;import java.util.List;import java.util.concurrent.atomic.AtomicLong;interface ThreadPool<Job extends Runnable> {void execute(Job job);//执行一个Job,这个Job需要实现Runnablevoid shutdown();//关闭线程池void addWorkers(int m);//增加工作者线程void removeWorkers(int m);//减少工作者线程int getJobSize();//得到正在等待执行的任务数量}public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {private static final int MAX_WORKER_NUMBERS = 10;//线程池最大限制数private static final int DEFAULT_WORKER_NUMBERS = 5;//线程池默认的数量private static final int MIN_WORKER_NUMBERS = 1;//线程池最小的数量private final LinkedList<Job> jobs = new LinkedList<>();//工作列表private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());//工作者列表private int workerNum = DEFAULT_WORKER_NUMBERS;//工作者线程的数量private AtomicLong threadNum = new AtomicLong();//线程编号生成public DefaultThreadPool() {initializeWorkers(DEFAULT_WORKER_NUMBERS);}public DefaultThreadPool(int num) {workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;initializeWorkers(workerNum);}//初始化线程工作者private void initializeWorkers(int num) {for (int i = 0; i < num; i++) {Worker worker = new Worker();workers.add(worker);Thread thread = new Thread(worker, "ThreadPool-Worker-" + threadNum.incrementAndGet());thread.start();}}@Overridepublic void execute(Job job) {if (job != null) {synchronized (jobs) {jobs.addLast(job);jobs.notify();//在worker的run方法中的while循环中,调用了jobs.wait();}}}@Overridepublic void shutdown() {for (Worker worker : workers) {worker.shutdown();}}@Overridepublic void addWorkers(int m) {synchronized (jobs) {if (m + this.workerNum > MAX_WORKER_NUMBERS) {m = MAX_WORKER_NUMBERS - this.workerNum;}initializeWorkers(m);this.workerNum += m;}}@Overridepublic void removeWorkers(int m) {synchronized (jobs) {if (m >= this.workerNum) {throw new IllegalArgumentException("beyond workNum");}int count = 0;while (count < m) {Worker worker = workers.get(count);if (workers.remove(worker)) {worker.shutdown();count++;}}this.workerNum -= count;}}@Overridepublic int getJobSize() {return jobs.size();}//工作者,负责消费任务。class Worker implements Runnable {private volatile boolean running = true;//在这里通过volatile boolean而不是interrupt方法来安全地中断线程@Overridepublic void run() {while (running) {Job job = null;synchronized (jobs) {while (jobs.isEmpty()) {//当工作队列为空时,所有的工作者线程均等待在工作队列上。try {jobs.wait();} catch (InterruptedException e) {Thread.currentThread().interrupt();return;//感知到外部对工作者线程的中断操作,返回。}}job = jobs.removeFirst();}if (job != null) {try {//job,作为一个Runnable的实现类,在这里并未将其变成线程并start(),// 而是将其的run方法放在工作者线程中执行,这样真正的多线程是指工作者线程,//而提交进来的Job只是作为一个应该异步执行的任务。job.run();} catch (Exception ex) {//忽略job执行中的exception}}}}public void shutdown() {running = false;}}}
