源码解读---线程锁源码探秘

我终生的等候,换不来你刹那的凝眸

Posted by yishuifengxiao on 2021-02-01

一 基本概念

1.1 各种常见的锁

1.1.1 悲观锁与乐观锁

锁的一种宏观分类方式是悲观锁乐观锁。悲观锁与乐观锁并不是特指某个锁(Java中没有哪个Lock实现类就叫PessimisticLock或OptimisticLock),而是在并发情况下的两种不同策略。

悲观锁(Pessimistic Lock), 就是很悲观,每次去拿数据的时候都认为别人会修改。所以每次在拿数据的时候都会上锁。这样别人想拿数据就被挡住,直到悲观锁被释放。

乐观锁(Optimistic Lock), 就是很乐观,每次去拿数据的时候都认为别人不会修改。所以不会上锁,不会上锁!但是如果想要更新数据,则会在更新前检查在读取至更新这段时间别人有没有修改过这个数据。如果修改过,则重新读取,再次尝试更新,循环上述步骤直到更新成功(当然也允许更新失败的线程放弃操作)。

悲观锁阻塞事务,乐观锁回滚重试,它们各有优缺点,不要认为一种一定好于另一种。像乐观锁适用于写比较少的情况下,即冲突真的很少发生的时候,这样可以省去锁的开销,加大了系统的整个吞吐量。但如果经常产生冲突,上层应用会不断的进行重试,这样反倒是降低了性能,所以这种情况下用悲观锁就比较合适。

乐观锁的基础——CAS

说到乐观锁,就必须提到一个概念:CAS

什么是CAS呢?Compare-and-Swap,即比较并替换,也有叫做Compare-and-Set的,比较并设置

1、比较:读取到了一个值A,在将其更新为B之前,检查原值是否仍为A(未被其他线程改动)。

2、设置:如果是,将A更新为B,结束。如果不是,则什么都不做。

上面的两步操作是原子性的,可以简单地理解为瞬间完成,在CPU看来就是一步操作。

有了CAS,就可以实现一个乐观锁

data = 123; // 共享数据

/* 更新数据的线程会进行如下操作 */
flag = true;
while (flag) {
oldValue = data; // 保存原始数据
newValue = doSomething(oldValue);

// 下面的部分为CAS操作,尝试更新data的值
if (data == oldValue) { // 比较
data = newValue; // 设置
flag = false; // 结束
} else {
// 啥也不干,循环重试
}
}
/*
很明显,这样的代码根本不是原子性的,
因为真正的CAS利用了CPU指令,
这里只是为了展示执行流程,本意是一样的。
*/

这是一个简单直观的乐观锁实现,它允许多个线程同时读取(因为根本没有加锁操作),但是只有一个线程可以成功更新数据,并导致其他要更新数据的线程回滚重试。 CAS利用CPU指令,从硬件层面保证了操作的原子性,以达到类似于锁的效果。

imgJava中真正的CAS操作调用的native方法

因为整个过程中并没有“加锁”和“解锁”操作,因此乐观锁策略也被称为无锁编程。换句话说,乐观锁其实不是“锁”,它仅仅是一个循环重试CAS的算法而已!

我们在Java里使用的各种锁,几乎全都是悲观锁。synchronized从偏向锁、轻量级锁到重量级锁,全是悲观锁。JDK提供的Lock实现类全是悲观锁。其实只要有“锁对象”出现,那么就一定是悲观锁。因为乐观锁不是锁,而是一个在循环里尝试CAS的算法。

那JDK并发包里到底有没有乐观锁呢?

有。java.util.concurrent.atomic包里面的原子类都是利用乐观锁实现的。

img原子类AtomicInteger的自增方法为乐观锁策略

