您当前的位置:首页 > 电脑百科 > 程序开发 > 框架

Redisson杂谈,你学到了什么?

时间:2023-10-16 13:13:44  来源: 政采云技术  作者:

Redisson杂谈,你学到了什么?
一.redisson 简介

Redisson 是一个基于.NETty 通信框架的高性能 Redis 客户端, 实现了分布式和可扩展的 JAVA 数据结构,提供很多分布式相关操作服务以及大量便利的工具方法,让开发者可以把精力放在开发业务,避免重复造轮子。

二.Redisson 优点

1.通信框架基于 Netty,使用多路复用。吞吐量高。

2.兼容支持 Redis 集群模式,Reids 哨兵模式等,天然适配分布式服务。

3.提供多种分布式对象的封装,如:Bloom Filter,Object Bucket,Bitset,AtomicLong, 和 HyperLogLog 等。

4.提供分布式锁实现包括:

RedissonFAIrLock 公平锁,

RedissonLock 非公平锁,

RedissonRedLock 红锁(基于红锁算法, 当集群中大多数( N/2 + 1 )加锁成功了,则认为加锁成功,

目前已被弃用,Redisson 官方不再建议使用)。

三.RedissonLock 分布式锁相关部分源码解析

RedissonLock 作为分布式锁,实现了可重入锁。阻塞锁,非阻塞锁。并且 Redisson 存在看门狗机制,可以对未手动设置超时时间的锁实现自动续期。

1.Trylock 加锁

加锁代码逻辑

/**
*
* @param waitTime 获取锁的最大等待时间,默认 -1,
* @param leaseTime 锁的过期时间,默认 -1
* @param unit
* @param threadId
* @return
*/
private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  RFuture<Boolean> acquiredFuture;
  if (leaseTime > 0) {
    //若手动设置了锁的过期时间,则加锁时以当前传入过期时间为准
    //执行Lua脚本,加锁
    acquiredFuture = tryLockInnerAsync(waitTime, leaseTime, unit, 
    threadId,RedisCommands.EVAL_NULL_BOOLEAN);                                                 
  } else {
    //若未手动设置,则默认过期时间等于配置的lockWatchdogTimeout,lockWatchdogTimeout默认为30s。
    //然后执行Lua脚本,加锁
    acquiredFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
  }

  CompletionStage<Boolean> f = acquiredFuture.thenApply(acquired -> {
  //lock acquired
  //若锁成功获取到
  if (acquired) {
    if (leaseTime > 0) {
      internalLockLeaseTime = unit.toMillis(leaseTime);
      } else {
      //若未手动设置过期时间,则执行看门狗任务,自动续期
      scheduleExpirationRenewal(threadId);
    }
  }
  return acquired;
  });
  return new CompletableFutureWrapper<>(f);
}

加锁 Lua 脚本如下:

if (redis.call('exists', KEYS[1]) == 0) then " +
  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
  "end; " +
  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  "return nil; " +
  "end; " +
  "return redis.call('pttl', KEYS[1]);

其中 KEYS[1] 是锁逻辑名称,ARGV[1] 是 key 的过期时间,ARGV[2]是锁的线程级别名称( uuid + 线程id ,uuid 是每个 Redisson 客户端创建时唯一生成的)。

由此可看出,锁利用 Hash 结构实现,其中 Hash 的 key 是锁的逻辑名称,field 是锁的线程级别名称,value 是锁的重入次数。

加锁 Lua 脚本的含义:

先判断当前逻辑锁名称的 key 是否存在,

若不存在,在 Hash 结构中设置这个锁,锁重入次数加 1,然后给 key 设置一个过期时间,最后返回 null。

若存在,并且已经被当前线程持有,就锁可重入次数加 1,并且重新设置 key 的过期时间,最后返回 null,

若当前锁被其他线程持有,返回 key 剩余过期时间。

2.Lock 阻塞锁

Lock 阻塞锁与 Trylock 底层调用代码基本一致。多了一个等待锁被其他线程释放后,重新尝试加锁的过程。

