您当前的位置:首页 > 电脑百科 > 数据库 > Redis

一篇详解Redis延时队列

时间:2019-12-25 11:54:54  来源:  作者:

redis的 list 数据结构常用来作为 异步消息队列 使用,使用 rpush/lpush 操作 入队 ,使用 lpop/rpop 来操作 出队

一篇详解Redis -- 延时队列

 

> rpush my-queue Apple banana pear
(integer) 3
> llen my-queue
(integer) 3
> lpop my-queue
"apple"
> llen my-queue
(integer) 2
> lpop my-queue
"banana"
> llen my-queue
(integer) 1
> lpop my-queue
"pear"
> llen my-queue
(integer) 0
> lpop my-queue
(nil)

空队列

  1. 如果队列为空,客户端会陷入 pop的死循环 , 空轮询 不仅拉高了 客户端的CPU , Redis的QPS 也会被拉高
  2. 如果空轮询的客户端有几十个, Redis的慢查询 也会显著增加,可以尝试让客户端线程 sleep 1s
  3. 但睡眠会导致消息的 延迟增大 ,可以使用 blpop/brpop (blocking, 阻塞读 )
  • 阻塞读在队列没有数据时,会立即进入 休眠 状态,一旦有数据到来,会立即被 唤醒, 消息延迟几乎为0

空闲连接

  1. 如果线程一直阻塞在那里,Redis的客户端连接就成了 闲置连接
  2. 闲置过久, 服务器 一般会 主动断开 连接, 减少闲置的资源占用 ,此时 blpop/brpop 会 抛出异常

锁冲突处理

  1. 分布式锁 加锁失败 的处理策略
  2. 直接抛出异常 ,通知用户稍后重试
  3. sleep 后再重试
  4. 将请求转移到 延时队列 ,过一会重试
  5. 抛出异常
  6. 这种方式比较适合由 用户直接发起 的请求
  7. sleep
  8. sleep会 阻塞 当前的消息处理线程,从而导致队列的后续消息处理出现 延迟
  9. 如果 碰撞比较频繁 ,sleep方案不合适
  10. 延时队列
  11. 比较适合异步消息处理的场景,通过将当前冲突的请求转移到另一个队列 延后处理来 避免冲突

延时队列

  1. 可以通过Redis的 zset 来实现延时队列
  2. 将消息序列化成一个字符串作为zet的 value ,将该消息的 到期处理时间 作为 score
  3. 然后 多线程轮询 zset获取 到期的任务 进行处理
  • 多线程是为了保障 可用性 ,但同时要考虑 并发安全 ,确保 任务不能被多次执行
public class RedisDelayingQueue<T> {
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 private static class TaskItem<T> {
 private String id;
 private T msg;
 }
 private Type taskType = new TypeReference<TaskItem<T>>() {
 }.getType();
 private Jedis jedis;
 private String queueKey;
 public RedisDelayingQueue(Jedis jedis, String queueKey) {
 this.jedis = jedis;
 this.queueKey = queueKey;
 }
 public void delay(T msg) {
 TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg);
 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task));
 }
 public void loop() {
 // 可以进一步优化,通过Lua脚本将zrangeByScore和zrem统一挪到Redis服务端进行原子化操作,减少抢夺失败出现的资源浪费
 while (!Thread.interrupted()) {
 // 只取一条
 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
 if (values.isEmpty()) {
 try {
 Thread.sleep(500);
 } catch (InterruptedException e) {
 break;
 }
 continue;
 }
 String s = values.iterator().next();
 if (jedis.zrem(queueKey, s) > 0) {
 // zrem是多线程多进程争夺任务的关键
 TaskItem<T> task = JSON.parseobject(s, taskType);
 this.handleMsg(task.msg);
 }
 }
 }
 private void handleMsg(T msg) {
 try {
 System.out.println(msg);
 } catch (Throwable ignored) {
 // 一定要捕获异常,避免因为个别任务处理问题导致循环异常退出
 }
 }
 public static void main(String[] args) {
 final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo");
 Thread producer = new Thread() {
 @Override
 public void run() {
 for (int i = 0; i < 10; i++) {
 queue.delay("zhongmingmao" + i);
 }
 }
 };
 Thread consumer = new Thread() {
 @Override
 public void run() {
 queue.loop();
 }
 };
 producer.start();
 consumer.start();
 try {
 producer.join();
 Thread.sleep(6000);
 consumer.interrupt();
 consumer.join();
 } catch (InterruptedException ignored) {
 }
 }
}