为什么网上有些资料认为偏向锁、轻量级锁是乐观锁?理由是它们底层用到了CAS?或者是把“乐观/悲观”与“轻量/重量”搞混了?其实,线程在抢占这些锁的时候,确实是循环+CAS的操作,感觉好像是乐观锁。但问题的关键是,我们说一个锁是悲观锁还是乐观锁,总是应该站在应用层,看它们是如何锁住应用数据的,而不是站在底层看抢占锁的过程。如果一个线程尝试获取锁时,发现已经被占用,它是否继续读取数据,等后续要更新时再决定要不要重试?对于偏向锁、轻量级锁来说,显然答案是否定的。无论是挂起还是忙等,对应用数据的读取操作都被“挡住”了。从这个角度看,它们确实是悲观锁。

1.1.2 自旋锁

有一种锁叫自旋锁。所谓自旋,说白了就是一个 while(true) 无限循环。

刚刚的乐观锁就有类似的无限循环操作,那么它是自旋锁吗?

不是。尽管自旋与 while(true) 的操作是一样的,但还是应该将这两个术语分开。“自旋”这两个字,特指自旋锁的自旋。。

1.1.3 synchronized锁升级

偏向锁 → 轻量级锁 → 重量级锁

synchronized会从无锁升级为偏向锁,再升级为轻量级锁,最后升级为重量级锁,就像自动换挡一样。那么自旋锁在哪里呢?这里的轻量级锁就是一种自旋锁

初次执行到synchronized代码块的时候,锁对象变成偏向锁(通过CAS修改对象头里的锁标志位),字面意思是“偏向于第一个获得它的线程”的锁。执行完同步代码块后,线程并不会主动释放偏向锁。当第二次到达同步代码块时,线程会判断此时持有锁的线程是否就是自己(持有锁的线程ID也在对象头里),如果是则正常往下执行。由于之前没有释放锁,这里也就不需要重新加锁。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。

一旦有第二个线程加入锁竞争,偏向锁就升级为轻量级锁(自旋锁)。这里要明确一下什么是锁竞争:如果多个线程轮流获取一个锁,但是每次获取锁的时候都很顺利,没有发生阻塞,那么就不存在锁竞争。只有当某线程尝试获取锁的时候,发现该锁已经被占用,只能等待其释放,这才发生了锁竞争。

在轻量级锁状态下继续锁竞争,没有抢到锁的线程将自旋,即不停地循环判断锁是否能够被成功获取。获取锁的操作,其实就是通过CAS修改对象头里的锁标志位。先比较当前锁标志位是否为“释放”,如果是则将其设置为“锁定”,比较并设置是原子性发生的。这就算抢到锁了,然后线程将当前锁的持有者信息修改为自己。

长时间的自旋操作是非常消耗资源的,一个线程持有锁,其他线程就只能在原地空耗CPU,执行不了任何有效的任务,这种现象叫做忙等(busy-waiting)。如果多个线程用一个锁,但是没有发生锁竞争,或者发生了很轻微的锁竞争,那么synchronized就用轻量级锁,允许短时间的忙等现象。这是一种折衷的想法,短时间的忙等,换取线程在用户态和内核态之间切换的开销。

显然,此忙等是有限度的(有个计数器记录自旋次数,默认允许循环10次,可以通过虚拟机参数更改)。如果锁竞争情况严重,某个达到最大自旋次数的线程,会将轻量级锁升级为重量级锁(依然是CAS修改锁标志位,但不修改持有锁的线程ID)。当后续线程尝试获取锁时,发现被占用的锁是重量级锁,则直接将自己挂起(而不是忙等),等待将来被唤醒。在JDK1.6之前,synchronized直接加重量级锁,很明显现在得到了很好的优化。

一个锁只能按照 偏向锁、轻量级锁、重量级锁的顺序逐渐升级(也有叫锁膨胀的),不允许降级。

偏向锁的一个特性是,持有锁的线程在执行完同步代码块时不会释放锁。那么当第二个线程执行到这个synchronized代码块时是否一定会发生锁竞争然后升级为轻量级锁呢?
线程A第一次执行完同步代码块后,当线程B尝试获取锁的时候,发现是偏向锁,会判断线程A是否仍然存活。如果线程A仍然存活,将线程A暂停,此时偏向锁升级为轻量级锁,之后线程A继续执行,线程B自旋。但是如果判断结果是线程A不存在了,则线程B持有此偏向锁,锁不升级

