您当前的位置:首页 > 电脑百科 > 数据库 > Redis

Redis 分布式锁详解

时间:2022-06-17 09:45:19  来源:  作者:用Java可可

一、什么是分布式锁

不同的进程需要以互斥的方式来访问共享资源,这里实现互斥就是分布式锁。

简单来说就是:同一时间只有一个客户端对共享资源操作。举个实际例子,抢购茅台,如果不加锁就会发生超卖的事故。

Redis 分布式锁详解

 

二、实现分布式锁需要注意的点

  1. 互斥性:在任何时刻,只有一个客户端获得锁。
  2. 无死锁:任何时候都能获取锁,即使客户端崩溃或者或被分区。
  3. 正确性:解铃还须系铃人,客户端 A 加的锁只能由客户端 A 解锁,其他客户端不能解锁。
  4. 容错:只要大部分 redis 节点处于运行状态,客户端就能够获取和释放锁。

三、Redis 分布式锁原理

Redis 加锁主要是使用 set (
https://redis.io/commands/set ) 命令操作:

SET key value [EX seconds|PX milliseconds|KEEPTTL][NX|XX] [GET]

  • EX -- 设置指定的过期时间,以秒为单位。
  • PX -- 设置指定的过期时间,以毫秒为单位。
  • NX -- 仅当该键不存在的时才会设置该键。
  • XX -- 仅当该键存在时才会设置该键。

加锁命令: SET lock_key lock_value PX 10000 NX

只有当 lock_key 不存在时才会设置 lock_key 和 lock_value,超时时间 10000 毫秒,设置成功返回 OK:

Redis 分布式锁详解

 

当 lock_key 存在时返回 nil:

Redis 分布式锁详解

 

Redis 释放锁使用命令: DEL key (
https://redis.io/commands/del )

解锁命令: DEL lock_key 。

Redis 在 2.6.12 之后的版本才加入 [EX seconds|PX milliseconds] [NX|XX] 这些参数

Redis 分布式锁详解

 

在此之前使用 SETNX (
https://redis.io/commands/setnx ) SETNX 是 “ SET if N ot e X ists” 的缩写。

SETNX 返回 1 说明设置成功, 返回 0 说明设置失败。

SETNX 和 EXPIRE 操作之间不是原子性的,如果 SETNX 执行成功之后, 没有执行 EXPIRE 命令,就可能会发生死锁。

Redis 官网声明 SETNX 在将来的版本中可能会被弃用,因为 SETNX 实现的功能 set 都能实现。

Redis 分布式锁详解

 

四、Redis 实现分布式锁注意的点及解决方案

  1. 防死锁

设置锁和设置锁的超时时间要保持原子性,这点很容易做到 使用 SET lock_key lock_value PX 10000 NX 命令即可, 不要使用 SETNX lock_key lock_value , EXPIRE lock_key 10 这些命令,因为他们之间不是原子性的,有发生死锁的风险。

  1. 合理设置锁超时时间

锁的超时时间要大于程序执行的时间,否则多个客户端可能同时获取锁。充分预估使用锁的业务代码执行时间,该时间不宜过长也不宜过短,过短,可能使锁发生错误;过长,客户端异常时可能会影响执行效率。

  1. 释放锁要及时

客户端使用完共享资源之后要及时的释放锁,即使在程序发生异常,JAVA 中一般都是在 finally 里释放锁。

  1. 只能释放自己加的锁

在释放锁的时要确保这个锁是自己的,不能将其他锁释放掉,这样可能导致多个客户端同时获取锁。可以通过判断 lock_value 的值是否相等来判断是否是自己加的锁,lock_value 的值可以使用 UUID 或者任意确定唯一的值。

  1. 释放锁要保证原子性

客户端在释放锁时分两个步骤,一要比较锁的值是否相等,二要删除锁( DEL key ),这两个步骤要保证原子性,否则的话可能导致将其他锁释放掉,画个图解释下:

Redis 分布式锁详解

 

  1. 客户端 A 设置 lock_order 锁成功,锁值为 123uD,超时间为 10000ms。
  2. 客户端 A 业务代码执行完成,释放锁前需要获取 lock_order 锁的值。
  3. 客户端 A 判断锁值是否是 123uD,执行缓慢。
  4. 客户端 A 的锁超时时间已到,Redis 自动移除了锁。
  5. 此时客户端 B 设置锁,lock_order 锁不存在,所以加锁成功。
  6. 客户端 A 判断锁值相等,执行 del 释放锁,此时客户端 A 释放的锁是客户端 B 的而不是自己的,锁出现错误。

这也好解决,Redis 提供了 EVAL (
https://redis.io/commands/eval ) 命令去解析 Lua 脚本,可以发一段 Lua 脚本给 Redis 执行:

if redis.call("get",KEYS[1]) == ARGV[1] -- 判断锁的值是否相等。 KEYS[1], ARGV[1],是指传入的参数,以上面为例,KEYS[1] 指的是 lock_order,ARGV[1] 指的是 123uD, 
then
    return redis.call("del",KEYS[1])    -- 删除这个 key,返回删除 key 的个数
else
    return 0                            -- 锁值不相等返回 0
end

复制代码

这样就可以保证原子执行了。

五、基于Set命令实现 Redis 分布式锁

基于 Redisson 客户端实现 Redis 分布式锁:

/**
 * 加锁利用 set(key, value, "PX", "NX") 函数实现
 * 解锁利用 Lua 脚本实现
 * <p>
 * Created by jie.li on 2021/1/4 7:50 下午
 */
@Component
public class RedisLock1 {


    @Resource
    private RedissonClient redissonClient;


    /**
     * 尝试加锁
     *
     * @param name  lock name
     * @param value lock value
     * @return true 加锁成功, false 加锁失败
     */
    public boolean tryLock(String name, String value) {
        RBucket<Object> bucket = redissonClient.getBucket(name);
        // 执行的是 set(key, value, "PX", "NX") 命令
        return bucket.trySet(value, 10000, TimeUnit.MILLISECONDS);
    }


    /**
     * 解锁
     *
     * @param name  lock name
     * @param value lock value
     */
    public void unLock(String name, String value) {
        redissonClient.getScript().eval(RScript.Mode.READ_WRITE, DEL_LOCK_SCRIPT, RScript.ReturnType.INTEGER, Collections.singletonList(name), value);
    }


    // 解锁脚本
    private static final String DEL_LOCK_SCRIPT =
            "if redis.call("get",KEYS[1]) == ARGV[1] then" +      // 如果 KEYS[1] 对应的 Value 值等于 ARGV[1]
                    " return redis.call("del",KEYS[1])" +         // 删除 KEYS[1]
                    " else" +                                       // 否则
                    " return 0" +                                   // 返回 0
                    " end;";
}

复制代码

测试代码:

/**
 * 测试手动加锁解锁
 * <p>
 * Created by jie.li on 2021/1/7 2:54 下午
 */
@Service
public class RedisLockTestService {


    @Resource
    private RedisLock1 redisLock1;


    private int i = 50;


    /**
     * 测试手动实现 redis 分布式锁
     *
     * @return int
     */
    public int biz() {
        String lockName = "redis:lock:1";
        String lockValue = UUID.randomUUID().toString();
        try {
            boolean b = redisLock1.tryLock(lockName, lockValue);
            if (b) {
                if (i > 0) {
                    i--;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            redisLock1.unLock(lockName, lockValue);
        }
        return i;
    }
}

复制代码

@Test
public void testBiz() throws InterruptedException {
    CountDownLatch countDownLatch = new CountDownLatch(200);
    for (int i = 0; i < 200; i++) {
        new Thread(() -> {
            try {
                countDownLatch.awAIt();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int i1 = redisLockTestService.biz();
            System.out.println(Thread.currentThread().getName() + " -> " + i1);
        }, "Thread" + i).start();


        countDownLatch.countDown();
    }


    TimeUnit.SECONDS.sleep(5);
}

复制代码

六、Redisson 实现分布式锁

1. Redisson 实现锁简介

Redisson 实现的分布式锁相对于我们自己实现的锁更加完善,主要有以下两点:

1、可重入

2、锁重试

3、锁自动延期(看门狗机制)

Redisson 锁的依赖图:

Redis 分布式锁详解

 

Redisson 实现了很多种类型的锁,所有的锁都实现了 JUC 中的 Lock 接口,并且做了扩展( RLock ), 所以使用方法和使用 ReentrantLock 差不多。这里我们只针对 RedissonLock 进行讲解。

2. Redisson 源码解析

尝试加锁

// waitTime  等待获取锁的时间
// leaseTime 锁的有效期
// unit      使用的时间单位
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 1、尝试加锁,如果当前有锁,返回锁的剩余时间ttl,否则返回空
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    // 2、加锁成功,返回true
    if (ttl == null) {
        return true;
    }
    // 剩余的等待时间 waitTime
    time -= System.currentTimeMillis() - current;
    // 剩余等待时间已过
    if (time <= 0) {
        // 获取锁失败
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
    
    current = System.currentTimeMillis();
    // 3、订阅锁释放事件。利用semaphore(信号量),订阅(Redis 发布订阅)锁的释放事件,
    // 锁释放后立即通知等待的线程竞争获取锁。 
    RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    // 4、线程阻塞
    // - 返回 false: 阻塞时间已经超过了剩余等待时间(waitTime),取消订阅事件,加锁失败
    // - 返回 ture:  继续尝试加锁
    if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.onComplete((res, e) -> {
                if (e == null) {
                    unsubscribe(subscribeFuture, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
    }


    try {
        time -= System.currentTimeMillis() - current;
        // 剩余等待时间已过,加锁失败
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        // 5、继续以同样的方式获加锁,如果过了最大的等待加锁时间,则加锁失败,返回false
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }


            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }


            // waiting for message
            currentTime = System.currentTimeMillis();
            // 6、通过信号量(共享锁)阻塞,等待释放锁消息
            // 锁剩余时间小于剩余的waitTime时间
            if (ttl >= 0 && ttl < time) {
                // 非阻塞的获取结果,获得信号量,在给定的时间内从信号量获取一个许可。
                subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            // 7、剩余的waitTime
            time -= System.currentTimeMillis() - currentTime;
            // 加锁最大等待时间已过,加锁失败,返回false
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 取消订阅事件
        unsubscribe(subscribeFuture, threadId);
    }
}

复制代码

tryLock 方法参数说明:

尝试加锁方法 tryLock ,两个重要的入参 waitTime、leaseTime:

  • waitTime: 尝试加锁的最大时间,如果在这个时间内一直没有加锁成功,则返回 false。
  • leaseTime: 锁的有效期,如果客户端(进程)在这个时间内没有释放锁,则 Redis 主动释放,当然 Redisson 看门狗的机制会将这个时间延期,后面会说到。

流程总结:

  1. 尝试加锁 tryAcquire ,如果加锁成功则返回 null, 如果锁被占用,则返回锁的剩余时间 ttl。
  2. 如果加锁成功返回 true,否在判断 waitTime 是否过期,过期则加锁失败返回 false。
  3. 基于信号量,通过 Redis 的发布订阅,订阅锁的释放事件,一旦锁释放会立即通知等待的线程去竞争锁。
  4. 线程阻塞剩余 waitTime 时间,来等待锁释放的通知,如果阻塞时间超过了剩余 waitTime 时间,则取消任务,取消任务成功再取消订阅信息,加锁失败返回 false;否则在剩余 waitTime 时间内等到了锁释放通知,则进入循环加锁阶段。
  5. 循环中继续以同样的方式加锁,如果在剩余 waitTime 内加锁成功返回 true,否在加锁失败返回 false。
  6. 如果在剩余 waitTime 时间内,锁还是被其他的客户端(进程)持有,阻塞指定时间(持有锁的剩余过期时间和剩余 waitTime 时间)等待锁的释放消息。
  7. 具体实现:利用信号量(semaphore)阻塞当前线程获取许可,如果有可用许可则继续尝试加锁,如果没有可用许可则阻塞给定的时间,直至其他线程释放锁,调用 release() 方法增加许可,或者其它某些线程中断当前线程,或者已超出指定的等待时间。
  8. 如果剩余 waitTime 过期,加锁失败返回 false。

加锁

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
      internalLockLeaseTime = unit.toMillis(leaseTime);


      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +  // 1、如果 Redis 中不存在这个 key
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +  // 2、设置 key 和 field, 并将 value 的值设置为 1
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 3、设置 key 的过期时间
                "return nil; " +  // 4、返回 null
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // 5、如果 Redis 中存在对应的 key 和 field 
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +  // 6、则将对应的 key 和 field 对应的 value 自增 1
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +  // 7、设置 key 的过期时间
                "return nil; " +  // 8、返回 null
            "end; " +
            "return redis.call('pttl', KEYS[1]);",  // 9、返回剩余生存时间, 单位毫秒
              // 以下这三个参数分别对应 Lua 脚本中的 KEYS[1], ARGV[1], ARGV[2]
              Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

复制代码

Redisson 中实际加锁的代码,流程总结:

  1. 如果 Redis 中不存在 key。
  2. 则使用 hset 这个命令设置 key 和 field,并将 hash 的 value 设置为 1,这里使用 Redis 中的 hash 数据结构, value 的值用于支持可重入锁,记录加锁次数。
  3. 设置 key 的过期时间。
  4. 加锁成功返回 null。
  5. 如果 Redis 中存在对应的 key 和 field。
  6. 将对应的 key 和 field 对应的 value 值自增 1,记录重入锁的次数。
  7. 设置 key 的过期时间。
  8. 加锁成功返回 null。
  9. 加锁失败,返回 key 的剩余生存时间(单位毫秒)。

锁自动续期(Watch Dog 机制)

在不指定锁超时时间(leaseTime)的情况下,Redisson 分布式锁会自动给锁续期,也就是所谓的看门狗机制。

锁自动续期代码解析:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 如果指定了锁的有效期,则直接返回加锁结果,不会走后面的 Watch Dog 机制
    if (leaseTime != -1) {
        return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 实际加锁
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // 加锁执行完成后
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // 加锁执行有异常,直接返回
        if (e != null) {
            return;
        }


        // lock acquired
        // 获取到锁
        if (ttlRemaining == null) {
            // 自动续期(watch dog)
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

复制代码

private void scheduleExpirationRenewal(long threadId) {
    // ExpirationEntry 维护锁的线程重入计数器和续期任务
    ExpirationEntry entry = new ExpirationEntry();
    // 将 entry 放入 ConcurrentHashMap
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
      // 锁重入,当前线程计数器+1
      oldEntry.addThreadId(threadId);
    } else {
      // 第一次,当前线程计数器+1
      entry.addThreadId(threadId);
      // 第一次触发锁续期
      renewExpiration();
    }
}

复制代码

 private void renewExpiration() {
     // 在 ConcurrentHashMap 中拿到 ExpirationEntry 对象
     ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
     if (ee == null) {
       return;
     }
     // 新建一个定时任务,自动续期的主要实现
     Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
       @Override
       public void run(Timeout timeout) throws Exception {
         ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
         if (ent == null) {
           return;
         }
         // 获取第一个线程Id
         Long threadId = ent.getFirstThreadId();
         if (threadId == null) {
           return;
         }
         // 异步续期
         RFuture<Boolean> future = renewExpirationAsync(threadId);
         future.onComplete((res, e) -> {
           if (e != null) {
             // 续期异常,打印错误日志,并且清除Map,不再执行续期。
             log.error("Can't update lock " + getName() + " expiration", e);
             EXPIRATION_RENEWAL_MAP.remove(getEntryName());
             return;
           }
           // 续期成功后,递归调用,继续调用达到持续续期目的
           if (res) {
             // reschedule itself
             renewExpiration();
           }
         });
       }
     // 延迟执行时间为 internalLockLeaseTime / 3,internalLockLeaseTime 默认时间是 30s,也可以自定义指定。
     }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);


     ee.setTimeout(task);
 }

复制代码

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
               "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // 如果存在指定的 key 和 filed 
               "redis.call('pexpire', KEYS[1], ARGV[1]); " +                // 续期
               "return 1; " +                                               // 返回续期成功
               "end; " +
               "return 0;",                                                 // 返回续期失败
                Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

复制代码

// 看门狗超时时间默为 30s, 自定义的话可以修改 lockWatchdogTimeout 配置
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();


private long lockWatchdogTimeout = 30 * 1000;

复制代码

锁自动续期总结:

  1. 在没有指定锁超时时间(leaseTime)的情况下,加锁成功后就会执行自动续期。
  2. 如果当前线程持有的是重入锁,则对锁重入次数+1,如果是首次加锁,除了锁次数+1 还需要执行锁续期。这里需要清楚是只有首次加锁才会续期,重入锁不会执行续期操作。将锁对应的线程 Id 及重入次数放入对象 ExpirationEntry 中, ExpirationEntry 对像使用 LinkedHashMap 维护了锁的线程 Id 和重入计数器。然后将 ExpirationEntry 对象放 EXPIRATION_RENEWAL_MAP (ConcurrentHashMap), EXPIRATION_RENEWAL_MAP 中存放着所有需要续期的锁。
  3. 新建一个延迟任务,10s(默认)之后执行,在 EXPIRATION_RENEWAL_MAP 中取出 ExpirationEntry 对象,拿到第一个线程 Id,然后执行 Lua 脚本,检查线程 Id 对应的 key 和 filed 是否存在(锁),如果存在则重置锁的超时时间为 30s(默认),如果不存在则说明已经解锁了不需要续期。
  4. 续期成功后,继续递归调用步骤 3,保证持续锁续期,续期失败则说明锁已经不存在了,停止续期。

当服务宕机时,看门狗的线程也就不存在了,此时也就不会对锁进行自动续期,到了 30s 锁就会自动过期,其他线程就可以获取锁了,不会造成死锁。

解锁

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                  // 如果锁不存在
                    "return nil;" +                                                      // 解锁失败
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +    // 否则将锁的重入计数器-1
                    "if (counter > 0) then " +                                           // 如果重入计数器>0
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +                        // 将锁续期 30s
                    "return 0; " +                                                       // 返回成功
                    "else " + 
                    "redis.call('del', KEYS[1]); " +                                     // 否则删除锁    
                    "redis.call('publish', KEYS[2], ARGV[1]); " +                        // 发布解锁消息
                    "return 1; " +                                                       // 返回解锁成功
                    "end; " +
                    "return nil;",                                                       // 解锁失败
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

复制代码

流程总结:

  1. 判断锁是否存在,如果锁不存在直接返回 null。
  2. 重入计数器减一(因为支持重入锁的缘故,这里不能直接将锁删除)。
  3. 如果重入计数器还是大于零,说明线程还是持有锁的,将锁续期 30s,返回成功。
  4. 否则删除锁,并且发送删除锁的消息(channelName:redisson_lock__channel:{锁 key 值}),以通知阻塞队列中的线程尝试加锁。
  5. 返回解锁成功。

总结

  • RedissonLock 实现了锁等待(waitTime),锁重入,锁自动续期等复杂功能。
  • RedissonLock 实现的分布式锁使用的是 Hash 数据结构,其中 Hash key 是我们指定锁的 key 值, filed 是 UUID:threaId,value 是重入锁次数。其中 UUID 是 Redisson 客户端连接管理器实例初始化生成的 UUID。使用 Hash 数据结构,是实现锁重入的关键。
  • RedissonLock 加锁,解锁,看门狗都是用了 Lua 脚本,保证命令执行的原子性。
  • RedissonLock 实现锁等待时间(waitTime)不是使用的 while(true) 手段,而是使用的 Redis 发布订阅,semaphore(信号量)实现的,解决了无效锁申请造成的系统资源浪费问题。
  • 具体实现是使用 semaphore 进行带期限的阻塞线程,当锁释放时会发布锁释放的消息,收到解锁消息后调用 release() 方法,此时被 semaphore 阻塞的等待队列中的一个线程就可以尝试获取锁了,如果在指定期限内未获得锁,则获取锁失败。
  • 只有未设置锁超时时间(leaseTime),才能使用 Redisson 看门狗机制。

七、Redis 高可用架构下的分布式锁问题

上面讲 Redisson 实现的分布式锁,在单机模式下已经趋近完美了。

但是单点的话故障的话,那就芭比 Q 了,所以我们第一点想到的是部署高可用集群。

目前 Redis 高可用架构主要有主从模式,哨兵模式,集群模式,在这三种模式下使用 Redis 分布式锁存在一个弊端,可能会导致多个客户端同时加锁成功。

Redis 分布式锁详解

 

客户端 A 加锁成功,由于 Reids 主从同步数据是异步执行的,LockA 锁还没来的及同步到 Slave,此时 Master 节点宕机了。

Slave 节点提升为 Master,客户端 B 来加锁,发现没有其他客户端占用锁,LockB 加锁成功。

这时就导致了两个客户端同时获取了锁。

所以,如果使用 Redis 分布式锁,应尽量避免主从、哨兵或集群模式。

八、红锁(Redlock)

1. Redlock 概念

RedLock 是 Redis 作者提出的一个算法。

Redlock 官网介绍

在该算法的分布式版本中,我们假设有 N 个 Redis masters。这些节点是完全独立的,所以我们不使用复制或任何其他隐式协调系统。我们已经描述了如何在单个实例中安全地获取和释放锁。我们想当然地认为,算法将使用这种方法在单个实例中获取和释放锁。在我们的示例中,我们设置了 N=5,这是一个合理的值,因此我们需要在不同的计算机或虚拟机上运行 5 个 Redis 主机,以确保它们以基本独立的方式失败。

为了获取锁,客户端执行以下操作:

  1. 以毫秒为单位获取当前时间。
  2. 使用相同的 key 和随机值在所有 Redis 实例中顺序获取锁。当在每个实例中获取锁时,客户端使用一个超时,该超时与锁自动释放的总时间相比很小,以便获取它。例如,如果自动释放时间为 10 秒,则超时时间可能在 5-50 毫秒范围内。这可以防止客户端在尝试与已关闭的 Redis 节点通话时长时间处于阻塞状态:如果某个实例不可用,我们应该尽快尝试与下一个实例通话。
  3. 客户端通过从当前时间中减去在步骤 1 中获得的时间戳来计算获取锁所用的时间。当且仅当客户端能够在大多数实例(至少 3 个)中获取锁,并且获取锁所用的总时间小于锁有效时间,则认为已获取锁。
  4. 如果获得了锁,其有效时间将被视为初始有效时间减去经过的时间,如步骤 3 中计算的。
  5. 如果客户端由于某种原因(无法锁定 N/2+1 实例或有效期为负)未能获取锁,它将尝试解锁所有实例(即使是它认为无法锁定的实例)。

Redis 作者对红锁的介绍非常详细,点击这里查看。

简单总结下:

假设有五个 Redis 实例,这些实例之间是完全独立的,并且部署在不同的计算机上,客户端尝试在这几个实例中获取锁。

如果客户端能够在大多数实例(N/2+1,至少三个)中获取锁,并且获取锁所有的总时间小于锁有效时间,则认为获取锁成功。

如果加锁成功,锁的有效期=初始有效时间-获取锁的总时间,假如锁有效期为 10s,获取锁共花了 2s,那么锁的有效期还剩 8s。

无论客户端获取锁成功还是失败,都需要解锁所有 Redis 实例,以免发生死锁。

Redis 分布式锁详解

 

使用多个完全独立的 Redis 实例,解决了 Redis 主从异步复制造成的锁丢失问题,同时保障了高可用。

至少 N/2+1 个实例加锁成功,保证锁的互斥性,防止多个客户端同时获取到锁。

2. Redlock 存在问题

表面上看 RedLock 解决 Redis 分布式锁的痛点,但是真的就万无一失了吗?

有人就提出了质疑,Martin Kleppmann: How to do distributed locking

Martin Kleppmann 在效率和正确性方面质疑了红锁,他认为如果是为了效率使用分布式锁,没有必要承担 Redlock 的成本和复杂性,最好还是使用一个 Reids 实例或者主从模式。正确性方面,他认为 Redlock 也绝对保证不了锁的正确性,文章在网络延迟,过程暂停(GC),时钟漂移方面给出了论证。

Redis 作者(Salvatore)也反驳了该质疑:Is Redlock safe?

建议大家读下上面两篇文章。

我个人认为使用 Redlock 要慎重,首先,它的效率比较差,在一些 RT 要求比较高的接口中增加了耗时风险;其次,无法保证绝对的正确性,可能会出现多个客户端同时获取锁的风险(Martin Kleppmann 在他的文章里有举证);再次,成本和复杂性较高。

3. Redisson红锁使用

使用示例:

// 在不同Redis实例上获取 RLock
RLock rLock1 = redisson1.getLock(key);
RLock rLock2 = redisson2.getLock(key);
RLock rLock3 = redisson3.getLock(key);
// 初始化红锁
RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1, rLock2, rLock3);
// 加锁
redissonRedLock.lock();
// 业务逻辑
// 解锁
redissonRedLock.unlock();

复制代码

Redis 分布式锁详解

 

Redisson 在新版本中已经弃用了 RedissonRedLock,不建议使用。



Tags:分布式锁   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
在Redis中如何实现分布式锁的防死锁机制?
在Redis中实现分布式锁是一个常见的需求,可以通过使用Redlock算法来防止死锁。Redlock算法是一种基于多个独立Redis实例的分布式锁实现方案,它通过协调多个Redis实例之间的锁...【详细内容】
2024-02-20  Search: 分布式锁  点击:(50)  评论:(0)  加入收藏
手动撸一个 Redis 分布式锁
大家好呀,我是楼仔。今天第一天开工,收拾心情,又要开始好好学习,好好工作了。对于使用 Java 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 Redisson 就行。但是因为这些...【详细内容】
2024-02-19  Search: 分布式锁  点击:(47)  评论:(0)  加入收藏
Redis分布式锁常见坑点分析
日常开发中,基于 Redis 天然支持分布式锁,大家在线上分布式项目中都使用过 Redis 锁。本文主要针对日常开发中加锁过程中某些异常场景进行讲解与分析。本文讲解示例代码都在 h...【详细内容】
2023-12-11  Search: 分布式锁  点击:(119)  评论:(0)  加入收藏
一文详解分布式锁的看门狗机制
我们今天来看看这个 Redis 的看门狗机制,毕竟现在还是有很多是会使用 Redis 来实现分布式锁的,我们现在看看这个 Redis 是怎么实现分布式锁的,然后我们再来分析这个 Redis 的看...【详细内容】
2023-11-29  Search: 分布式锁  点击:(219)  评论:(0)  加入收藏
全新的分布式锁,功能简单且强大
作者:donnie4w链接:https://my.oschina.net/donnie4w/blog/10114233前言:分布式锁是分布式系统中一个极为重要的工具。目前有多种分布式锁的设计方案,比如借助 redis,mq,数据库,zoo...【详细内容】
2023-10-30  Search: 分布式锁  点击:(269)  评论:(0)  加入收藏
Redis分布式锁失效,数据是否仍存在于内存中?
正文大家好,我是小米,欢迎来到小米的技术分享!今天,我要和大家一起探讨一个有趣而又深奥的话题:Redis分布式锁失效了,数据还存在Redis内存中吗?这个问题在面试中经常被提出,也是我们...【详细内容】
2023-10-11  Search: 分布式锁  点击:(321)  评论:(0)  加入收藏
Redis魔法:点燃分布式锁的奇妙实现
分布式锁是一种用于在分布式系统中控制对共享资源的访问的锁。它与传统的单机锁不同,因为它需要在多个节点之间协调以确保互斥访问。本文将介绍什么是分布式锁,以及使用Redis...【详细内容】
2023-10-11  Search: 分布式锁  点击:(248)  评论:(0)  加入收藏
分布式锁,原来这么简单!
作者 | 蔡柱梁审校 | 重楼目录 分布式锁介绍 如何实现分布式锁 实现分布式锁1 分布式锁介绍现在的服务往往都是多节点,在一些特定的场景下容易产生并发问题,比如扣减库存,送完...【详细内容】
2023-09-22  Search: 分布式锁  点击:(237)  评论:(0)  加入收藏
lua+redis:分布式锁解决方案分享
介绍当我们涉及到多进程或多节点的分布式系统时,传统的单机锁机制不再足够应对并发控制的需求。这是因为在分布式环境中,多个进程或节点同时访问共享资源,传统锁无法有效地协调...【详细内容】
2023-09-13  Search: 分布式锁  点击:(343)  评论:(0)  加入收藏
分布式锁的3种实现!
分布式锁是一种用于保证分布式系统中多个进程或线程同步访问共享资源的技术。同时它又是面试中的常见问题,所以我们本文就重点来看分布式锁的具体实现(含实现代码)。在分布式系...【详细内容】
2023-09-13  Search: 分布式锁  点击:(311)  评论:(0)  加入收藏
▌简易百科推荐
兄弟,王者荣耀的段位排行榜是通过Redis实现的?
在王者荣耀中,我们会打排位赛,而且大家最关注的往往都是你的段位,还有在好友中的排名。作为程序员的你,是否思考过这个段位排行榜是怎么实现的?了解它的实现原理,会不会对上分有所...【详细内容】
2024-04-15    dbaplus社群  Tags:Redis   点击:(3)  评论:(0)  加入收藏
16个Redis常见使用场景总结
来源:blog.csdn.net/qq_39938758/article/details/105577370目录 缓存 数据共享分布式 分布式锁 全局ID 计数器 限流 位统计 购物车 用户消息时间线timeline 消息...【详细内容】
2024-04-11    书圈  Tags:Redis   点击:(8)  评论:(0)  加入收藏
Linux获取Redis 性能指标方法
一、监控指标&Oslash; 性能指标:Performance&Oslash; 内存指标: Memory&Oslash; 基本活动指标:Basic activity&Oslash; 持久性指标: Persistence&Oslash; 错误指标:Error二、监...【详细内容】
2024-04-11  上海天正信息科技有限    Tags:Redis   点击:(10)  评论:(0)  加入收藏
Redis与缓存一致性问题
缓存一致性问题是在使用缓存系统,如Redis时经常遇到的问题。当数据在原始数据源(如数据库)中发生变化时,如何确保缓存中的数据与数据源保持一致,是开发者需要关注的关键问题。一...【详细内容】
2024-04-11  后端Q    Tags:Redis   点击:(8)  评论:(0)  加入收藏
Redis 不再 “开源”,未来采用 SSPLv1 和 RSALv2 许可证
Redis 官方于21日宣布修改开源协议 &mdash;&mdash; 未来所有版本都将使用 “源代码可用” 的许可证 (source-available licenses)。具体来说,Redis 将不再遵循 BSD 3-Clause...【详细内容】
2024-03-27  dbaplus社群    Tags:Redis   点击:(25)  评论:(0)  加入收藏
Redis“叛逃”开源,得罪了几乎所有人
内存数据库供应商Redis近日在开源界砸下了一块“巨石”。Redis即将转向双许可模式,并实施更为严格的许可条款。官方对此次变更的公告直截了当:从Redis 7.4版本开始,Redis将在Re...【详细内容】
2024-03-25    51CTO  Tags:Redis   点击:(13)  评论:(0)  加入收藏
如何使用 Redis 实现消息队列
Redis不仅是一个强大的内存数据存储系统,它还可以用作一个高效的消息队列。消息队列是应用程序间或应用程序内部进行异步通信的一种方式,它允许数据生产者将消息放入队列中,然...【详细内容】
2024-03-22  后端Q  微信公众号  Tags:Redis   点击:(22)  评论:(0)  加入收藏
Redis不再 “开源”
Redis 官方今日宣布修改开源协议 &mdash;&mdash; 未来所有版本都将使用 “源代码可用” 的许可证 (source-available licenses)。具体来说,Redis 将不再遵循 BSD 3-Clause 开...【详细内容】
2024-03-21  OSC开源社区    Tags:Redis   点击:(15)  评论:(0)  加入收藏
在Redis中如何实现分布式锁的防死锁机制?
在Redis中实现分布式锁是一个常见的需求,可以通过使用Redlock算法来防止死锁。Redlock算法是一种基于多个独立Redis实例的分布式锁实现方案,它通过协调多个Redis实例之间的锁...【详细内容】
2024-02-20  编程技术汇    Tags:Redis   点击:(50)  评论:(0)  加入收藏
手动撸一个 Redis 分布式锁
大家好呀,我是楼仔。今天第一天开工,收拾心情,又要开始好好学习,好好工作了。对于使用 Java 的小伙伴,其实我们完全不用手动撸一个分布式锁,直接使用 Redisson 就行。但是因为这些...【详细内容】
2024-02-19  楼仔  微信公众号  Tags:Redis   点击:(47)  评论:(0)  加入收藏
站内最新
站内热门
站内头条