Tags:Redis 延时   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
Redis的 list 数据结构常用来作为 异步消息队列 使用,使用 rpush/lpush 操作 入队 ,使用 lpop/rpop 来操作 出队 > rpush my-queue apple banana pear(integer) 3> llen my-qu...【详细内容】
2019-12-25  Tags: Redis 延时  点击:(169)  评论:(0)  加入收藏
▌简易百科推荐
来源: my.oschina.net/xiaomu0082/blog/2990388首先说下问题现象:内网sandbox环境API持续1周出现应用卡死,所有api无响应现象刚开始当测试抱怨环境响应慢的时候 ,我们重启一下应...【详细内容】
2021-12-08  Java识堂    Tags:Redis   点击:(18)  评论:(0)  加入收藏
我不知道为什么你会选择对特定数量的“错误”(或警告)如此具体。听起来您正在寻找将要发布到 Yahoo! 的某些文章的内容。 Insider (N Foos to Blah for the BlahBlah)。那说:...【详细内容】
2021-12-07  富集云科技有限公司    Tags:Redis   点击:(14)  评论:(0)  加入收藏
目录 一、背景 二、步骤 0.理论支持 1、获取数据 2、结果 3、分析数据并评估大小 三、关于repl-backlog-size 一、背景 repl-backlog-size控制这个环形缓冲区. ​ 主从断...【详细内容】
2021-11-05  弈秋的美好生活    Tags:redis   点击:(41)  评论:(0)  加入收藏
Redis 性能测试是通过同时执行多个命令实现的。1,Redis-benchmarkRedis性能命令:redis性能命令格式: redis-benchmark [option] [option value] redis 性能测试工具可选参数如...【详细内容】
2021-11-02  川石信息    Tags:Redis   点击:(41)  评论:(0)  加入收藏
1 概述数据结构和内部编码 无传统关系型数据库的 Table 模型schema 所对应的db仅以编号区分。同一 db 内,key 作为顶层模型,它的值是扁平化的。即 db 就是key的命名空间。 key...【详细内容】
2021-11-01  JavaEdge    Tags:Redis   点击:(28)  评论:(0)  加入收藏
普通java中使用引用Java redis 驱动,即可连接:import redis.clients.jedis.Jedis; public class RedisTestJava { public static void main(String[] args) { //连...【详细内容】
2021-10-13  faesuite    Tags:Redis   点击:(34)  评论:(0)  加入收藏
Redis常用的数据结构有 string list set zset hashstringstring 是 Redis 的基本的数据类型,一个 key 对应一个 value。string 类型是二进制安全的,Redis的string可以包含任...【详细内容】
2021-10-12  语霖    Tags:Redis   点击:(36)  评论:(0)  加入收藏
列表类型可以存储一组按插入顺序排序的字符串,它非常灵活,支持在两端插入、弹出数据,可以充当栈和队列的角色。> LPUSH fruit apple(integer) 1> RPUSH fruit banana(integer)...【详细内容】
2021-09-17  深夜敲代码    Tags:Redis   点击:(54)  评论:(0)  加入收藏
Redis持久化意义 是做灾难恢复,数据恢复,也可以归类到高可用的一个环节里面去,比如你的redis整个挂了,然后redis就不可用了,你要做的事情是让redis变得可用,尽快变得可用 大量的请...【详细内容】
2021-08-12  小李说IT    Tags:Redis   点击:(77)  评论:(0)  加入收藏
当查询Redis中没有的数据时,该查询会下沉到数据库层,同时数据库层也没有该数据,当这种情况大量出现或被恶意攻击时,接口的访问全部透过Redis访问数据库,而数据库中也没有这些数据...【详细内容】
2021-07-30  随便t    Tags:缓存穿透   点击:(91)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条