正文
今天主要来看一下 Java 中的读写锁 ReentrantReadWriteLock
,读写锁也是可重入锁的一种,可以看出,其实现也是依托于 AQS 来实现的,可以说 AQS 就是并发包的核心知识,了解了 AQS,并发包中一半的内容也就很容易了解了
读写锁在平时我是用的不多,可能受限于经验以及经历,读写锁的了解我是直接来源于其他博客以及对于源码的阅读,我主要会以源码的角度分析一下读写锁的原理,上一篇博客的末尾我也使用了读写锁的例子,说明了一下共享模式下唤醒的方式 (链式的),同属于一个共享模式下等待的线程会和多米诺骨牌一样,一个被前一个唤醒。
读写锁顾名思义,分读锁和写锁,读锁是共享的,写锁是独占的,也就是说,读写锁用到了 AQS 里面独占模式以及共享模式,读写锁我们分开解析,先来看看构造器。
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
// todo 入参用于传递 sync
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
构造器比较简单,空入参或者包含是否公平的入参,默认情况下是非公平的,我们主要讲解一下非公平的,公平的有兴趣可以自己看一看源码,变化不大。
读写锁共用了一个 32 位 state,将高 16 位作为了读锁的状态,低 16 位作为写锁的状态,通过两个方法分别获取读写锁的次数
static int sharedCount(int c) {
// todo 高 16 位 (读锁的次数)
return c >>> SHARED_SHIFT;
}
static int exclusiveCount(int c) {
// todo 低 16 位 (写锁的次数)
return c & EXCLUSIVE_MASK;
}
先来看一下写锁,写锁就是一个独占锁
lock
public void lock() {
// AQS
sync.acquire(1);
}
看过这么多遍 AQS 了,我们直接去看 tryAcquire 方法
tryAcquire
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// todo 写锁的重入次数
int w = exclusiveCount(c);
// todo 有读写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// todo 如果没有写锁 (因为锁不等于0,没有写锁表明有读锁,返回 false, 进 AQS队列) 或者当前线程不等于写锁的线程,就返回 false
// todo 返回 false 后进入 AQS 队列等待
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// todo 上面已经确保了是当前线程,所以不需要 cas
setState(c + acquires);
return true;
}
// todo writerShoudBlock 返回false,这里主要用于判断 cas 是否成功,有没有其他线程抢占,失败进 AQS 队列
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
上面代码中,出现了一个 writerSouldBlock 方法,这个方法主要和公平非公平有关
/**
* todo 和 writerShouldBlock 类似,只不过非公平模式下,为了防止写线程饥饿
*/
abstract boolean readerShouldBlock();
/**
* todo 这里主要会和公平以及非公平策略挂钩,非公平直接返回 false,公平模式判断当前是否允许获取锁
*/
abstract boolean writerShouldBlock();
... 公平模式实现
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
// todo 用于判断当前线程是否是下一个可以获取锁的线程
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
... 非公平模式实现
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// todo (head != null) && (head.next != null) && (head.next 不是共享锁 (是写锁的时候,并且线程不为空))
// todo 这里保证了当一个持锁线程的后续线程是写线程的时候,那么读线程就要等待,否则可以尝试重入,防止了
// todo 当前线程 -> 写线程 -> (读线程获取锁) 最后的读线程需要阻塞,防止写线程饥渴 (读 >> 写)
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
写锁释放方法 unlock
unlock
public void unlock() {
sync.release(1);
}
tryRelease
protected final boolean tryRelease(int releases) {
// todo 必须是独占锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// todo 因为可重入,所以全部释放完才算锁释放
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
下面看一下读锁的上锁以及释放的过程
lock
public void lock() {
sync.acquireShared(1);
}
acquireShared AQS
public final void acquireShared(int arg) {
// todo 如果获取的锁小于 0 (获取失败)
if (tryAcquireShared(arg) < 0)
// todo 进入 AQS 自旋未果后挂起等待唤醒 (共享模式会在获取到锁也唤醒后续节点)
doAcquireShared(arg);
}
tryAcquireShared
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// todo 有写锁并且当前线程不等于写线程,返回 -1 进入 AQS 队列 (当前线程的读锁是可以上锁的 (锁降级))
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// todo 读锁的重入次数
int r = sharedCount(c);
// todo 判断当前持有锁线程的后面一个节点是否写线程,是写线程的话就返回 true (非公平)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// todo 重入
firstReaderHoldCount++;
} else {
// todo 每个线程的读锁次数,通过 ThreadLocal 实现,这里的 cachedHoldCounter 是上一次读操作的缓存,
// todo firstReader 和 firstReaderHoldCount 以及 cachedHoldCounter都是提升效率,减少 ThreadLocal 的访问
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// todo 上面加锁失败就进入死循环,这里面的逻辑和外面实际上差不多,只是不断循环直到又返回结果,就不贴出来了
return fullTryAcquireShared(current);
}
unlock
public void unlock() {
sync.releaseShared(1);
}
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// todo 释放锁的时候,唤醒后续等待节点
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// todo 获取相应的重入数,并且计算
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// todo 非本线程就拿上一次 cache 的 counter,如果不是再从 ThreadLocal 里面拿,然后做相应的计算
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// todo 对 state 进行值得修改
for (; ; ) {
int c = getState();
int nextc = c - SHARED_UNIT;
// todo 确保成功
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
关于锁降级,读写锁有一种锁降级的写法,就是获取写锁,获取读锁,释放写锁,在释放读锁
锁降级
try{
write.lock();
doSomeThing();
read.lock();
write.unlock();
doSomeReadThing();
}finally{
read.unlock();
}
说一下这种写法的个人理解吧,网上有很多说法,这里我就不说了,单纯的说一下自己的理解,假设我们不这么写,而是释放读锁之后在获取读锁,这样会有什么问题呢?就是在获取读锁的时候,可能被其他线程抢先获取了写锁,进而本线程无法获取读锁,假设释放完写锁我们需要对上面修改的数据进行一些操作,就会被分割开,下次再拿到锁的时候,数据就可能被其他线程修改了就不是预期的了,有人可能会说,整个操作都放在写锁里面,这样固然没有问题,但是锁住的代码就会比较多,写锁是一种独占锁,就会导致并发搞得情况效率比较低,所以这里就事先获取了读锁,即使在写锁释放了的情况,其他写线程也无法获取锁,而其他读的线程就可以获取读锁了,提高了效率。
上面也只是我个人对于锁降级的看法,可能有些地方不太得当或者不太正确,大家也应有自己的理解。
本文首次发布于 fyypumpkin Blog, 作者 @fyypumpkin ,转载请保留原文链接.