1.1.4 可重入锁(递归锁)

可重入锁的字面意思是“可以重新进入的锁”,即允许同一个线程多次获取同一把锁。比如一个递归函数里有加锁操作,递归过程中这个锁会阻塞自己吗?如果不会,那么这个锁就是可重入锁(因为这个原因可重入锁也叫做递归锁

Java里只要以Reentrant开头命名的锁都是可重入锁,而且JDK提供的所有现成的Lock实现类,包括synchronized关键字锁都是可重入的。如果你需要不可重入锁,只能自己去实现了.

1.1.5 公平锁、非公平锁

如果多个线程申请一把公平锁,那么当锁释放的时候,先申请的先得到,非常公平。显然如果是非公平锁,后申请的线程可能先获取到锁,是随机或者按照其他优先级排序的。

对ReentrantLock类而言,通过构造函数传参可以指定该锁是否是公平锁,默认是非公平锁。一般情况下,非公平锁的吞吐量比公平锁大,如果没有特殊要求,优先使用非公平锁。

对于synchronized而言,它也是一种非公平锁,但是并没有任何办法使其变成公平锁。

1.1.6 可中断锁

可中断锁,字面意思是“可以响应中断的锁”。

这里的关键是理解什么是中断。Java并没有提供任何直接中断某线程的方法,只提供了中断机制。何谓“中断机制”?线程A向线程B发出“请你停止运行”的请求(线程B也可以自己给自己发送此请求),但线程B并不会立刻停止运行,而是自行选择合适的时机以自己的方式响应中断,也可以直接忽略此中断。也就是说,Java的中断不能直接终止线程,而是需要被中断的线程自己决定怎么处理。

回到锁的话题上来,如果线程A持有锁,线程B等待获取该锁。由于线程A持有锁的时间过长,线程B不想继续等待了,我们可以让线程B中断自己或者在别的线程里中断它,这种就是可中断锁

在Java中,synchronized就是不可中断锁,而Lock的实现类都是可中断锁,可以简单看下Lock接口。

/* Lock接口 */
public interface Lock {

// 拿不到锁就一直等,拿到马上返回。
void lock();

// 拿不到锁就一直等,如果等待时收到中断请求,则需要处理InterruptedException。
void lockInterruptibly() throws InterruptedException;

// 无论拿不拿得到锁,都马上返回。拿到返回true,拿不到返回false。
boolean tryLock();

// 同上,可以自定义等待的时间。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

void unlock();

Condition newCondition();
}

1.1.7 读写锁、共享锁、互斥锁

读写锁其实是一对锁,一个读锁(共享锁)和一个写锁(互斥锁、排他锁)。

看下Java里的ReadWriteLock接口,它只规定了两个方法,一个返回读锁,一个返回写锁。

image-20210206114702265

记得之前的乐观锁策略吗?所有线程随时都可以读,仅在写之前判断值有没有被更改。

读写锁其实做的事情是一样的,但是策略稍有不同。很多情况下,线程知道自己读取数据后,是否是为了更新它。那么何不在加锁的时候直接明确这一点呢?如果我读取值是为了更新它(SQL的for update就是这个意思),那么加锁的时候就直接加写锁,我持有写锁的时候别的线程无论读还是写都需要等待;如果我读取数据仅为了前端展示,那么加锁时就明确地加一个读锁,其他线程如果也要加读锁,不需要等待,可以直接获取(读锁计数器+1)。

虽然读写锁感觉与乐观锁有点像,但是读写锁是悲观锁策略。因为读写锁并没有在更新前判断值有没有被修改过,而是在加锁前决定应该用读锁还是写锁。乐观锁特指无锁编程,如果仍有疑惑可以再回到第一、二小节,看一下什么是“乐观锁”。

JDK提供的唯一一个ReadWriteLock接口实现类是ReentrantReadWriteLock。看名字就知道,它不仅提供了读写锁,而是都是可重入锁。 除了两个接口方法以外,ReentrantReadWriteLock还提供了一些便于外界监控其内部工作状态的方法,这里就不一一展开。

1.1.8 死锁

死锁: 两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程

死锁四个产生条件:

1)互斥条件: 指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。

