之前的文章中会涉及到了相关AQS的原理和相关源码的分析,所谓实践是检验真理的唯一标准!接下来就让我们活化一下AQS技术,主要针对于自己动手实现一个AQS同步器。
Doug Lea大神在JDK1.5编写了一个Lock接口,里面定义了实现一个锁的基本方法,我们只需编写一个MyLock类实现这个接口就好。
class MyLock implements Lock {
/**
* 加锁。如果不成功则进入等待队列
*/
@Override
public void lock() {}
/**
* 加锁(可被interrupt)
*/
@Override
public void lockInterruptibly() throws InterruptedException {}
/**
* 尝试加锁
*/
@Override
public boolean tryLock() {}
/**
* 加锁 带超时的
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {}
/**
* 释放锁
*/
@Override
public void unlock() {}
/**
* 返回一个条件变量(不在本案例谈论)
*/
@Override
public Condition newCondition() {}
}
复制代码
定义好MyLock后,接下来就是实现各个方法的逻辑,达到真正的用于线程间sync互斥的需求。
接下来我们需要自定义一个继承自AQS的MySync。实现自定义的MySync前,先了解AQS内部的一些基本概念。在AQS中主要的一些成员属性如下:
知道这些概念后我们就可以自定义一个AQS:
public final class MySync extends AbstractQueuedSynchronizer {
/**
* 尝试加锁
*/
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
// 修改state状态成功后设置当前线程为占有锁资源线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 释放锁
*/
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
// state有volatile修饰,为了保证解锁后其他的一些变量对其他线程可见,把setExclusiveOwnerThread(null)放到上面 hAppens-before中定义的 volatile规则
setState(0);
return true;
}
/**
* 判断是否是独占锁
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
}
复制代码
最后一步就是将第一步中的所有方法逻辑完成
class MyLock implements Lock {
// 组合自定义sync器
private MySync sync = new MySync();
/**
* 加锁。如果不成功则进入等待队列
*/
public void lock() {
sync.acquire(1);
}
/**
* 加锁(可被interrupt)
*/
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 尝试加锁
*/
public boolean tryLock() {
return sync.tryAcquire(1);
}
/**
* 加锁 带超时的
*/
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toMillis(time));
}
/**
* 释放锁
*/
public void unlock() {
sync.release(0);
}
/**
* 返回一个条件变量(不在本案例谈论)
*/
@Override
public Condition newCondition() {
return null;
}
}
复制代码
完成整个MyLock的逻辑后,发现在lock()、unlock()中调用的自定义sync的方法tryAcquire()和tryRelease()方法。我们就以在lock()方法中调用acquire()方法说明模板设计模式在AQS中的应用。
点进.acquire()方法后,发现该该方法是来自
AbstractQueuedSynchronizer中:
因此整个自定义加锁的流程如下:
AQS作为JAVA并发体系下的关键类,在各种并发工具中都有它的身影,如ReentrantLock、Semaphore等。这些并发工具用于控制sync互斥的手段都是采用AQS,外加Cas机制。AQS采用了模板方法设计模式让子类们自定义sync互斥的条件,比如本案例中MySync类重写了tryAcquire方法。
下面实现一个自定义的sync:
public class SelfSynchronizer {
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public boolean unLock() {
return sync.release(1);
}
static class Sync extends AbstractQueuedSynchronizer {
//是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* 获取sync资源
* @param acquires
* @return
*/
@Override
public boolean tryAcquire(int acquires) {
if(compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
//这里没有考虑可重入锁
/*else if (Thread.currentThread() == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}*/
return false;
}
/**
* 释放sync资源
* @param releases
* @return
*/
@Override
protected boolean tryRelease(int releases) {
int c = getState() - releases;
boolean free = false;
if (c == 0) {
free = true;
}
setState(c);
return free;
}
}
}
复制代码
ReentrantLock源码和上面自定义的sync很相似,测试下该sync,i++在多线程下执行情况:
public class TestSelfSynchronizer {
private static int a = 0;
private static int b = 0;
private static SelfSynchronizer selfSynchronizer = new SelfSynchronizer();
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 50, 1, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
private static ExecutorService ec = Executors.newFixedThreadPool(20);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 20 ; i++) {
executor.submit(new Task());
}
for (int j = 0; j < 20 ; j++) {
ec.submit(new TaskSync());
}
Thread.sleep(10000);
System.out.println("a的值:"+ a);
System.out.println("b的值" + b);
executor.shutdown();
ec.shutdown();
}
static class Task implements Runnable {
@Override
public void run() {
for(int i=0;i<10000;i++) {
a++;
}
}
}
static class TaskSync implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
//使用sync器加锁
selfSynchronizer.lock();
b++;
selfSynchronizer.unLock();
}
}
}
}
复制代码
开启两个线程池,对int型变量自增10000次,如果不加sync器,最后值小于200000,使用了自定义sync器则最后值正常等于200000,这是因为每次自增操作加锁
作者:李浩宇Alex
链接:
https://juejin.cn/post/6989937347429302280
来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。