并发编程的核心问题:
- 互斥: 同一时刻只允许一个线程访问共享资源==>Lock
- 同步: 线程之间的通信以及协作==>Condition
隐藏在并发包中的管程
再造管程的理由
为啥有了synchronized还要lock
针对死锁问题, 有破坏”不可抢占条件”的方案, 即当线程申请不到资源的时候, 可以释放已占有的资源
但synchronized做不到, 如果申请不到, 线程会直接进入阻塞状态, 如果重新设计一把锁解决此问题, 有三种方案:
1.能够响应中断. 当给阻塞的线程发送中断的信号时, 能够唤醒他, 那他就有机会释放占用的资源;
2.支持超时. 若线程一段时间内没获取到锁, 不是进入阻塞状态, 而是返回一个错误, 那这个线程也有机会释放曾经持有的锁;
3.非阻塞的获取锁. 如果尝试获取锁失败, 并不进入阻塞状态.
以上对应lock接口的三个方法:
// 支持中断的APIvoid lockInterruptibly()throws InterruptedException;// 支持超时的APIboolean tryLock(long time, TimeUnit unit)throws InterruptedException;// 支持非阻塞获取锁的APIboolean tryLock();
如何保证可见性
synchronized: synchronized 的解锁 Happens-Before 于后续对这个锁的加锁。
Lock: 利用了 volatile 相关的 Happens-Before 规则
Java SDK 里面的 ReentrantLock,内部持有一个 volatile 的成员变量 state,获取锁的时候,会读写 state 的值;解锁的时候,也会读写 state 的值(简化后的代码如下面所示)
class SampleLock {volatile int state;// 加锁lock() {// 省略代码无数state = 1;}// 解锁unlock() {// 省略代码无数state = 0;}}
如下面示例代码:
1.顺序性规则:对于线程 T1,value+=1 Happens-Before 释放锁的操作 unlock();
2.volatile 变量规则:由于 state = 1 会先读取 state,所以线程 T1 的 unlock() 操作 Happens-Before 线程 T2 的 lock() 操作;
3.传递性规则:线程 T1 的 value+=1 Happens-Before 线程 T2 的 lock() 操作。
因此, 线程 T1 对 value 进行了 +=1 操作,后续的线程 T2 能够看到 value 的正确结果
class X {private final Lock rtl =new ReentrantLock();int value;public void addOne() {// 获取锁rtl.lock();try {value+=1;} finally {// 保证锁能释放rtl.unlock();}}}
什么是可重入锁
ReentrantLock, 可重入锁, 线程可以重复获取同一把锁
示例代码: 在1处加锁, 在2处再次加锁
class X {private final Lock rtl =new ReentrantLock();int value;public int get() {// 获取锁rtl.lock(); ②try {return value;} finally {// 保证锁能释放rtl.unlock();}}public void addOne() {// 获取锁rtl.lock();try {value = 1 + get(); ①} finally {// 保证锁能释放rtl.unlock();}}}
可重入函数: 多个线程可以同时调用该函数, 每个线程都能得到正确结果; 同时在一个线程内支持线程切换, 无论被切换多少次, 结果都是正确的. 即可重入函数是线程安全的
公平锁与非公平锁
ReentrantLock的两个构造函数, true表示构造一个公平锁
//无参构造函数:默认非公平锁public ReentrantLock() {sync = new NonfairSync();}//根据公平策略参数创建锁public ReentrantLock(boolean fair){sync = fair ? new FairSync(): new NonfairSync();}
管程中谈到入口等待队列,锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。
如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒。
用锁的最佳实践
出自Doug Lea《Java 并发编程:设计原则与模式》一书:
- 永远只在更新对象的成员变量时加锁
- 永远只在访问可变的成员变量时加锁
- 永远不在调用其他对象的方法时加锁
示例代码: 存在活锁, A,B两账户相互转账,各自持有自己lock的锁,都一直在尝试获取对方的锁,形成了活锁。这个例子可以稍微改下,成功转账后应该跳出循环。加个随机重试时间避免活锁
class Account {private int balance;private final Lock lock= new ReentrantLock();// 转账void transfer(Account tar, int amt){while (true) {if(this.lock.tryLock()) {try {if (tar.lock.tryLock()) {try {this.balance -= amt;tar.balance += amt;} finally {tar.lock.unlock();}}//if} finally {this.lock.unlock();}}//if}//while}//transfer}
Dubbo如何用管程实现异步转同步
Condition实现了管程模型里的条件变量
Java 语言内置的管程里只有一个条件变量,而 Lock&Condition 实现的管程是支持多个条件变量的
同步与异步
调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。
同步是Java代码默认的处理方式, 让程序支持异步:
1.调用方创建一个子线程, 在子线程中执行方法调用, 这种调用称为异步调用;
2.方法实现的时候, 创建一个新的线程执行主要逻辑, 主线程直接return, 这种方法我们一般称为异步方法.
利用两个条件变量快速实现阻塞队列:**
public class BlockedQueue<T>{final Lock lock =new ReentrantLock();// 条件变量:队列不满final Condition notFull =lock.newCondition();// 条件变量:队列不空final Condition notEmpty =lock.newCondition();// 入队void enq(T x) {lock.lock();try {while (队列已满){// 等待队列不满notFull.await();}// 省略入队操作...//入队后,通知可出队notEmpty.signal();}finally {lock.unlock();}}// 出队void deq(){lock.lock();try {while (队列已空){// 等待队列不空notEmpty.await();}// 省略出队操作...//出队后,通知可入队notFull.signal();}finally {lock.unlock();}}}
Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。
但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。
Dubbo的异步转同步:
1.通过对调用时等待结果的堆进行快照分析, 与DefaultFuture.get()有关
2.分析get()方法
public class DubboInvoker{Result doInvoke(Invocation inv){// 下面这行就是源码中108行// 为了便于展示,做了修改return currentClient.request(inv, timeout).get();}}
// 创建锁与条件变量private final Lock lock= new ReentrantLock();private final Condition done= lock.newCondition();// 调用方通过该方法等待结果Object get(int timeout){long start = System.nanoTime();lock.lock();try {while (!isDone()) {done.await(timeout);long cur=System.nanoTime();if (isDone() ||cur-start > timeout){break;}}} finally {lock.unlock();}if (!isDone()) {throw new TimeoutException();}return returnFromResponse();}// RPC结果是否已经返回boolean isDone() {return response != null;}// RPC结果返回时调用该方法private void doReceived(Response res) {lock.lock();try {response = res;if (done != null) {done.signalAll();}} finally {lock.unlock();}}
本质: 当 RPC 返回结果之前,阻塞调用线程,让调用线程等待;当 RPC 返回结果后,唤醒调用线程,让调用线程重新执行。
**
Lock&Condition 实现的管程相对于 synchronized 实现的管程来说更加灵活、功能也更丰富。
Semaphore: 如何快速实现一个限流器
Semaphore, 信号量
描述代码化:
class Semaphore{// 计数器int count;// 等待队列Queue queue;// 初始化操作Semaphore(int c){this.count=c;}//void down(){this.count--;if(this.count<0){//将当前线程插入等待队列//阻塞当前线程}}void up(){this.count++;if(this.count<=0) {//移除等待队列中的某个线程T//唤醒线程T}}}
补充:
信号量模型里面,down()、up() 这两个操作历史上最早称为 P 操作和 V 操作,所以信号量模型也被称为 PV 原语。另外,还有些人喜欢用 semWait() 和 semSignal() 来称呼它们,虽然叫法不同,但是语义都是相同的。在 Java SDK 并发包里,down() 和 up() 对应的则是 acquire() 和 release()。
如何使用信号量
static int count;//初始化信号量static final Semaphore s= new Semaphore(1);//用信号量保证互斥static void addOne() {s.acquire();try {count+=1;} finally {s.release();}}
理解:
当T1和T2两个线程访问addOne()方法时,
1.由于acquire()是一个原子性操作, 假设T1线程将计数器-1, 计数器变为0; T2线程再-1变成-1, T2线程将阻塞, 进入等待队列;
2.T1执行完后release(), 将计数器加1变为0, 即将T2从等待队列中移除, T2将被唤醒接着执行.
快速实现一个限流器
Semaphore 可以允许多个线程访问一个临界区。
需求: 池化资源, 例如线程池, 连接池等, 在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的.
实现对象池, 一次性创建出 N 个对象,之后所有的线程重复利用这 N 个对象,当然对象在被释放前,也是不允许其他线程使用的。限流,指的是不允许多于 N 个线程同时进入临界区.
信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就能完美解决对象池的限流问题了。
class ObjPool<T, R> {final List<T> pool;// 用信号量实现限流器final Semaphore sem;// 构造函数ObjPool(int size, T t){pool = new Vector<T>(){};for(int i=0; i<size; i++){pool.add(t);}sem = new Semaphore(size);}// 利用对象池的对象,调用funcR exec(Function<T,R> func) {T t = null;sem.acquire();try {t = pool.remove(0);return func.apply(t);} finally {pool.add(t);sem.release();}}}// 创建对象池ObjPool<Long, String> pool =new ObjPool<Long, String>(10, 2);// 通过对象池获取t,之后执行pool.exec(t -> {System.out.println(t);return t.toString();});
ReadWriteLock:如何快速实现一个完备的缓存?
什么是读写锁?
读写锁,并不是 Java 语言特有的,而是一个广为使用的通用技术,所有的读写锁都遵守以下三条基本原则:
- 允许多个线程同时读共享变量;
- 只允许一个线程写共享变量;
- 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。
快速实现一个缓存
用 ReadWriteLock 快速实现一个通用的缓存工具类。
class Cache<K,V> {final Map<K, V> m =new HashMap<>();final ReadWriteLock rwl =new ReentrantReadWriteLock();// 读锁final Lock r = rwl.readLock();// 写锁final Lock w = rwl.writeLock();// 读缓存V get(K key) {r.lock();try { return m.get(key); }finally { r.unlock(); }}// 写缓存V put(K key, V value) {w.lock();try { return m.put(key, v); }finally { w.unlock(); }}}
实现缓存的按需加载
class Cache<K,V> {final Map<K, V> m =new HashMap<>();final ReadWriteLock rwl =new ReentrantReadWriteLock();final Lock r = rwl.readLock();final Lock w = rwl.writeLock();V get(K key) {V v = null;//读缓存r.lock(); ①try {v = m.get(key); ②} finally{r.unlock(); ③}//缓存中存在,返回if(v != null) { ④return v;}//缓存中不存在,查询数据库w.lock(); ⑤try {//再次验证//其他线程可能已经查询过数据库v = m.get(key); ⑥if(v == null){ ⑦//查询数据库v=省略代码无数m.put(key, v);}} finally{w.unlock();}return v;}}
读写锁的升级与降级
如下面代码, ①处获取读锁,在②处如果缓存不存在则升级为写锁, 更新缓存后再释放写锁, 最后在③处释放读锁
//读缓存r.lock(); ①try {v = m.get(key); ②if (v == null) {w.lock();try {//再次验证并更新缓存//省略详细代码} finally{w.unlock();}}} finally{r.unlock(); ③}
这称为锁的升级。可惜 ReadWriteLock 并不支持这种升级。在上面的代码示例中,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个一定要注意。
但是允许锁的降级, 以下代码来源自 ReentrantReadWriteLock 的官方示例,略做了改动。
class CachedData {Object data;volatile boolean cacheValid;final ReadWriteLock rwl =new ReentrantReadWriteLock();// 读锁final Lock r = rwl.readLock();//写锁final Lock w = rwl.writeLock();void processCachedData() {// 获取读锁r.lock();if (!cacheValid) {// 释放读锁,因为不允许读锁的升级r.unlock();// 获取写锁w.lock();try {// 再次检查状态if (!cacheValid) {data = ...cacheValid = true;}// 释放写锁前,降级为读锁// 降级是可以的r.lock(); ①} finally {// 释放写锁w.unlock();}}// 此处仍然持有读锁try {use(data);}finally {r.unlock();}}}
补充: 写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。
如何解决缓存数据与源头数据的同步问题?
- 超时机制, 为缓存中的数据设置生存时间
- 源头数据发生变化时反馈给缓存
- 操作数据时, 同时写数据库和缓存
