模式图

使用BlockingQueue实现生产者和消费者模式

如上图所示,对于生产者和消费者主要就是包含中类型的对象
- 生产者
- 消费者
- 仓库(阻塞队列)
BlockingQueue就是一个阻塞队列,当使用take方法的时候,如果没有数据,就会阻塞,使用put方法的时候,如果队列已经满了,也会阻塞。实现代码如下:
生产者
public class Producer implements Runnable {// 阻塞队列private BlockingQueue<Object> queue;public Producer(BlockingQueue<Object> queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {Thread.sleep(1000);queue.put(new Date());System.out.println("生产了一个,共有" + queue.size());}} catch (InterruptedException e) {e.printStackTrace();}}}
消费者
public class Consumer implements Runnable {private BlockingQueue<Object> queue;public Consumer(BlockingQueue<Object> blockingQueue) {this.queue = blockingQueue;}@Overridepublic void run() {try {while (true) {Thread.sleep(1000);Object take = queue.take();System.out.println("消费了" + take + "还剩" + queue.size());}} catch (InterruptedException e) {e.printStackTrace();}}}
测试代码
public class ProducerAndConsumer {public static void main(String[] args) throws InterruptedException {BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);Producer producer = new Producer(queue);for (int i = 0; i < 10; i++) {new Thread(producer).start();}Consumer consumer = new Consumer(queue);for (int i = 0; i < 4; i++) {new Thread(consumer).start();}}}
使用Condition实现生产者和消费者模式
package ltd.personalstudy.threadbasic;import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;/*** @Author 咖啡杯里的茶* @date 2020/12/6*/public class CustomCondition {private Queue queue;private int max = 16;private ReentrantLock lock = new ReentrantLock();// 提醒消费者private Condition notEmpty = lock.newCondition();// 提醒生产者private Condition notFull = lock.newCondition();public CustomCondition(int max) {this.max = max;queue = new LinkedList();}/*** 生产者生产产品*/public void put(Object o) {lock.lock();try {while (queue.size() == max) {notFull.await();}// 生产了一个产品queue.add(o);System.out.println("生产了一个" + System.currentTimeMillis());// 提醒消费者notEmpty.signalAll();} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}/*** 消费者** @return*/public Object take() throws InterruptedException {lock.lock();try {// 这里不使用if进行判断是为了避免虚假唤醒while (queue.size() == 0) {notEmpty.await();}Object remove = queue.remove();System.out.println("消费了一个" + System.currentTimeMillis());notFull.signalAll();return remove;} finally {lock.unlock();}}public static void main(String[] args) {CustomCondition condition = new CustomCondition(16);// 创建生产者线程new Thread(()->{while (true) {condition.put(new Object());}}).start();// 创建消费者线程new Thread(()->{while (true) {try {condition.take();} catch (InterruptedException e) {e.printStackTrace();}}}).start();}}