2)请求和保持条件: 指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。

3)不剥夺条件: 指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。

4)环路等待条件: 指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。

预防死锁打破上述之一的条件。

1.2 读写锁概念

这里说的读写锁指的是 ReentrantReadWriteLock ,ReadWriteLock的实现支持与ReentrantLock相似的语义。
此类具有以下属性:

1.2.1 Acquisition order

此类不对锁定访问强加读取器或写入器首选项顺序。 但是,它确实支持可选的公平性政策。

Non-fair mode (default)

当构造为不公平(默认)时,不受重入限制,未指定读写锁的进入顺序。 不断竞争的非公平锁可能会无限期地延迟一个或多个读取器或写入器线程,但通常比公平锁具有更高的吞吐量。

Fair mode

如果公平地构造线程,则使用近似到达顺序策略争用进入线程。释放当前持有的锁时,将为等待时间最长的单个写程序线程分配写锁定,或者如果有一组读取器线程的等待时间长于所有等待的写程序线程,则将为该组分配读锁定。
如果持有写锁或有一个正在等待的写程序线程,则试图(公正地)获取一个公平读锁的线程将阻塞。直到当前等待时间最久的写入器线程获得并释放写入锁后,该线程才会获取读取锁。当然,如果等待中的写作者放弃了等待,而将一个或多个阅读器线程作为队列中最长的侍者而没有写锁定,则将为这些阅读器分配读锁定。
尝试获取公平的写锁(非可重入)的线程将阻塞,除非读锁和写锁均是空闲的(这意味着没有等待的线程)。 (请注意,非阻塞的ReentrantReadWriteLock.ReadLock.tryLock()和ReentrantReadWriteLock.WriteLock.tryLock()方法不遵循此公平设置,并且如果可能的话,将立即获取锁定,而不管等待线程如何。)

1.2.2 Reentrancy(可重入)

此锁允许读取者和写入者以ReentrantLock的样式重新获取读取或写入锁定。 在释放写线程持有的所有写锁之前,不允许非可重入读者。
此外,写者可以获得读锁,反之则不能。 在其他应用程序中,当在调用或回调对在读锁下执行读取的方法的过程中保持写锁时,重新进入很有用。 如果读者试图获取写锁,它将永远不会成功。

1.2.3 Lock downgrading (锁定降级)

重入还可以通过获取写锁,然后读锁和释放写锁的方式,从写锁降级为读锁。 但是,无法从读取锁升级到写入锁。

1.2.4 Interruption of lock acquisition (锁获取中断)

读锁和写锁都支持在锁获取期间中断。

1.2.5 Condition support (条件支持)

写入锁提供的Condition实现与写入锁的行为方式相同,就像ReentrantLock.newCondition为ReentrantLock提供的Condition实现一样。 当然,此条件只能与写锁一起使用。
读锁不支持Condition,并且readLock()。newCondition()引发UnsupportedOperationException。

1.2.6 Instrumentation

此类支持确定是持有锁还是争用锁的方法。 这些方法设计用于监视系统状态,而不用于同步控制。

此类的序列化与内置锁的行为相同:反序列化的锁处于解锁状态,而不管序列化时的状态如何。

二 使用示例

这是一个代码草图,显示了在更新缓存后如何执行锁降级(以非嵌套方式处理多个锁时,异常处理特别棘手):

class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

ReentrantReadWriteLocks可用于提高某些种类的Collection的并发性。 仅当预期集合很大,由读取器线程而不是写入器线程访问更多读取器线程且操作所带来的开销超过同步开销时,通常这才是值得的。 例如,这是一个使用TreeMap的类,该类应该很大并且可以同时访问。


import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();

public Data get(String key) {
r.lock();
try {
return m.get(key);
} finally {
r.unlock();
}
}

