JAVA从版本5开始,在
java.util.concurrent.locks包内给我们提供了除了synchronized关键字以外的几个新的锁功能的实现,ReentrantLock就是其中的一个。但是这并不意味着我们可以抛弃synchronized关键字了。
在这些锁实现之前,如果我们想实现锁的功能,只能通过synchronized关键字,例子如下:
private static volatile int value;
public static void main() {
Runnable run = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
increaseBySync();
}
}
};
Thread t1 = new Thread(run);
Thread t2 = new Thread(run);
t1.start();
t2.start();
while (Thread.activeCount() > 1) {
Thread.yield();
}
System.out.println(value);
}
private static synchronized int increaseBySync() {
return value++;
}
有了ReentrantLock之后,我们可以这样写:
private static volatile int value;
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
testIncreaseWithLock();
}
public static void main() {
Runnable run = new Runnable() {
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 1000; i++) {
value++;
}
} finally {
lock.unlock();
}
}
};
Thread t1 = new Thread(run);
Thread t2 = new Thread(run);
t1.start();
t2.start();
while (Thread.activeCount() > 1) {
Thread.yield();
}
System.out.println(value);
}
以上两段代码都可以实现value值同步自增1,并且value都能得到正确的值2000。下面我们就来分析下ReentrantLock有哪些功能以及它底层的原理。
从名字我们就可以看出ReentrantLock是可重入锁。可重入锁是指当某个线程已经获得了该锁时,再次调用lock()方法可以再次立即获得该锁。
举个例子,当某个线程在执行methodA()时,假设已经获得了锁,这是当它执行到methodB()时可以立即获得methodB里面的锁,因为两个方法是调用的同一把锁。
private static Lock lock = new ReentrantLock();
public static void methodA(){
try{
lock.lock();
//dosomething
methodB();
}finally{
lock.unlock();
}
}
public static void methodB(){
try{
lock.lock();
//dosomthing
}finally{
lock.unlock();
}
}
synchronized也是可重入锁,有兴趣的同学可以自己写个例子测试下。
通过源码我们可以看到ReentrantLock支持公平锁,并且默认是非公平锁。
//ReentrantLock源码
public ReentrantLock() {
sync = new NonfairSync();
}
//ReentrantLock源码
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
下面是非公平锁和公平锁的lock()方法实现,从两个lock()方法我们可以看到,它们的不同点是在调用非公平锁的lock()方法时,当前线程会尝试去获取锁,如果获取失败了则调用acquire(1)方法入队列等待;而调用公平锁的lock()方法当前线程会直接入队列等待(acquire方法涉及到AQS下面会讲到)。
//ReentrantLock源码,非公平锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//ReentrantLock源码,公平锁
final void lock() {
acquire(1);
}
而synchronized是一个非公平锁。
ReentrantLock还提供了超时机制,当调用tryLock()方法,当前线程如果获取锁失败会立刻返回;而当调用带参tryLock()方法是,当前线程如果在设置的timeout时间内未获得锁,也会立刻返回。而这些功能背后主要是依赖AQS实现的。
//ReentrantLock源码
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//ReentrantLock源码
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
synchronized没有这个功能。
ReentrantLock有一个lockInterruptibly()方法,它会最终调用AQS的两个方法:
AQS方法一中如果当前线程被中断则抛出InterruptedException,否则尝试去获取锁,获取成功则返回,获取失败则调用aqs方法二doAcquireInterruptibly()。
AQS方法二中在for循环线程自旋中也会判断当前线程是否被标记为中断,如果是也会抛出InterruptedException。
doAcquireInterruptibly()的细节我们在下面讲解AQS的时候会重点介绍,它和doAcquire()方法很类似,唯一区别是会抛出InterruptedException。
//lock方法
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//aqs方法一
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//aqs方法二
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS是一个基于队列的同步器,它是一个抽象类,主要提供了多线程获取锁时候的排队等待和激活机制,ReentrantLock内部有两个基于AQS实现的子类,分别针对公平锁和非公平锁做了支持。
下面我们以公平锁为例,讲解下ReentrantLock是如何依赖AQS实现其功能的。获得锁涉及到的主要源代码和解释如下:
//AQS源码,公平锁的lock()方法会直接调用该方法
//这里当前如果获取失败会调用acquireQueued方法
//addWaiter方法主要是将当前线程加入AQS内部队列的尾部
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//ReentrantLock中实现公平锁的AQS子类的方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//c == 0表示当前AQS为初始状态,可以尝试获取锁, 如获取成功则返回true
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 只有当前线程是已经获取了该锁的线程才能再次获取锁(可重入锁)并返回true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//返回false获取失败
return false;
}
//AQS源码
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//这里如果当前线程对应的队列里的Node的前置Node是head,则尝试获取锁,并成功返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire方法标记当前线程Node的前置Node的waitStatus为SIGNAL,意思是当你从队列移除时记得要唤醒我哦
//parkAndCheckInterrupt方法会让当前线程挂起,停止自旋,免得白白浪费CPU资源
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
假设现在有线程A、B、C同时去获取同一把锁,大概的流程是这样的,这里假设线程A获的锁并成功返回,线程B和C则依次调用上面的方法,进入AQS的等待队列并最终被挂起。
这时线程A做完自己该做的事情了,它在finally块中调用了unlock方法,这时我们再看下相关的源代码。
//AQS源码,当前线程在释放锁的同时,会判断它所在Node的waitStatus有没有被它的后继结点标记,如果被标记了,那就唤醒后继结点对应的线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//AQS源码,主要关注最下面LockSupport.unpark(s.thread),唤醒后继结点线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or Apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
线程A释放锁时,会唤醒head结点的后继结点也就是线程B,此时线程B就可以继续for循环里面自旋并成功获得锁了。
之前介绍AtomicInteger的时候提到Unsafe对象,AtomicInteger用到了Unsafe对象的CAS功能(底层是cpu提供的功能)。
ReentrantLock除了用到了CAS之外,还用到了Unsafe的pack和unpack两个方法(LockSupport当中),除了性能更好之外,它可以精确的唤醒某一个线程。