7.1 线程安全的同步容器类
1 .通过synchronizedSortedSet静态方法包装出一个同步容器
package com.crazymakercircle.syncontainer;// 省略importpublic class CollectionsDemo{public static void main(String[] args) throws InterruptedException{// 创建一下基础的有序集合SortedSet<String> elementSet = new TreeSet<String>();// 增加元素elementSet.add("element 1");elementSet.add("element 2");// 将 elementSet 包装成一个同步容器SortedSet sorset = Collections.synchronizedSortedSet(elementSet);// 输出容器中的元素System.out.println("SortedSet is :" + sorset);CountDownLatch latch=new CountDownLatch(5);for (int i = 0; i < 5; i++){int finalI = i;ThreadUtil.getCpuIntenseTargetThreadPool().submit(() ->{// 向同步容器中增加一个元素sorset.add("element " + (3 + finalI));Print.tco("add element"+ (3 + finalI));latch.countDown();});}latch.await();// 输出容器中的元素System.out.println("SortedSet is :" + sorset);}}
- java.util.Collections所提供的同步包装方法
-
7.2 JUC高并发容器
7.3 CopyOnWriteArrayList
7.3.1 CopyOnWriteArrayList的使用
前面讲到,Collections可以将基础容器包装为线程安全的同步容器,但是这些同步容器包装类在进行元素迭代时并不能进行元素添加操作。下面是一个简单的例子: ```java package com.crazymakercircle.lockfree; // 省略import public class CopyOnWriteArrayListTest {
//并发操作的执行目标public static class CocurrentTarget implements Runnable{//并发操作的目标队列List<String> targetList = null;public CocurrentTarget(List<String> targetList){this.targetList = targetList;}@Overridepublic void run(){Iterator<String> iterator = targetList.iterator();//迭代操作while (iterator.hasNext()){// 在迭代操作时,进行列表的修改String threadName = currentThread().getName();Print.tco("开始往同步队列加入线程名称:" + threadName);targetList.add(threadName);}}}//测试同步队列:在迭代操作时,进行列表的修改@Testpublic void testSynchronizedList(){List<String> notSafeList = asList("a", "b", "c");List<String> synList = Collections.synchronizedList(notSafeList);//创建一个执行目标CocurrentTarget synchronizedListListDemo =new CocurrentTarget(synList);//10个线程并发for (int i = 0; i < 10; i++){new Thread(synchronizedListListDemo , "线程" + i).start();}//主线程等待sleepSeconds(1000);}
}
那么,该如何解决此问题呢?可使用CopyOnWriteArrayList替代Collections.synchronizedList同步包装实例,具体的代码如下:```javapackage com.crazymakercircle.lockfree;// 省略importpublic class CopyOnWriteArrayListTest{//测试CopyOnWriteArrayList@Testpublic void testcopyOnWriteArrayList(){List<String> notSafeList = asList("a", "b", "c");//创建一个CopyOnWriteArrayList队列List<String> copyOnWriteArrayList = new CopyOnWriteArrayList();copyOnWriteArrayList.addAll(notSafeList);//并发执行目标CocurrentTarget copyOnWriteArrayListDemo =new CocurrentTarget(copyOnWriteArrayList);for (int i = 0; i < 10; i++){new Thread(copyOnWriteArrayListDemo, "线程" + i).start();}//主线程等待sleepSeconds(1000);}}
7.3.2 CopyOnWriteArrayList的原理
CopyOnWriteArrayList的核心成员如下:
public class CopyOnWriteArrayList<E>implements List<E>, RandomAccess, Cloneable, java.io.Serializable {private static final long serialVersionUID = 8673264195747942595L;/** 对所有的修改器方法进行保护,访问器方法并不需要保护 */final transient ReentrantLock lock = new ReentrantLock();/** 内部对象数组,通过 getArray/setArray方法访问 */private transient volatile Object[] array;/***获取内部对象数组*/final Object[] getArray() {return array;}/***设置内部对象数组*/final void setArray(Object[] a) {array = a;}// 省略其他代码}
7.3.3 CopyOnWriteArrayList读取操作
/** 操作内存的引用*/private transient volatile Object[] array;public E get(int index) {return get(getArray(), index);}//获取元素@SuppressWarnings("unchecked")private E get(Object[] a, int index) {return (E) a[index];}//返回操作内存final Object[] getArray() {return array;}
7.3.4 CopyOnWriteArrayList写入操作
public boolean add(E e) {final ReentrantLock lock = this.lock;lock.lock(); // 加锁try {Object[] elements = getArray();int len = elements.length;// 复制新数组Object[] newElements = Arrays.copyOf(elements, len + 1);newElements[len] = e;setArray(newElements);return true;} finally {lock.unlock(); // 释放锁}}
7.3.5 CopyOnWriteArrayList的迭代器实现
static final class COWIterator<E> implements ListIterator<E> {/**对象数组的快照(snapshot)*/private final Object[] snapshot;/** Index of element to be returned by subsequent call to next. */private int cursor;private COWIterator(Object[] elements, int initialCursor) {cursor = initialCursor;snapshot = elements;}public boolean hasNext() {return cursor < snapshot.length;}//下一个元素public E next() {if (! hasNext())throw new NoSuchElementException();return (E) snapshot[cursor++];}}
7.4 BlockingQueue
7.4.1 BlockingQueue的特点
7.4.2 阻塞队列的常用方法
public interface BlockingQueue<E> extends Queue<E> {//将指定的元素添加到此队列的尾部//在成功时返回true,如果此队列已满,就抛出IllegalStateExceptionboolean add(E e);//非阻塞式添加:将指定的元素添加到此队列的尾部(如果立即可行且不会超过该队列的容量)//如果该队列已满,就直接返回boolean offer(E e)//限时阻塞式添加:将指定的元素添加到此队列的尾部//如果该队列已满,那么在到达指定的等待时间之前,添加线程会阻塞,等待可用的空间,该方法可中断boolean offer(E e, long timeout, TimeUnit unit)throws InterruptedException;//阻塞式添加:将指定的元素添加到此队列的尾部,如果该队列已满,就一直等待(阻塞)void put(E e) throws InterruptedException;//阻塞式删除:获取并移除此队列的头部,如果没有元素就等待(阻塞)//直到有元素,将唤醒等待线程执行该操作E take() throws InterruptedException;//非阻塞式删除:获取并移除此队列的头部,如果没有元素就直接返回null(空)E poll() throws InterruptedException;//限时阻塞式删除:获取并移除此队列的头部,在指定的等待时间前一直等待获取元素,超过时间,方法将结束E poll(long timeout, TimeUnit unit) throws InterruptedException;//获取但不移除此队列的头元素,没有则抛出异常NoSuchElementExceptionE element();//获取但不移除此队列的头元素,如果此队列为空,就返回nullE peek();//从此队列中移除指定元素,返回删除是否成功boolean remove(Object o);}
7.4.3 常见的BlockingQueue
- ArrayBlockingQueue
- LinkedBlockingQueue
- DelayQueue
- PriorityBlockingQueue
-
7.4.4 ArrayBlockingQueue的基本使用
```java package com.crazymakercircle.producerandcomsumer.store; // 省略import public class ArrayBlockingQueuePetStore {
public static final int MAX_AMOUNT = 10; //数据区长度//共享数据区,类定义static class DataBuffer<T>{//使用阻塞队列保存数据private ArrayBlockingQueue<T> dataList =new ArrayBlockingQueue<>(MAX_AMOUNT);// 向数据区增加一个元素,委托给阻塞队列public void add(T element) throws Exception{dataList.add(element); //直接委托}/*** 从数据区取出一个商品,委托给阻塞队列*/public T fetch() throws Exception{return dataList.take(); //直接委托}}public static void main(String[] args) throws InterruptedException{Print.cfo("当前进程的ID是" + JvmUtil.getProcessID());System.setErr(System.out);//共享数据区,实例对象DataBuffer<IGoods> dataBuffer = new DataBuffer<>();//生产者执行的操作Callable<IGoods> produceAction = () ->{//首先生成一个随机的商品IGoods goods = Goods.produceOne();//将商品加上共享数据区dataBuffer.add(goods);return goods;};//消费者执行的操作Callable<IGoods> consumerAction = () ->{// 从PetStore获取商品IGoods goods = null;goods = dataBuffer.fetch();return goods;};// 同时并发执行的线程数final int THREAD_TOTAL = 20;// 线程池,用于多线程模拟测试
<a name="DV7Lo"></a>## 7.4.5 ArrayBlockingQueue构造器和成员1. ArrayBlockingQueue构造器```java//默认非公平阻塞队列ArrayBlockingQueue queue = new ArrayBlockingQueue(capacity);//公平阻塞队列ArrayBlockingQueue queue1 = new ArrayBlockingQueue(capacity,true);
//只带一个capacity参数的构造器public ArrayBlockingQueue(int capacity) {this(capacity, false);}//带两个参数的构造器public ArrayBlockingQueue(int capacity, boolean fair) {if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair); //根据fair参数构造公平锁/获取非公平锁notEmpty = lock.newCondition(); //有元素加入,队列为非空notFull = lock.newCondition(); //有元素被取出,队列为未满}
ArrayBlockingQueue内部的成员变量 ```java public class ArrayBlockingQueue
extends AbstractQueue implements BlockingQueue<E>, java.io.Serializable {/** 存储数据的数组 */final Object[] items;/**获取、删除元素的索引,主要用于take、poll、peek、remove方法 */int takeIndex;/**添加元素的索引,主要用于 put、offer、add方法*/int putIndex;/** 队列元素的个数 */int count;/** 控制并发访问的显式锁 */final ReentrantLock lock;/**notEmpty条件对象,用于通知take线程(消费队列),可执行删除操作 */private final Condition notEmpty;/**notFull条件对象,用于通知put线程(生产队列),可执行添加操作 */private final Condition notFull;/**迭代器*/transient Itrs itrs = null;
}