代码如下:

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
    return;
  }
  //订阅释放锁消息
  CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
  pubSub.timeout(future);
  RedissonLockEntry entry;
  if (interruptibly) {
    entry = commandExecutor.getInterrupted(future);
  } else {
    entry = commandExecutor.get(future);
  }

  try {
    while (true) {
      //重新尝试取锁
      ttl = tryAcquire(-1, leaseTime, unit, threadId);
      // lock acquired
      if (ttl == null) {
        break;
      }
      // waiting for message,
      if (ttl >= 0) {
        try {
          //当锁仍然被其他线程占有时,调用
          //java.util.concurrent.Semaphore#tryAcquire方法进行信号量阻塞,
          //当线程阻塞等待时间超过最大超时时间(ttl即锁的key的剩余存活时间)
          //或者 监听到锁释放消息后,信号量被释放后,线程不再阻塞
          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
          if (interruptibly) {
            throw e;
          }
          entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
        }
      } else {
        if (interruptibly) {
          //尝试从信号量获取一个许可
          entry.getLatch().acquire();
        } else {
          entry.getLatch().acquireUninterruptibly();
        }
      }
    }
  } finally {
  //取消订阅锁释放消息
  unsubscribe(entry, threadId);
}

大致流程如下:

1.先获取锁,若获取锁成功,直接返回。

2.若获取失败,订阅释放锁消息。

3.进入 while 循环,重新尝试获取锁。若获取锁成功,则跳出循环,并不再订阅释放锁消息。

4.若重新获取锁失败,进行信号量阻塞,直到锁被其他占有线程释放(监听锁释放消息的监听器中,有唤醒信号量的逻辑)或者到达阻塞超时时间,然后继续这个 while 循环。

3.Unlock 解锁

代码如下

public RFuture<Void> unlockAsync(long threadId) {
  //执行解锁lua脚本
  RFuture<Boolean> future = unlockInnerAsync(threadId);

  CompletionStage<Void> f = future.handle((opStatus, e) -> {
    //取消看门狗任务
    cancelExpirationRenewal(threadId);

    if (e != null) {
      throw new CompletionException(e);
    }
    if (opStatus == null) {
      IllegalMonitorStateException cause = new IllegalMonitorStateException
      ("attempt to unlock lock, not locked by current thread by node id: "
      + id + " thread-id: " + threadId);
      throw new CompletionException(cause);
    }
    return null;
  });

  return new CompletableFutureWrapper<>(f);
}

1.其中解锁 Lua 脚本如下:

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  "return nil;" +
  "end; " +
  "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  "if (counter > 0) then " +
  "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  "return 0; " +
  "else " +
  "redis.call('del', KEYS[1]); " +
  "redis.call('publish', KEYS[2], ARGV[1]); " +
  "return 1; " +
  "end; " +
  "return nil;

其中 KEYS[1] 为锁的逻辑名称,KEYS[2] 为通道名称,ARGV[1] 为 0, ARGV[2] 为锁的过期时间,默认 30s,ARGV[3] 为锁的线程级别名称。

解锁 Lua 脚本含义:

解锁时,先判断当前锁是否被当前线程持有,

若不是,则返回 null。

若是,锁的可重入次数 减1。

然后继续判断锁的可重入次数是否大于 0,若大于 0,继续给这个锁 key 续期 30s,并且最后返回 0。

若不大于 0,删除这个锁的 key,并向指定通道发布这个解锁消息,并且返回 1。

2.如果这个锁有看门狗任务在定时续期,当解锁成功时会取消这个定时续期任务。

4.看门狗机制

当某个锁内的任务的执行时间不可预估时,可能执行时间很长,也可能很短。此时若直接设置一个固定的锁过期时间,可能会导致任务执行时间远远大于锁的过期时间,导致任务还未执行完成,但是锁已经过期了。那其他线程又可以获取到锁,然后执行该任务了,最终导致线程安全问题。

为应对这种情况,定期给锁续期的看门狗机制出现了。

代码:

//真正看门狗续期任务
private void renewExpiration() {
  ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  if (ee == null) {
    return;
  }
  //创建一个延时任务,底层实现是netty时间轮。当每过了lockWatchdogTimeout/3的时间,执行该任务
  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
    @Override
    public void run(Timeout timeout) throws Exception {
      ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ent == null) {
          return;
        }
        Long threadId = ent.getFirstThreadId();
        //若当前锁已经被当前线程释放,则锁不再续期
        if (threadId == null) {
          return;
        }
        //调用Lua脚本,判断当前锁是否被当前线程占有,若是则返回true,
        //并且重新设置key的过期时间,默认30s
        CompletionStage<Boolean> future = renewExpirationAsync(threadId);
        future.whenComplete((res, e) -> {
          if (e != null) {
            log.error("Can't update lock " + getRawName() + " expiration", e);
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
            return;
            }
            //当锁仍然被当前线程占有,说明业务代码还在执行,则递归调用续期任务
            if (res) {
              // reschedule itself
              log.info("续期任务执行"+ "threadId:" +threadId);
              renewExpiration();
            } else {
              //否则移除该续期任务,直接在EXPIRATION_RENEWAL_MAP移除ExpirationEntry
              cancelExpirationRenewal(null);
            }
        });
    }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  
  ee.setTimeout(task);
}

