实现生产者和消费者
1. wait() / notifyAll()
public class ProducerConsumer { private static class Producer implements Runnable { private List<Integer> list; private int capacity; public Producer(List list, int capacity) { this.list = list; this.capacity = capacity; } @Override public void run() { while (true) { synchronized (list) { try { String producer = Thread.currentThread().getName(); while (list.size() == capacity) { System.out.println("生产者 " + producer + ":list 已达到最大容量,进行wait"); list.wait(); System.out.println("生产者 " + producer + ":退出 wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者 " + producer + ":生产数据" + i); list.add(i); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } private static class Consumer implements Runnable { private List<Integer> list; public Consumer(List list) { this.list = list; } @Override public void run() { while (true) { synchronized (list) { try { String consumer = Thread.currentThread().getName(); while (list.isEmpty()) { System.out.println("消费者 " + consumer + ":list 为空,进行 wait"); list.wait(); System.out.println("消费者 " + consumer + ":退出wait"); } Integer element = list.remove(0); System.out.println("消费者 " + consumer + ":消费数据:" + element); list.notifyAll(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args) { final LinkedList linkedList = new LinkedList(); final int capacity = 5; ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); executor.execute(new Producer(linkedList, capacity)); executor.execute(new Consumer(linkedList)); executor.shutdown(); }}
2. await() / sigalAll()
public class ProducerConsumer { private static ReentrantLock lock = new ReentrantLock(); private static Condition full = lock.newCondition(); private static Condition empty = lock.newCondition(); private static class Producer implements Runnable{ private List<Integer> list; private int capacity; public Producer(List list, int capacity) { this.list = list; this.capacity = capacity; } @Override public void run() { while (true){ lock.lock(); try { String producer = Thread.currentThread().getName(); while (list.size() == capacity){ System.out.println("生产者 " + producer + ":list 已达到最大容量,进行wait"); full.await(); System.out.println("生产者 " + producer + ":退出 wait"); } Random random = new Random(); int i = random.nextInt(); System.out.println("生产者 " + producer + ":生产数据" + i); list.add(i); empty.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } private static class Consumer implements Runnable{ private List<Integer> list; public Consumer(List list) { this.list = list; } @Override public void run() { while (true) { lock.lock(); try { String consumer = Thread.currentThread().getName(); while (list.isEmpty()) { System.out.println("消费者 " + consumer + ":list 为空,进行 wait"); empty.await(); System.out.println("消费者 " + consumer + ":退出wait"); } Integer element = list.remove(0); System.out.println("消费者 " + consumer + ":消费数据:" + element); full.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } } public static void main(String[] args) { final LinkedList linkedList = new LinkedList(); final int capacity = 5; ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); executor.execute(new Producer(linkedList,capacity)); executor.execute(new Consumer(linkedList)); }}
3. 阻塞队列
public class ProducerConsumer { private static class Producer implements Runnable { private BlockingQueue<Integer> queue; public Producer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { try { String producer = Thread.currentThread().getName(); Random random = new Random(); int i = random.nextInt(); System.out.println("生产者 " + producer + ":生产数据" + i); queue.put(i); } catch (InterruptedException e) { e.printStackTrace(); } } } } private static class Consumer implements Runnable { private BlockingQueue<Integer> queue; public Consumer(BlockingQueue<Integer> queue) { this.queue = queue; } @Override public void run() { while (true) { try { String consumer = Thread.currentThread().getName(); Integer element = queue.take(); System.out.println("消费者 " + consumer + ":消费数据:" + element); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy()); executor.execute(new Producer(queue)); executor.execute(new Consumer(queue)); executor.shutdown(); }}