锁的公平是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。在上面分析的例子来说,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点有两个
FairSync.tryAcquire
final void lock() {
acquire(1);
}
复制代码
非公平锁在获取锁的时候,会先通过CAS进行抢占,则公平锁不会
FairSync.tryAcquire
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
复制代码
这个方法与 nonfairTryAcquire(int acquires)比较,不同的地方在于判断条件多了 hasQueuedPredecessors()方法,也就是加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回 true,则表示有线程比当前线程更早地请求获取锁, 因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
在synchronized中,有wait/notify可以实现线程的通信。那么J.U.C里面提供了锁的实现机制,肯定也提供了线程通信的机制
Condition是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒
public class ConditionDemoWait implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoWait(Lock lock , Condition condition){
this.lock = lock;
this.condition = condition;
}
public void run() {
System.out.println("begin -ConditionDemoWait");
try {
lock.lock();
condition.await();
System.out.println("end - ConditionDemoWait");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
复制代码
public class ConditionDemoSignal implements Runnable{
private Lock lock;
private Condition condition;
public ConditionDemoSignal(Lock lock , Condition condition){
this.lock = lock;
this.condition = condition;
}
public void run() {
System.out.println("begin -ConditionDemoSignal");
try {
lock.lock();
condition.signal();
System.out.println("end - ConditionDemoSignal");
}finally {
lock.unlock();
}
}
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(new ConditionDemoWait(lock , condition)).start();
new Thread(new ConditionDemoSignal( lock , condition)).start();
}
}
复制代码
结果:
begin -ConditionDemoWait
begin -ConditionDemoSignal
end - ConditionDemoSignal
end - ConditionDemoWait
复制代码
通过这个案例简单实现了wait和notify的功能,当调用await方法后,当前线程会释放锁并等待,而其他线程调用condition对象的signal或者signalall方法通知并被阻塞的线程,然后自己执行unlock释放锁,被唤醒的线程获得之前的锁继续执行,最后释放锁
所以,condition中两个最重要的方法,一个是await,一个是signal方法
每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表 组成的队列,如下所示
public class ConditionObject implements Condition, JAVA.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
}
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
...
复制代码
调用Condition,需要获取lock锁,所以意味着会存在一个AQS同步队列,先来看Condition.await方法
condition.await
调用Condition的await方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await方法返回时,当前线程一定获取了Condition相关联的锁
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//创建一个新的节点,节点状态为condition,采用数据结构是链表
int savedState = fullyRelease(node);//释放当前的锁,得到锁的状态,并唤醒AQS队列中的一个线程
int interruptMode = 0;
//如果当前节点没有在同步队列中,即还没有被signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) {//判断这个节点是否在AQS队列上,第一次判断的是false,因为前面已经释放锁了
LockSupport.park(this);//第一次总是park自己,开始阻塞等待
//线程判断自己在等待过程中是否被中断了,如果没有中断,则再次循环,会在isOnSyncQueue中判断自己是否在队列上
//isOnSyncQueue判断当前node状态,如果是CONDITION状态,或者不在队列上了,就继续阻塞
//isOnSyncQueue判断当前node还在队列上且不是CONDITION状态了,就结束循环和阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//当这个线程醒来,会尝试拿锁,当acquireQueued返回false就是拿到锁了
//interruptMode != THROW_IE -> 表示这个线程没有成功将node入队,但signal执行了enq方法让其入队了
//将这个变量设置成REINTERRUPT
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//如果node的下一个等待者不是null,则进行清理,清理Condition队列上的节点
//如果是null,就没有什么好清理的了
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果线程被中断了,需要抛出异常,或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
复制代码
关于await,有几个关键点要说明:
private Node addConditionWaiter() {
// ...
//拿到末端的node
Node t = lastWaiter;
//new一个node
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
复制代码
与await()不同,awaitUninterruptibly()不会响应中断,其方法的定义中不会有中断异常抛出,下面分析实现和await()的区别
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
复制代码
可以看出,整体代码和await()类似,区别在于收到异常后,不会抛出异常,而是继续执行while循环
Condition.signal
调用Condition的signal方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中
public final void signal() {
if (!isHeldExclusively())//先判断当前线程是否获取了锁
throw new IllegalMonitorStateException();
Node first = firstWaiter;//拿到Condition队列上第一个节点
if (first != null)
doSignal(first);
}
复制代码
Condition.doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)//如果第一个节点的下一个节点是null,那么,最后一个节点也是null
lastWaiter = null;//将next节点设置成null
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
复制代码
final boolean transferForSignal(Node node) {
Node p = enq(node);
int ws = p.waitStatus;
//如果上一个节点的状态被取消了,或者尝试设置上一个节点的状态为SIGNAL失败了(SIGNAL 表示: 他的 next 节点需要停止阻塞)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);//唤醒输入节点上的线程
return true;
}
复制代码
该方法先是CAS修改了节点状态,如果成功,就将这个节点放到AQS队列中,然后唤醒这个节点上的线程。此时,那个节点就会在await方法中苏醒
阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队 列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁
释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入 正常锁的获取流程
作者:Five在努力
链接:https://juejin.cn/post/6918557057947795470