public String[] allKeys() {
r.lock();
try {
return m.keySet().toArray();
} finally {
r.unlock();
}
}

public Data put(String key, Data value) {
w.lock();
try {
return m.put(key, value);
} finally {
w.unlock();
}
}

public void clear() {
w.lock();
try {
m.clear();
} finally {
w.unlock();
}
}
}

此锁最多支持65535个递归写锁和65535个读锁。 尝试超过这些限制会导致锁定方法引发错误。

三 主要方法

image-20210205170334370

注意ReentrantReadWriteLock不是Lock的实例,而是ReentrantReadWriteLock的实例

3.1 构造函数

image-20210205170427105

默认情况下,创建的是非公平锁

主要部分代码如下

/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}

/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

在上述代码中,有一个很重要的属性为 Sync ,其定义如下

/**
* Synchronization implementation for ReentrantReadWriteLock.
* Subclassed into fair and nonfair versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;

/*
* Read vs write count extraction constants and functions.
* Lock state is logically divided into two unsigned shorts:
* The lower one representing the exclusive (writer) lock hold count,
* and the upper the shared (reader) hold count.
*/

static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

/**
* A counter for per-thread read hold counts.
* Maintained as a ThreadLocal; cached in cachedHoldCounter
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

/**
* ThreadLocal subclass. Easiest to explicitly define for sake
* of deserialization mechanics.
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

/**
* The number of reentrant read locks held by current thread.
* Initialized only in constructor and readObject.
* Removed whenever a thread's read hold count drops to 0.
*/
private transient ThreadLocalHoldCounter readHolds;

/**
* The hold count of the last thread to successfully acquire
* readLock. This saves ThreadLocal lookup in the common case
* where the next thread to release is the last one to
* acquire. This is non-volatile since it is just used
* as a heuristic, and would be great for threads to cache.
*
* <p>Can outlive the Thread for which it is caching the read
* hold count, but avoids garbage retention by not retaining a
* reference to the Thread.
*
* <p>Accessed via a benign data race; relies on the memory
* model's final field and out-of-thin-air guarantees.
*/
private transient HoldCounter cachedHoldCounter;

/**
* firstReader is the first thread to have acquired the read lock.
* firstReaderHoldCount is firstReader's hold count.
*
* <p>More precisely, firstReader is the unique thread that last
* changed the shared count from 0 to 1, and has not released the
* read lock since then; null if there is no such thread.
*
* <p>Cannot cause garbage retention unless the thread terminated
* without relinquishing its read locks, since tryReleaseShared
* sets it to null.
*
* <p>Accessed via a benign data race; relies on the memory
* model's out-of-thin-air guarantees for references.
*
* <p>This allows tracking of read holds for uncontended read
* locks to be very cheap.
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}

/*
* Acquires and releases use the same code for fair and
* nonfair locks, but differ in whether/how they allow barging
* when queues are non-empty.
*/

/**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();

/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();

/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/

protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}

protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

/**
* Performs tryLock for write, enabling barging in both modes.
* This is identical in effect to tryAcquire except for lack
* of calls to writerShouldBlock.
*/
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}

/**
* Performs tryLock for read, enabling barging in both modes.
* This is identical in effect to tryAcquireShared except for
* lack of calls to readerShouldBlock.
*/
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

// Methods relayed to outer class

final ConditionObject newCondition() {
return new ConditionObject();
}

final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}

final int getReadLockCount() {
return sharedCount(getState());
}

final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}

final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

final int getReadHoldCount() {
if (getReadLockCount() == 0)
return 0;

Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;

HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == getThreadId(current))
return rh.count;

int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}

/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}

final int getCount() { return getState(); }
}

其中公平锁的定义如下

