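Blocking queues grew out of the classic producer-consumer pattern.
Remember the two blocking methods:
- take: removes an element from the queue, blocking while the queue is empty
- put: inserts an element into the queue, blocking while the queue is full
null values are not allowed; inserting null throws NullPointerException.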
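
To make the two blocking methods concrete, here is a minimal sketch of one producer and one consumer sharing a small `ArrayBlockingQueue`. The class name `ProducerConsumerDemo` and its helper methods are made up for illustration; only `put`, `take`, and the queue class come from `java.util.concurrent`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ProducerConsumerDemo {

    /** Runs one producer and one consumer over a queue of capacity 2. */
    static List<Integer> run(int itemCount) {
        // Small capacity so the producer has to block when it gets ahead.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < itemCount; i++) {
                    consumed.add(queue.take()); // blocks while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    /** Returns true if put(null) is rejected with a NullPointerException. */
    static boolean rejectsNull() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(5));        // elements arrive in FIFO order
        System.out.println(rejectsNull()); // null is not accepted
    }
}
```

With a single producer the consumer sees the elements in insertion order, because the queue is FIFO.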
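Kinds of blocking queues

- Two of these blocking queues are described as unbounded. From a quick read of the source code, that does not seem to be quite accurate.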
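
The notes do not say which two queues are meant. Assuming `LinkedBlockingQueue` is one of them, the doubt can be checked without reading the source: its no-argument constructor is documented to use a capacity of `Integer.MAX_VALUE`, so it is bounded in principle even though it behaves as unbounded in practice. The class name `UnboundedCheck` is made up for this sketch.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; assumes LinkedBlockingQueue is one of the
// "unbounded" queues the note refers to.
class UnboundedCheck {

    /** Remaining capacity of an empty LinkedBlockingQueue built with no arguments. */
    static int defaultCapacity() {
        return new LinkedBlockingQueue<String>().remainingCapacity();
    }

    public static void main(String[] args) {
        // A very large limit, but still a limit.
        System.out.println(defaultCapacity() == Integer.MAX_VALUE);
    }
}
```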
How blocking queues work
Here we take ArrayBlockingQueue as the example and read through its put() and take() methods.
## Constructor
```java
/**
 * Serialization ID. This class relies on default serialization
 * even for the items array, which is default-serialized, even if
 * it is empty. Otherwise it could not be declared final, which is
 * necessary here.
 */
private static final long serialVersionUID = -817911632652898426L;

/** Array that holds the elements */
final Object[] items;

/** Index of the next element to take */
int takeIndex;

/** Index of the next slot to put into */
int putIndex;

/** Number of elements in the queue */
int count;

/*
 * Concurrency control uses the classic two-condition algorithm
 * found in any textbook.
 */

/** Reentrant lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting until an element has been enqueued */
private final Condition notEmpty;

/** Condition for waiting until the queue is no longer full */
private final Condition notFull;

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and default access policy.
 *
 * @param capacity the capacity of this queue
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

/**
 * Creates an {@code ArrayBlockingQueue} with the given (fixed)
 * capacity and the specified access policy.
 *
 * @param capacity the capacity of this queue
 * @param fair if {@code true} then queue accesses for threads blocked
 *        on insertion or removal, are processed in FIFO order;
 *        if {@code false} the access order is unspecified.
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}
```
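
A quick way to see the two constructor rules in action (capacity must be at least 1, and the capacity is fixed at construction) is the sketch below. `ConstructorDemo` and its methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class ConstructorDemo {

    /** Returns true if a capacity of 0 is rejected. */
    static boolean rejectsZeroCapacity() {
        try {
            new ArrayBlockingQueue<String>(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    /** Free slots of a freshly built queue; equals the requested capacity. */
    static int freeSlots(int capacity, boolean fair) {
        // fair = true hands the lock to waiting threads in FIFO order.
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity, fair);
        return queue.remainingCapacity();
    }

    public static void main(String[] args) {
        System.out.println(rejectsZeroCapacity());
        System.out.println(freeSlots(3, true));
    }
}
```

The `fair` flag is passed straight through to `ReentrantLock`, so fairness here is a property of the lock rather than of the queue itself.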
## put operation
```java
public void put(E e) throws InterruptedException {
    // Reject null elements
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // While the element count equals the array length (queue full),
        // wait in a loop on the notFull condition
        while (count == items.length)
            notFull.await();
        // Enqueue the element
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}

/**
 * Inserts element at current put position, advances, and signals.
 * Call only when holding lock.
 * Enqueueing also signals notEmpty.
 */
private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    // Wrap around to the start of the array (circular buffer)
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}
```
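- On enqueue, the element is first checked to be non-null.
- If the queue is full, the calling thread blocks on notFull.
- After the element is enqueued, notEmpty is signaled to wake a waiting consumer.

<a name="HttPy"></a>
## take operation

```java
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // While the queue is empty, wait in a loop on the notEmpty condition
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    // Wrap around to the start of the array (circular buffer)
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    // Dequeueing also signals notFull to wake a waiting producer
    notFull.signal();
    return x;
}
```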
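
To see the whole mechanism in one place, here is a stripped-down sketch of the same two-condition idea: one lock, a `notEmpty` and a `notFull` condition, and a circular array. `TinyBlockingBuffer` and its `demo` helper are hypothetical names; this is a simplification written for these notes, not the JDK implementation (for instance, it leaves out the iterator bookkeeping done through `itrs`).

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration only; the real class is java.util.concurrent.ArrayBlockingQueue.
class TinyBlockingBuffer<E> {
    private final Object[] items;
    private int takeIndex;
    private int putIndex;
    private int count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    TinyBlockingBuffer(int capacity) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        items = new Object[capacity];
    }

    void put(E e) throws InterruptedException {
        if (e == null)
            throw new NullPointerException();
        lock.lockInterruptibly();
        try {
            while (count == items.length)   // full: wait for a consumer
                notFull.await();
            items[putIndex] = e;
            if (++putIndex == items.length) // circular wrap-around
                putIndex = 0;
            count++;
            notEmpty.signal();              // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (count == 0)              // empty: wait for a producer
                notEmpty.await();
            E x = (E) items[takeIndex];
            items[takeIndex] = null;        // drop the reference for GC
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            notFull.signal();               // wake one waiting producer
            return x;
        } finally {
            lock.unlock();
        }
    }

    /** Pushes four strings through a buffer of capacity 2 and joins them. */
    static String demo() {
        TinyBlockingBuffer<String> buffer = new TinyBlockingBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (String s : new String[] {"a", "b", "c", "d"})
                    buffer.put(s);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        StringBuilder result = new StringBuilder();
        try {
            for (int i = 0; i < 4; i++)
                result.append(buffer.take());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both waits sit inside `while` loops rather than `if` statements, so a thread re-checks the queue state after every wake-up, which guards against spurious wake-ups and against another thread winning the race for the slot.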