当没有显式指定锁过期时间时候,就默认 key 过期时间 30s,然后定时任务每 10 秒( lockWatchdogTimeout/3 )进行一次调用,执行锁续期动作,若这个线程还持有这个锁,就对这个线程持有的锁进行续期操作(通过 pexpire 续期 key 30s),若途中持有锁的线程 手动被 unlock 或者机器宕机才会取消这个任务。否则会一直续期。

四.总结

Redisson 作为一个 Redis 客户端,基于 Redis、Lua 和 Netty 建立起了一套完善的分布式解决方案,比如分布式锁的实现,分布式对象的操作等。本文主要简单讲述了在 Redisson 中分布式锁的实现。其实在 Redisson 中还有很多值得深挖的点。比如:Redisson 中使用了大量 Netty 的特性。大家有兴趣的话,可以仔细研究一下。

五.参考文章

https://Github.com/redisson/redisson/wiki

https://cloud.tencent.com/developer/article/1500854



Tags:Redisson   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Redisson杂谈,你学到了什么?
一.Redisson 简介Redisson 是一个基于 Netty 通信框架的高性能 Redis 客户端, 实现了分布式和可扩展的 Java 数据结构,提供很多分布式相关操作服务以及大量便利的工具方法,让...【详细内容】
2023-10-16  Search: Redisson  点击:(297)  评论:(0)  加入收藏
Redisson锁机制源码分析
1、了解分布式锁的特性 1、锁的互斥性 也就是说,在任意时刻,只能有一个客户端能获取到锁,不能同时有两个或多个客户端获取到锁。 简单来说,就比如上厕所,一个厕所只有一个坑位...【详细内容】
2023-08-28  Search: Redisson  点击:(259)  评论:(0)  加入收藏
Springboot+Redisson封装分布式锁Starter
我们将分布式锁基于缓存扩展了一版,也就是说本starter即有分布式缓存功能,又有分布式锁功能。而注解版的分布式锁能够解决大多数场景的并核问题,小粒度的Lock锁方式补全其他场...【详细内容】
2023-08-28  Search: Redisson  点击:(254)  评论:(0)  加入收藏
Redisson看门狗机制你了解多少
前言谈到Redisson就不得不说Redis了,一想到Redis就不得不想到并发编程锁机制,一想到锁机制那么就不能不考虑一个很头疼的问题,如何保证原子性的问题,高QPS请求量的系统对每次执...【详细内容】
2023-08-22  Search: Redisson  点击:(195)  评论:(0)  加入收藏
Jedis 与 Redisson 对比有什么优缺点?
Jedis 和 Redisson 都是 Java 开发者广泛使用的 Redis Java 客户端之一。它们能够很好地与 Redis 数据库进行交互,提供丰富的功能和灵活的配置选项。虽然它们都可以用于在 Ja...【详细内容】
2023-05-18  Search: Redisson  点击:(438)  评论:(0)  加入收藏
一文看懂Redisson分布式锁的Watchdog机制源码实现
一、 分布式锁简介分布式锁是一种常见的协调分布式系统的机制,在分布式环境下保证数据的一致性和可用性。分布式锁的实现有很多种方式,其中较为常见的方式是利用Redis实现分布...【详细内容】
2023-04-12  Search: Redisson  点击:(247)  评论:(0)  加入收藏
使用redisson实现分布式秒杀功能
redisson相比原生的jredis具有排队的功能,不一致秒杀时,一时获取锁失败就返回失败。秒杀的原理就是使用redis的分布式锁的功能,保证每次抢购不会出现超卖的情况 1 引入pom...【详细内容】
2022-09-01  Search: Redisson  点击:(466)  评论:(0)  加入收藏
厉害了,原来 Redisson 这么好用
Redis 是最流行的 NoSQL 数据库是解决方案之一,而 Java 是世界上最流行(注意,我没有说“最好”)的编程语言之一。虽然两者看起来很自然地在一起“工作”,但是要知道,Redis 其实并...【详细内容】
2022-06-17  Search: Redisson  点击:(740)  评论:(0)  加入收藏
聊一聊Redis官方置顶推荐的Java客户端Redisson
写这篇的时候,相信有很多朋友还在用Jedis作为Redis的客户端,我不禁有很多问号,Jedis还香吗?如果你早些年说它香我信,但是都2020年了,它真的不那么香了。那为什么还继续使用它呢?大...【详细内容】
2020-10-12  Search: Redisson  点击:(412)  评论:(0)  加入收藏
Redis的三个框架:Jedis,Redisson,Lettuce
Jedis api 在线网址:http://tool.oschina.net/uploads/apidocs/redis/clients/jedis/Jedis.htmlredisson 官网地址:https://redisson.org/redisson git项目地址:https://githu...【详细内容】
2019-11-11  Search: Redisson  点击:(1117)  评论:(0)  加入收藏
▌简易百科推荐
Web Components实践:如何搭建一个框架无关的AI组件库
一、让人又爱又恨的Web ComponentsWeb Components是一种用于构建可重用的Web元素的技术。它允许开发者创建自定义的HTML元素,这些元素可以在不同的Web应用程序中重复使用,并且...【详细内容】
2024-04-03  京东云开发者    Tags:Web Components   点击:(8)  评论:(0)  加入收藏
Kubernetes 集群 CPU 使用率只有 13% :这下大家该知道如何省钱了
作者 | THE STACK译者 | 刘雅梦策划 | Tina根据 CAST AI 对 4000 个 Kubernetes 集群的分析,Kubernetes 集群通常只使用 13% 的 CPU 和平均 20% 的内存,这表明存在严重的过度...【详细内容】
2024-03-08  InfoQ    Tags:Kubernetes   点击:(12)  评论:(0)  加入收藏
Spring Security:保障应用安全的利器
SpringSecurity作为一个功能强大的安全框架,为Java应用程序提供了全面的安全保障,包括认证、授权、防护和集成等方面。本文将介绍SpringSecurity在这些方面的特性和优势,以及它...【详细内容】
2024-02-27  风舞凋零叶    Tags:Spring Security   点击:(52)  评论:(0)  加入收藏
五大跨平台桌面应用开发框架:Electron、Tauri、Flutter等
一、什么是跨平台桌面应用开发框架跨平台桌面应用开发框架是一种工具或框架,它允许开发者使用一种统一的代码库或语言来创建能够在多个操作系统上运行的桌面应用程序。传统上...【详细内容】
2024-02-26  贝格前端工场    Tags:框架   点击:(47)  评论:(0)  加入收藏
Spring Security权限控制框架使用指南
在常用的后台管理系统中,通常都会有访问权限控制的需求,用于限制不同人员对于接口的访问能力,如果用户不具备指定的权限,则不能访问某些接口。本文将用 waynboot-mall 项目举例...【详细内容】
2024-02-19  程序员wayn  微信公众号  Tags:Spring   点击:(39)  评论:(0)  加入收藏
开发者的Kubernetes懒人指南
你可以将本文作为开发者快速了解 Kubernetes 的指南。从基础知识到更高级的主题,如 Helm Chart,以及所有这些如何影响你作为开发者。译自Kubernetes for Lazy Developers。作...【详细内容】
2024-02-01  云云众生s  微信公众号  Tags:Kubernetes   点击:(50)  评论:(0)  加入收藏
链世界:一种简单而有效的人类行为Agent模型强化学习框架
强化学习是一种机器学习的方法,它通过让智能体(Agent)与环境交互,从而学习如何选择最优的行动来最大化累积的奖励。强化学习在许多领域都有广泛的应用,例如游戏、机器人、自动驾...【详细内容】
2024-01-30  大噬元兽  微信公众号  Tags:框架   点击:(67)  评论:(0)  加入收藏
Spring实现Kafka重试Topic,真的太香了
概述Kafka的强大功能之一是每个分区都有一个Consumer的偏移值。该偏移值是消费者将读取的下一条消息的值。可以自动或手动增加该值。如果我们由于错误而无法处理消息并想重...【详细内容】
2024-01-26  HELLO程序员  微信公众号  Tags:Spring   点击:(84)  评论:(0)  加入收藏
SpringBoot如何实现缓存预热?
缓存预热是指在 Spring Boot 项目启动时,预先将数据加载到缓存系统(如 Redis)中的一种机制。那么问题来了,在 Spring Boot 项目启动之后,在什么时候?在哪里可以将数据加载到缓存系...【详细内容】
2024-01-19   Java中文社群  微信公众号  Tags:SpringBoot   点击:(86)  评论:(0)  加入收藏
花 15 分钟把 Express.js 搞明白,全栈没有那么难
Express 是老牌的 Node.js 框架,以简单和轻量著称,几行代码就可以启动一个 HTTP 服务器。市面上主流的 Node.js 框架,如 Egg.js、Nest.js 等都与 Express 息息相关。Express 框...【详细内容】
2024-01-16  程序员成功  微信公众号  Tags:Express.js   点击:(86)  评论:(0)  加入收藏
站内最新
站内热门
站内头条