/**
* Fair version of Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;

final boolean writerShouldBlock() {
// hasQueuedPredecessors() 定义在基类中
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
// hasQueuedPredecessors() 定义在基类中
return hasQueuedPredecessors();
}
}

非公平锁的定义

 /**
* Nonfair version of Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;

final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
//作为一种避免无限期地饿死writer的试探法,如果暂时看起来是队列头的线程(如果存在)则阻塞,等待writer。
//这只是一种概率效应,因为如果在其他启用的读取器后面还没有等待中的写入器还没有从队列中耗尽,
//那么新的读取器将不会阻塞。
return apparentlyFirstQueuedIsExclusive();
}
}

3.2 读锁

image-20210206103616241

这里的读锁值得 ReentrantReadWriteLock.ReadLock ,其构造函数与主要方法如下图

image-20210205171257117

完整代码如下


/**
* The lock returned by method {@link ReentrantReadWriteLock#readLock}.
*/
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

/**
* Constructor for use by subclasses
*
* @param lock the outer lock object
* @throws NullPointerException if the lock is null
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

/**
* 获取读取锁定。
* 如果写锁未被另一个线程持有,则获取读锁并立即返回。
*
* 如果写锁由另一个线程持有,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到获取读锁为止。
*/
public void lock() {
sync.acquireShared(1);
}

/**
* 除非当前线程被中断,否则获取读锁定。
*
* 如果写锁未被另一个线程持有,则获取读锁并立即返回。
*
* 如果写锁由另一个线程持有,则出于线程调度的目的,当前线程将被禁用,
* 并处于休眠状态,直到发生以下两种情况之一:
* 1 读取锁由当前线程获取
* 2 其他一些线程中断当前线程。
*
* 如果当前线程:
* 1 在进入此方法时设置了其中断状态 或者
* 2 获取读锁时被中断
* 然后抛出InterruptedException并清除当前线程的中断状态。
*
* 在此实现中,由于此方法是显式的中断点,因此优先于对中断的响应而不是正常或可重入的锁获取。
*
* @throws InterruptedException if the current thread is interrupted
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

/**
* 仅当调用时另一个线程未持有写锁时才获取读锁。
*
* 如果写锁未被另一个线程持有,则获取读锁,并立即返回true值。
* 即使已将此锁设置为使用公平的排序策略,对tryLock()的调用也会立即获取读取锁(如果可用)
* ,无论其他线程当前是否在等待读取锁。 即使破坏公平性,
* 这种“讨价还价”的行为在某些情况下还是有用的。
* 如果要遵守此锁的公平性设置,请使用几乎等效的tryLock(0,TimeUnit.SECONDS)
* (它还会检测到中断)。
*
* 如果写锁由另一个线程持有,则此方法将立即返回false值。
*
* @return {@code true} if the read lock was acquired
*/
public boolean tryLock() {
return sync.tryReadLock();
}

/**
*如果在给定的等待时间内另一个线程未持有写锁定并且当前线程未被中断,则获取读锁定
*
* 如果写锁未被另一个线程持有,则获取读锁,并立即返回true值。
* 如果已将此锁设置为使用公平的排序策略,则在任何其他线程正在等待该锁的情况下,
* 将不会获取可用锁。 这与tryLock()方法相反。
* 如果您想要定时的tryLock确实允许对公平锁进行插入,则将定时和非定时形式组合在一起:
* if (lock.tryLock() ||
* lock.tryLock(timeout, unit)) {
* ...
* }
*
* 如果写锁由另一个线程持有,则出于线程调度目的,当前线程将被禁用
* ,并且在发生以下三种情况之一之前,它处于休眠状态:
* 1 读锁由当前线程获取;
* 2 其他一些线程中断当前线程
* 3 经过指定的等待时间
* 如果获取了读锁,则返回值true。
*
*如果当前线程:
* 1 在进入此方法时设置了其中断状态
* 2 获取读取锁时中断
* 然后抛出InterruptedException并清除当前线程的中断状态。
*
* 如果经过了指定的等待时间,则返回值false。 如果时间小于或等于零,则该方法将完全不等待
*
* 在此实现中,由于此方法是显式的中断点,因此优先于对中断的响应,
* 而不是正常或可重入的锁定获取,而是优先报告等待时间的流逝
*
* @param timeout the time to wait for the read lock
* @param unit the time unit of the timeout argument
* @return {@code true} if the read lock was acquired
* @throws InterruptedException if the current thread is interrupted
* @throws NullPointerException if the time unit is null
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

/**
* 尝试释放此锁。
*
* <p>If the number of readers is now zero then the lock
* is made available for write lock attempts.
*/
public void unlock() {
sync.releaseShared(1);
}

