package com.lms.jdk8.juc;import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;import java.util.Deque;import java.util.HashSet;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/** * @Author: 李孟帅 * @Date: 2021-12-16 10:33 * @Description: */@Slf4jpublic class ThreadPool { // 任务队列 private BlockQueue<Runnable> taskQueue; // 线程集合 private final HashSet<Worker> workers = new HashSet<>(); // 核心线程数 private int coreSize; // 获取任务的超时时间 private long timeout; private TimeUnit timeUnit; // 拒绝策略 private RejectPolicy<Runnable> rejectPolicy; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, BlockQueue<Runnable> taskQueue, RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = taskQueue; this.rejectPolicy = rejectPolicy; } // 执行任务 public void execute(Runnable task) { synchronized (workers) { // 任务数没有超过coreSize时,直接交给worker执行,否则加入队列暂存 if (workers.size() < coreSize) { Worker worker = new Worker(task); log.info("创建worker {},task:{}", worker, task); workers.add(worker); worker.start(); } else { // 交给拒绝策略执行 taskQueue.tryPut(rejectPolicy, task); } } } class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 执行任务 // task不为null,执行任务 // task为null,在从任务队列中获取任务并执行 while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { log.info("正在执行...{}", task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { task = null; } } synchronized (workers) { log.debug("worker 被移除{}", this); workers.remove(this); } } }}@Slf4jclass BlockQueue<T> { // 存放任务队列 private Deque<T> queue = new ArrayDeque<>(); // 容量 private int capacity; // 锁 private ReentrantLock lock = new ReentrantLock(); // 生产者条件变量 private Condition fullCWait = lock.newCondition(); // 消费者条件变量 private Condition emptyWait = lock.newCondition(); public BlockQueue(int capacity) { this.capacity = capacity; } // 带超时阻塞获取 public T poll(long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.isEmpty()) { try { if (nanos <= 0) { return null; } // 返回值是剩余时间 nanos = emptyWait.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullCWait.signal(); return t; } finally { lock.unlock(); } } // 阻塞获取 public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWait.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullCWait.signal(); return t; } finally { lock.unlock(); } } // 阻塞添加 public void put(T element) { lock.lock(); try { while (queue.size() == capacity) { try { log.info("等待加入任务队列 {}...", element); fullCWait.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("加入任务队列 {}", element); queue.addLast(element); emptyWait.signal(); } finally { lock.unlock(); } } // 带超时时间阻塞添加 public boolean offer(T element, long timeout, TimeUnit unit) { lock.lock(); try { long nanos = unit.toNanos(timeout); while (queue.size() == capacity) { try { if (nanos <= 0) { log.info("等待加入任务队列超时 {}...", element); return false; } nanos = fullCWait.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.info("加入任务队列 {}", element); queue.addLast(element); emptyWait.signal(); return true; } finally { lock.unlock(); } } // 队列长度 public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } // 尝试加入队列,失败时 拒绝策略 public void tryPut(RejectPolicy<T> rejectPolicy, T element) { lock.lock(); try { // 队列是否已满 if (queue.size() == capacity) { rejectPolicy.reject(this, element); } else { //有空闲 加入队列 log.info("加入任务队列 {}", element); queue.addLast(element); emptyWait.signal(); } } finally { lock.unlock(); } }}@FunctionalInterfaceinterface RejectPolicy<T> { void reject(BlockQueue<T> queue, T element);}@Slf4jclass Test { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2, 1000, TimeUnit.MILLISECONDS, new BlockQueue<>(1), (queue, element) -> { // 死等 // queue.put(element); // 超时等待 // queue.offer(element,500,TimeUnit.MILLISECONDS); // 让调用者自己放弃 // log.info("放弃{}", element); // 让调用者抛出异常 // throw new RuntimeException("任务执行失败:"+element); // 让调用者自己执行任务 element.run(); }); for (int i = 0; i < 5; i++) { int finalI = i; threadPool.execute(() -> { try { Thread.sleep(1000); log.info("run :{}", finalI); } catch (InterruptedException e) { e.printStackTrace(); } }); } }}