不同的进程需要以互斥的方式来访问共享资源,这里实现互斥就是分布式锁。
简单来说就是:同一时间只有一个客户端对共享资源操作。举个实际例子,抢购茅台,如果不加锁就会发生超卖的事故。
Redis 加锁主要是使用 set (
https://redis.io/commands/set ) 命令操作:
SET key value [EX seconds|PX milliseconds|KEEPTTL][NX|XX] [GET]
加锁命令: SET lock_key lock_value PX 10000 NX
只有当 lock_key 不存在时才会设置 lock_key 和 lock_value,超时时间 10000 毫秒,设置成功返回 OK:
当 lock_key 存在时返回 nil:
Redis 释放锁使用命令: DEL key (
https://redis.io/commands/del )
解锁命令: DEL lock_key 。
Redis 在 2.6.12 之后的版本才加入 [EX seconds|PX milliseconds] [NX|XX] 这些参数
在此之前使用 SETNX (
https://redis.io/commands/setnx ) SETNX 是 “ SET if N ot e X ists” 的缩写。
SETNX 返回 1 说明设置成功, 返回 0 说明设置失败。
SETNX 和 EXPIRE 操作之间不是原子性的,如果 SETNX 执行成功之后, 没有执行 EXPIRE 命令,就可能会发生死锁。
Redis 官网声明 SETNX 在将来的版本中可能会被弃用,因为 SETNX 实现的功能 set 都能实现。
设置锁和设置锁的超时时间要保持原子性,这点很容易做到 使用 SET lock_key lock_value PX 10000 NX 命令即可, 不要使用 SETNX lock_key lock_value , EXPIRE lock_key 10 这些命令,因为他们之间不是原子性的,有发生死锁的风险。
锁的超时时间要大于程序执行的时间,否则多个客户端可能同时获取锁。充分预估使用锁的业务代码执行时间,该时间不宜过长也不宜过短,过短,可能使锁发生错误;过长,客户端异常时可能会影响执行效率。
客户端使用完共享资源之后要及时的释放锁,即使在程序发生异常,JAVA 中一般都是在 finally 里释放锁。
在释放锁的时要确保这个锁是自己的,不能将其他锁释放掉,这样可能导致多个客户端同时获取锁。可以通过判断 lock_value 的值是否相等来判断是否是自己加的锁,lock_value 的值可以使用 UUID 或者任意确定唯一的值。
客户端在释放锁时分两个步骤,一要比较锁的值是否相等,二要删除锁( DEL key ),这两个步骤要保证原子性,否则的话可能导致将其他锁释放掉,画个图解释下:
这也好解决,Redis 提供了 EVAL (
https://redis.io/commands/eval ) 命令去解析 Lua 脚本,可以发一段 Lua 脚本给 Redis 执行:
if redis.call("get",KEYS[1]) == ARGV[1] -- 判断锁的值是否相等。 KEYS[1], ARGV[1],是指传入的参数,以上面为例,KEYS[1] 指的是 lock_order,ARGV[1] 指的是 123uD,
then
return redis.call("del",KEYS[1]) -- 删除这个 key,返回删除 key 的个数
else
return 0 -- 锁值不相等返回 0
end
复制代码
这样就可以保证原子执行了。
基于 Redisson 客户端实现 Redis 分布式锁:
/**
* 加锁利用 set(key, value, "PX", "NX") 函数实现
* 解锁利用 Lua 脚本实现
* <p>
* Created by jie.li on 2021/1/4 7:50 下午
*/
@Component
public class RedisLock1 {
@Resource
private RedissonClient redissonClient;
/**
* 尝试加锁
*
* @param name lock name
* @param value lock value
* @return true 加锁成功, false 加锁失败
*/
public boolean tryLock(String name, String value) {
RBucket<Object> bucket = redissonClient.getBucket(name);
// 执行的是 set(key, value, "PX", "NX") 命令
return bucket.trySet(value, 10000, TimeUnit.MILLISECONDS);
}
/**
* 解锁
*
* @param name lock name
* @param value lock value
*/
public void unLock(String name, String value) {
redissonClient.getScript().eval(RScript.Mode.READ_WRITE, DEL_LOCK_SCRIPT, RScript.ReturnType.INTEGER, Collections.singletonList(name), value);
}
// 解锁脚本
private static final String DEL_LOCK_SCRIPT =
"if redis.call("get",KEYS[1]) == ARGV[1] then" + // 如果 KEYS[1] 对应的 Value 值等于 ARGV[1]
" return redis.call("del",KEYS[1])" + // 删除 KEYS[1]
" else" + // 否则
" return 0" + // 返回 0
" end;";
}
复制代码
测试代码:
/**
* 测试手动加锁解锁
* <p>
* Created by jie.li on 2021/1/7 2:54 下午
*/
@Service
public class RedisLockTestService {
@Resource
private RedisLock1 redisLock1;
private int i = 50;
/**
* 测试手动实现 redis 分布式锁
*
* @return int
*/
public int biz() {
String lockName = "redis:lock:1";
String lockValue = UUID.randomUUID().toString();
try {
boolean b = redisLock1.tryLock(lockName, lockValue);
if (b) {
if (i > 0) {
i--;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
redisLock1.unLock(lockName, lockValue);
}
return i;
}
}
复制代码
@Test
public void testBiz() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(200);
for (int i = 0; i < 200; i++) {
new Thread(() -> {
try {
countDownLatch.awAIt();
} catch (InterruptedException e) {
e.printStackTrace();
}
int i1 = redisLockTestService.biz();
System.out.println(Thread.currentThread().getName() + " -> " + i1);
}, "Thread" + i).start();
countDownLatch.countDown();
}
TimeUnit.SECONDS.sleep(5);
}
复制代码
Redisson 实现的分布式锁相对于我们自己实现的锁更加完善,主要有以下两点:
1、可重入
2、锁重试
3、锁自动延期(看门狗机制)
Redisson 锁的依赖图:
Redisson 实现了很多种类型的锁,所有的锁都实现了 JUC 中的 Lock 接口,并且做了扩展( RLock ), 所以使用方法和使用 ReentrantLock 差不多。这里我们只针对 RedissonLock 进行讲解。
// waitTime 等待获取锁的时间
// leaseTime 锁的有效期
// unit 使用的时间单位
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1、尝试加锁,如果当前有锁,返回锁的剩余时间ttl,否则返回空
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
// 2、加锁成功,返回true
if (ttl == null) {
return true;
}
// 剩余的等待时间 waitTime
time -= System.currentTimeMillis() - current;
// 剩余等待时间已过
if (time <= 0) {
// 获取锁失败
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 3、订阅锁释放事件。利用semaphore(信号量),订阅(Redis 发布订阅)锁的释放事件,
// 锁释放后立即通知等待的线程竞争获取锁。
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 4、线程阻塞
// - 返回 false: 阻塞时间已经超过了剩余等待时间(waitTime),取消订阅事件,加锁失败
// - 返回 ture: 继续尝试加锁
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
// 剩余等待时间已过,加锁失败
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 5、继续以同样的方式获加锁,如果过了最大的等待加锁时间,则加锁失败,返回false
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
// 6、通过信号量(共享锁)阻塞,等待释放锁消息
// 锁剩余时间小于剩余的waitTime时间
if (ttl >= 0 && ttl < time) {
// 非阻塞的获取结果,获得信号量,在给定的时间内从信号量获取一个许可。
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 7、剩余的waitTime
time -= System.currentTimeMillis() - currentTime;
// 加锁最大等待时间已过,加锁失败,返回false
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 取消订阅事件
unsubscribe(subscribeFuture, threadId);
}
}
复制代码
tryLock 方法参数说明:
尝试加锁方法 tryLock ,两个重要的入参 waitTime、leaseTime:
流程总结:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " + // 1、如果 Redis 中不存在这个 key
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 2、设置 key 和 field, 并将 value 的值设置为 1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 3、设置 key 的过期时间
"return nil; " + // 4、返回 null
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 5、如果 Redis 中存在对应的 key 和 field
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 6、则将对应的 key 和 field 对应的 value 自增 1
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 7、设置 key 的过期时间
"return nil; " + // 8、返回 null
"end; " +
"return redis.call('pttl', KEYS[1]);", // 9、返回剩余生存时间, 单位毫秒
// 以下这三个参数分别对应 Lua 脚本中的 KEYS[1], ARGV[1], ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
复制代码
Redisson 中实际加锁的代码,流程总结:
在不指定锁超时时间(leaseTime)的情况下,Redisson 分布式锁会自动给锁续期,也就是所谓的看门狗机制。
锁自动续期代码解析:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 如果指定了锁的有效期,则直接返回加锁结果,不会走后面的 Watch Dog 机制
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 实际加锁
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 加锁执行完成后
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 加锁执行有异常,直接返回
if (e != null) {
return;
}
// lock acquired
// 获取到锁
if (ttlRemaining == null) {
// 自动续期(watch dog)
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
复制代码
private void scheduleExpirationRenewal(long threadId) {
// ExpirationEntry 维护锁的线程重入计数器和续期任务
ExpirationEntry entry = new ExpirationEntry();
// 将 entry 放入 ConcurrentHashMap
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 锁重入,当前线程计数器+1
oldEntry.addThreadId(threadId);
} else {
// 第一次,当前线程计数器+1
entry.addThreadId(threadId);
// 第一次触发锁续期
renewExpiration();
}
}
复制代码
private void renewExpiration() {
// 在 ConcurrentHashMap 中拿到 ExpirationEntry 对象
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 新建一个定时任务,自动续期的主要实现
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
// 获取第一个线程Id
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 异步续期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
// 续期异常,打印错误日志,并且清除Map,不再执行续期。
log.error("Can't update lock " + getName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 续期成功后,递归调用,继续调用达到持续续期目的
if (res) {
// reschedule itself
renewExpiration();
}
});
}
// 延迟执行时间为 internalLockLeaseTime / 3,internalLockLeaseTime 默认时间是 30s,也可以自定义指定。
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
复制代码
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 如果存在指定的 key 和 filed
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 续期
"return 1; " + // 返回续期成功
"end; " +
"return 0;", // 返回续期失败
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
复制代码
// 看门狗超时时间默为 30s, 自定义的话可以修改 lockWatchdogTimeout 配置
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
private long lockWatchdogTimeout = 30 * 1000;
复制代码
锁自动续期总结:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 如果锁不存在
"return nil;" + // 解锁失败
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 否则将锁的重入计数器-1
"if (counter > 0) then " + // 如果重入计数器>0
"redis.call('pexpire', KEYS[1], ARGV[2]); " + // 将锁续期 30s
"return 0; " + // 返回成功
"else " +
"redis.call('del', KEYS[1]); " + // 否则删除锁
"redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁消息
"return 1; " + // 返回解锁成功
"end; " +
"return nil;", // 解锁失败
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
复制代码
流程总结:
上面讲 Redisson 实现的分布式锁,在单机模式下已经趋近完美了。
但是单点的话故障的话,那就芭比 Q 了,所以我们第一点想到的是部署高可用集群。
目前 Redis 高可用架构主要有主从模式,哨兵模式,集群模式,在这三种模式下使用 Redis 分布式锁存在一个弊端,可能会导致多个客户端同时加锁成功。
客户端 A 加锁成功,由于 Reids 主从同步数据是异步执行的,LockA 锁还没来的及同步到 Slave,此时 Master 节点宕机了。
Slave 节点提升为 Master,客户端 B 来加锁,发现没有其他客户端占用锁,LockB 加锁成功。
这时就导致了两个客户端同时获取了锁。
RedLock 是 Redis 作者提出的一个算法。
Redlock 官网介绍
在该算法的分布式版本中,我们假设有 N 个 Redis masters。这些节点是完全独立的,所以我们不使用复制或任何其他隐式协调系统。我们已经描述了如何在单个实例中安全地获取和释放锁。我们想当然地认为,算法将使用这种方法在单个实例中获取和释放锁。在我们的示例中,我们设置了 N=5,这是一个合理的值,因此我们需要在不同的计算机或虚拟机上运行 5 个 Redis 主机,以确保它们以基本独立的方式失败。
为了获取锁,客户端执行以下操作:
Redis 作者对红锁的介绍非常详细,点击这里查看。
简单总结下:
假设有五个 Redis 实例,这些实例之间是完全独立的,并且部署在不同的计算机上,客户端尝试在这几个实例中获取锁。
如果客户端能够在大多数实例(N/2+1,至少三个)中获取锁,并且获取锁所有的总时间小于锁有效时间,则认为获取锁成功。
如果加锁成功,锁的有效期=初始有效时间-获取锁的总时间,假如锁有效期为 10s,获取锁共花了 2s,那么锁的有效期还剩 8s。
无论客户端获取锁成功还是失败,都需要解锁所有 Redis 实例,以免发生死锁。
使用多个完全独立的 Redis 实例,解决了 Redis 主从异步复制造成的锁丢失问题,同时保障了高可用。
至少 N/2+1 个实例加锁成功,保证锁的互斥性,防止多个客户端同时获取到锁。
表面上看 RedLock 解决 Redis 分布式锁的痛点,但是真的就万无一失了吗?
有人就提出了质疑,Martin Kleppmann: How to do distributed locking
Martin Kleppmann 在效率和正确性方面质疑了红锁,他认为如果是为了效率使用分布式锁,没有必要承担 Redlock 的成本和复杂性,最好还是使用一个 Reids 实例或者主从模式。正确性方面,他认为 Redlock 也绝对保证不了锁的正确性,文章在网络延迟,过程暂停(GC),时钟漂移方面给出了论证。
Redis 作者(Salvatore)也反驳了该质疑:Is Redlock safe?
建议大家读下上面两篇文章。
我个人认为使用 Redlock 要慎重,首先,它的效率比较差,在一些 RT 要求比较高的接口中增加了耗时风险;其次,无法保证绝对的正确性,可能会出现多个客户端同时获取锁的风险(Martin Kleppmann 在他的文章里有举证);再次,成本和复杂性较高。
使用示例:
// 在不同Redis实例上获取 RLock
RLock rLock1 = redisson1.getLock(key);
RLock rLock2 = redisson2.getLock(key);
RLock rLock3 = redisson3.getLock(key);
// 初始化红锁
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1, rLock2, rLock3);
// 加锁
redissonRedLock.lock();
// 业务逻辑
// 解锁
redissonRedLock.unlock();
复制代码
Redisson 在新版本中已经弃用了 RedissonRedLock,不建议使用。