/**
* Throws {@code UnsupportedOperationException} because
* {@code ReadLocks} do not support conditions.
*
* @throws UnsupportedOperationException always
*/
public Condition newCondition() {
throw new UnsupportedOperationException();
}

/**
* 返回标识此锁定及其锁定状态的字符串。 括号中的状态包括字符串“ Read locks =”,后跟持有的读锁的数量。
*
* @return a string identifying this lock, as well as its lock state
*/
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}

3.3 写锁

image-20210206103554575

这里的读锁值得 ReentrantReadWriteLock.WriteLock,其构造函数与主要方法如下图

image-20210206085659190代码如下


import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* The lock returned by method {@link ReentrantReadWriteLock#writeLock}.
*/
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

/**
* Constructor for use by subclasses
*
* @param lock the outer lock object
* @throws NullPointerException if the lock is null
*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

/**
* 获取写锁。
* <p>
* 如果读取和写入锁定均未被另一个线程持有,则获取写入锁定并立即返回,将写入锁定保持计数设置为一个
* <p>
* 如果当前线程已经持有写锁,那么持有计数将增加一,并且该方法将立即返回
* <p>
* 如果锁是由另一个线程持有的,则当前线程将出于线程调度目的而被禁用,并处于休眠状态
* ,直到获取了写入锁为止,此时写入锁的保持计数被设置为一个。
*/
public void lock() {
sync.acquire(1);
}

/**
* 除非当前线程被中断,否则获取写锁定。
* <p>
* 如果读取和写入锁定均未由另一个线程持有,则获取写入锁定并立即返回,将写入锁定保持计数设置为1。
* <p>
* 如果当前线程已经持有此锁,则持有计数将增加一,该方法将立即返回。
* <p>
* 如果锁是由另一个线程持有的,则出于线程调度的目的,当前线程将被禁用,
* 并处于休眠状态,直到发生以下两种情况之一:
* 1 写锁由当前线程获取;
* 2 其他一些线程中断当前线程。
* <p>
* 如果当前线程获取了写锁,则将锁保持计数设置为1。
* <p>
* 如果当前线程:
* 1 在进入此方法时已设置其中断状态;
* 2 获取写锁时被中断,
* <p>
* 然后抛出InterruptedException并清除当前线程的中断状态。
* <p>
* 在此实现中,由于此方法是显式的中断点,因此优先于对中断的响应而不是正常或可重入的锁获取。
*
* @throws InterruptedException if the current thread is interrupted
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

/**
* 仅在调用时没有其他线程持有写锁时才获取写锁。
* 如果读取和写入锁均未由另一个线程持有,则获取写入锁,并立即返回true值,
* 将写入锁的保持计数设置为1。 即使已将此锁设置为使用公平的排序策略,
* 对tryLock()的调用也会立即获取该锁(如果可用),无论其他线程当前是否在等待写锁。
* 即使破坏公平性,这种“讨价还价”的行为在某些情况下还是有用的。
* 如果要遵守此锁的公平性设置,请使用几乎等效的tryLock(0,TimeUnit.SECONDS)(它还会检测到中断)。
* <p>
* 如果当前线程已经持有此锁,则持有计数将增加一,并且该方法返回true。
* <p>
* 如果锁由另一个线程持有,则此方法将立即返回false值。
*/
public boolean tryLock() {
return sync.tryWriteLock();
}

/**
* 如果在给定的等待时间内另一个线程未持有该写锁,并且当前线程尚未中断,则获取该写锁。
* <p>
* 如果读取和写入锁均未由另一个线程持有,则获取写入锁,并立即返回true值,
* 将写入锁的保持计数设置为1。 如果已将此锁定设置为使用公平的排序策略,
* 则如果任何其他线程正在等待写锁定,则不会获取可用的锁定。
* 这与tryLock()方法相反。 如果您想要定时的tryLock确实允许对公平锁进行插入,则将定时和非定时形式组合在一起:
* <p>
* if (lock.tryLock() ||
* lock.tryLock(timeout, unit)) {
* ...
* }
* <p>
* 如果当前线程已经持有此锁,则持有计数将增加一,并且该方法返回true。
* <p>
* 如果锁是由另一个线程持有的,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一:
* 1 写锁由当前线程获取
* 2 其他一些线程中断当前线程
* 3 经过指定的等待时间
* <p>
* 如果获取了写锁,则返回值true,并且将写锁保持计数设置为1。
* <p>
* 如果当前线程:
* 1 在进入此方法时已设置其中断状态
* 2 获取写锁时被中断,
* <p>
* 然后抛出InterruptedException并清除当前线程的中断状态。
* <p>
* 如果经过了指定的等待时间,则返回值false。如果时间小于或等于零,则该方法将根本不等待。
* <p>
* 在此实现中,由于此方法是显式的中断点,因此优先于对中断的响应而不是正常或可重入的锁定获取,而是优先报告等待时间的流逝。
*
* @throws InterruptedException if the current thread is interrupted
* @throws NullPointerException if the time unit is null
*/
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

/**
* 如果当前线程是此锁的持有者,则保留计数将减少。
* 如果保持计数现在为零,则释放锁定。
* 如果当前线程不是此锁的持有者,则抛出IllegalMonitorStateException。
*
* @throws IllegalMonitorStateException if the current thread does not
* hold this lock
*/
public void unlock() {
sync.release(1);
}

/**
* 返回用于此Lock实例的Condition实例。
* <p>
* 当与内置监视器锁定一起使用时,返回的Condition实例支持与Object监视器方法(wait,notify和notifyAll)相同的用法。
* <p>
* 1 如果在调用任何Condition方法时未保留此写锁定,则将抛出IllegalMonitorStateException。
* (读锁独立于写锁而持有,因此不会被检查或受影响。但是,当当前线程也已获取读锁时
* ,调用条件等待方法本质上总是错误,因为其他可能解除阻塞它的线程不会被调用。能够获取写锁。)
* <p>
* 2 当条件等待方法被调用时,写锁被释放,并且在返回之前,将重新获取写锁,并将锁保持计数恢复为调用该方法时的状态。
* <p>
* 3 如果线程在等待时被中断,则等待将终止,将抛出InterruptedException,并清除线程的中断状态,并以FIFO顺序发出等待线程的信号。
* <p>
* 4 从等待方法返回的线程的锁重新获取顺序与最初获取锁的线程的顺序相同(默认情况下未指定),但对于公平锁,优先使用等待时间最长的线程。
*
* @return the Condition object
*/
public Condition newCondition() {
return sync.newCondition();
}

/**
* Returns a string identifying this lock, as well as its lock
* state. The state, in brackets includes either the String
* {@code "Unlocked"} or the String {@code "Locked by"}
* followed by the {@linkplain Thread#getName name} of the owning thread.
*
* @return a string identifying this lock, as well as its lock state
*/
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}

/**
* Queries if this write lock is held by the current thread.
* Identical in effect to {@link
* ReentrantReadWriteLock#isWriteLockedByCurrentThread}.
*
* @return {@code true} if the current thread holds this lock and
* {@code false} otherwise
* @since 1.6
*/
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}

/**
* Queries the number of holds on this write lock by the current
* thread. A thread has a hold on a lock for each lock action
* that is not matched by an unlock action. Identical in effect
* to {@link ReentrantReadWriteLock#getWriteHoldCount}.
*
* @return the number of holds on this lock by the current thread,
* or zero if this lock is not held by the current thread
* @since 1.6
*/
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}

与读锁相比,可以看出,写锁比读锁多了两个方法:

//查询当前线程对该写锁的保留数
int getHoldCount()

//查询当前线程是否拥有此写锁定。
boolean isHeldByCurrentThread()