一 介绍
- 阻塞队列(BlockingQueue):顾名思义,首先它是一个队列。
- 当阻塞队列是空时,从队列中获取元素的操作线程将会被阻塞。
- 当阻塞队列是满时,往队列中添加元素的操作线程将会被阻塞。
阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1.1 好处
在多线程领域:所谓阻塞,在某些情况下会挂起线程(即线程阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。
- 好处是我们不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为BlockingQueue都一手给你包办好了。
- 在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
1.2 核心用法
在阻塞队列不可用的时候,它对于操作线程提供了4种处理方式。
注意:阻塞队列如果是无界队列,那么插入元素的时是一直成功的,不会发生阻塞操作线程的情况。
| 方法类型 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
| 移除 | remove() | poll() | take() | poll(time,unit) |
| 检查 | element() | peek() | 不可用 | 不可用 |
| 抛出异常 | 当阻塞队列满的时候,再往队列add插入元素会抛出 IllegalStateException:Queue full。当阻塞队列空的时候,再往队列remove移除元素时候会抛出异常NoSuchElementException |
|---|---|
| 特殊值 | 插入方法,成功返回true,失败返回false。移除方法,成功返回元素,队列为空返回null |
| 一直阻塞 | 当队列满的时候,生产者继续往队列里面put插入元素,队列会一直阻塞直到put插入成功或者响应中断退出。当队列为空的时候,消费者会从队列take移除元素,队列会一直阻塞消费者,直到队列有元素可以被移除。 |
| 超时退出 | 当阻塞队列满的时候,生产者会阻塞消费者一定时间。超时后生产者线程自动退出。 |
二 JDK中的阻塞队列
// JDK8源码:BlockingQueuepackage java.util.concurrent;// 直接父接口是Queue,Queue的直接父接口是Collectionpublic interface BlockingQueue<E> extends Queue<E> {
2.1 ArrayBlockingQueue
由数组结构组成的有界阻塞队列。三个构造方法如下
public ArrayBlockingQueue(int capacity) {this(capacity, false);}/*** fair:指的是公平性,当有很多线程都访问阻塞队列已经阻塞,此时TRUE表示先阻塞的线程在阻塞队列* 有空余的位置时优先访问阻塞队列,这时候会适当的降低程序的吞吐量。*/public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}count = i;putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}
2.2 LinkedBlockingQueue
由链表结构组成的有界阻塞队列,但是队列默认最大长度为:Integer.MAX_VALUE
此队列按照先进先出的原则对元素进行排序。
2.3 PriorityBlockingQueue
支持元素优先级顺序排列的无界阻塞队列。
默认情况下,元素按照自然排序的规则升序排列,但是也可以自定义类实现 compareTo 方法来指定元素的排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来进行排序。需要注意的是不能保证同优先级元素的顺序。
2.4 DelayQueue
由PriorityBlockingQueue实现的延时无界阻塞队列。
队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
2.5 SynchronousQueue
不存储元素的阻塞队列,也即是单个元素的队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。
SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
2.6 LinkedTransferQueue
由链表结构组成的无界阻塞队列,相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
- transfer方法
如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者了才返回。
- tryTransfer方法
tryTransfer方法时用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回fasle。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。
2.7 LinkedBlockingDeque
LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。
所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双向队列的最后一个元素。
三 生产者消费者
阻塞队列实现生产者消费者
class MyResource{private volatile boolean FLAG = true; // 默认开启,进行生产+消费private AtomicInteger atomicInteger = new AtomicInteger();BlockingQueue<String> blockingQueue = null;public MyResource(BlockingQueue<String> blockingQueue) {this.blockingQueue = blockingQueue;System.out.println(blockingQueue.getClass().getName());}// 生产者public void MyProd() throws Exception{String data = null;boolean retValue ; // 默认是falsewhile (FLAG){// 往阻塞队列填充数据data = atomicInteger.incrementAndGet()+""; // 等于++i的意思retValue = blockingQueue.offer(data,2L, TimeUnit.SECONDS); // 插入成功则返回trueif (retValue){ // 如果是true,那么代表当前这个线程插入数据成功System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"成功");}else { // 那么就是插入失败System.out.println(Thread.currentThread().getName()+"\t插入队列"+data+"失败");}TimeUnit.SECONDS.sleep(1);}// 如果FLAG是false了,马上打印System.out.println(Thread.currentThread().getName()+"\t大老板叫停了,表示FLAG=false,生产结束");}// 消费者public void MyConsumer() throws Exception{String result = null;while (FLAG) { // 开始消费// 两秒钟等不到生产者生产出来的数据就不取了result = blockingQueue.poll(2L,TimeUnit.SECONDS); // 阻塞队列移除元素失败则返回nullif (null == result || result.equalsIgnoreCase("")){ // 如果取不到数据了FLAG = false;System.out.println(Thread.currentThread().getName()+"\t 超过两秒钟没有取到数据,消费退出")return; // 退出}System.out.println(Thread.currentThread().getName()+"\t消费队列数据"+result+"成功");}}// 叫停方法public void stop() throws Exception{this.FLAG = false;}}public class ProdConsumer_BlockQueueDemo {public static void main(String[] args) throws Exception{MyResource myResource = new MyResource(new ArrayBlockingQueue<>(10));new Thread(() -> {System.out.println(Thread.currentThread().getName()+"\t 生产线程启动");try {myResource.MyProd();} catch (Exception e) {e.printStackTrace();}},"Prod").start();new Thread(() -> {System.out.println(Thread.currentThread().getName()+"\t 消费线程启动");try {myResource.MyConsumer();} catch (Exception e) {e.printStackTrace();}},"Consumer").start();try { TimeUnit.SECONDS.sleep(5); }catch (Exception e) {e.printStackTrace();}System.out.println("5秒钟时间到,大bossMain主线程叫停,活动结束");myResource.stop();}}
