您当前的位置:首页 > 电脑百科 > 数据库 > Redis

实战基于Redis实现阻塞队列

时间:2020-08-02 15:27:06  来源:  作者:

日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处理的数据添加到队列当中,然后按照排队的先后顺序进行异步处理。

这个队列,可以是专业的消息队列,如 RocketMQ/RabbitMQ 等,一般项目中,如果只是为了进行异步,未免有点杀鸡用牛刀的意味。

也可以使用基于 JVM 内存实现队列,但是如果项目进行了重启,就会造成队列数据丢失。

大部分的项目都会用到 redis 中间件作为缓存使用,此时使用 Redis 的 list 结构来实现队列则是非常合适的选择。

因此,本文主要讲解基于 Redis 的方式实现异步队列。

基于 Redis 的 list 实现队列的方式也有多种,先说第一种不推荐的方式,即使用 LPUSH 生产消息,然后 while(true) 中通过 RPOP 消费消息,这种方式的确可以实现,但是不断代码不断的轮询,势必会消耗一些系统的资源。

第二种方式也是不推荐的方式,也是通过 LPUSH 生产消息,然后通过 BRPOP 进行 阻塞地 等待并消费消息,这种方式较第一种方式减少了无用的轮询,降低系统资源的消耗,但是可能会存在队列消息丢失的情况,如果取出了消息然后处理失败,这个被取出的消息就将丢失。

第二种方式就是下文要介绍的方式,首先也是通过 LPUSH 生产消息,然后通过 BRPOPLPUSH 阻塞 地等待 list 新消息到来,有了新消息才开始消费,同时将消息备份到另外一个 list 当中,这种方式具备了第二种方式的优点,即减少了无用的轮询,同时也对消息进行了备份不会丢失数据,如果处理成功,可以通过 LREM 对备份的 list 中当前的这条消息进行删除处理。这种方式实现方式可以参考 模式: 安全的队列 .

实战基于Redis实现阻塞队列

 

Redis 基础

# 将一个或多个值 value 插入到列表 key 的表头
LPUSH key value [value …]

# 阻塞式等待,将列表 source 中的最后一个元素 (尾元素) 弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长 (block indefinitely) 。
BRPOPLPUSH source destination timeout

# 根据参数 count 的值,移除列表中与参数 value 相等的元素。
LREM key count value

代码实现队列消息生产者

笔者使用的是 Spring 相关 API 实现对 Redis 指令的调用。首先实现消息的生产代码,封装到一个工具类方法当中。这里很简单,就是调用了 lpush 方法,将序列化的 key 和 value 添加到列表当中去。

@Resource
private RedisConnectionFactory connectionFactory;

public void lPush(@Nonnull String key, @Nonnull String value) {
  RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
  try {
    byte[] byteKey = RedisSerializer.string().serialize(getKey(key));
    byte[] byteValue = RedisSerializer.string().serialize(value);
    assert byteKey != null;
    connection.lPush(byteKey, byteValue);
  } finally {
    RedisConnectionUtils.releaseConnection(connection, connectionFactory);
  }
}

代码实现队列消息消费者

因为实现队列消费消息的代码比较多,不可能每个需要阻塞消费的地方,对需要写这一坨代码,因此使用 JAVA8 的函数式接口实现方法的传递,同时阻塞式获取消息代码使用新线程去执行。

有人看到以下代码要吐槽了,不是说不用 while(true) 吗,怎么你这里面还是有,这里稍微解释一下,因为 SpringBoot 一般会指定 timeout 的全局超时时间,即使 BRPOPLPUSH 设置了 0,即无限期,当超出了 timeout 设置的值时,就会抛出 QueryTimeoutException 异常导致线程退出,因此添加了 try/catch 对异常进行捕获并忽略,同时使用 while(true) 保证线程可以继续执行。

代码中记录了当前消息处理结果,如果处理结果为成功,需要对备份队列的当前消息进行删除。

public void bRPopLPush(@Nonnull String key, Consumer<String> consumer) {
  CompletableFuture.runAsync(() -> {
    RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
    try {
      byte[] srcKey = RedisSerializer.string().serialize(getKey(key));
      byte[] dstKey = RedisSerializer.string().serialize(getBackupKey(key));
      assert srcKey != null;
      assert dstKey != null;
      while (true) {
        byte[] byteValue = new byte[0];
        boolean success = false;
        try {
          byteValue = connection.bRPopLPush(0, srcKey, dstKey);
          if (byteValue != null && byteValue.length != 0) {
            consumer.accept(new String(byteValue));
            success = true;
          }
        } catch (Exception ignored) {
          // 防止获取 key 达到超时时间抛出 QueryTimeoutException 异常退出
        } finally {
          if (success) {
            // 处理成功才删除备份队列的 key
            connection.lRem(dstKey, 1, byteValue);
          }
        }
      }
    } finally {
      RedisConnectionUtils.releaseConnection(connection, connectionFactory);
    }
  });
}

测试代码

@Test
public void testLPush() throws InterruptedException {
  String queueA = "queueA";
  int i = 0;
  while (true) {
    String msg = "Hello-" + i++;
    redisBlockQueue.lPush(queueA, msg);
    System.out.println("lPush: " + msg);
    Thread.sleep(3000);
  }
}

@Test
public void testBRPopLPush() {
  String queueA = "queueA";
  redisBlockQueue.bRPopLPush(queueA, (val) -> {
    // 在这里处理具体的业务逻辑
    System.out.println("val: " + val);
  });

  // 防止 Junit 进程退出
  LockSupport.park();
}

运行结果:

实战基于Redis实现阻塞队列

 

项目使用方式

为了方便使用,我将其抽取为了一个工具类,使用时通过 Spring 注入使用即可,

队列消费可以使用如下方式在项目启动的时候就进行阻塞监听队列,等待消费

@Resource
private RedisBlockQueue redisBlockQueue;

@PostConstruct
public void init() {
   redisBlockQueue.bRPopLPush(xx, (value) -> {
     //...
   });
}


Tags:Redis 阻塞队列   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是...【详细内容】
2020-08-02  Tags: Redis 阻塞队列  点击:(65)  评论:(0)  加入收藏
▌简易百科推荐
来源: my.oschina.net/xiaomu0082/blog/2990388首先说下问题现象:内网sandbox环境API持续1周出现应用卡死,所有api无响应现象刚开始当测试抱怨环境响应慢的时候 ,我们重启一下应...【详细内容】
2021-12-08  Java识堂    Tags:Redis   点击:(16)  评论:(0)  加入收藏
我不知道为什么你会选择对特定数量的“错误”(或警告)如此具体。听起来您正在寻找将要发布到 Yahoo! 的某些文章的内容。 Insider (N Foos to Blah for the BlahBlah)。那说:...【详细内容】
2021-12-07  富集云科技有限公司    Tags:Redis   点击:(14)  评论:(0)  加入收藏
目录 一、背景 二、步骤 0.理论支持 1、获取数据 2、结果 3、分析数据并评估大小 三、关于repl-backlog-size 一、背景 repl-backlog-size控制这个环形缓冲区. ​ 主从断...【详细内容】
2021-11-05  弈秋的美好生活    Tags:redis   点击:(41)  评论:(0)  加入收藏
Redis 性能测试是通过同时执行多个命令实现的。1,Redis-benchmarkRedis性能命令:redis性能命令格式: redis-benchmark [option] [option value] redis 性能测试工具可选参数如...【详细内容】
2021-11-02  川石信息    Tags:Redis   点击:(41)  评论:(0)  加入收藏
1 概述数据结构和内部编码 无传统关系型数据库的 Table 模型schema 所对应的db仅以编号区分。同一 db 内,key 作为顶层模型,它的值是扁平化的。即 db 就是key的命名空间。 key...【详细内容】
2021-11-01  JavaEdge    Tags:Redis   点击:(28)  评论:(0)  加入收藏
普通java中使用引用Java redis 驱动,即可连接:import redis.clients.jedis.Jedis; public class RedisTestJava { public static void main(String[] args) { //连...【详细内容】
2021-10-13  faesuite    Tags:Redis   点击:(34)  评论:(0)  加入收藏
Redis常用的数据结构有 string list set zset hashstringstring 是 Redis 的基本的数据类型,一个 key 对应一个 value。string 类型是二进制安全的,Redis的string可以包含任...【详细内容】
2021-10-12  语霖    Tags:Redis   点击:(36)  评论:(0)  加入收藏
列表类型可以存储一组按插入顺序排序的字符串,它非常灵活,支持在两端插入、弹出数据,可以充当栈和队列的角色。> LPUSH fruit apple(integer) 1> RPUSH fruit banana(integer)...【详细内容】
2021-09-17  深夜敲代码    Tags:Redis   点击:(54)  评论:(0)  加入收藏
Redis持久化意义 是做灾难恢复,数据恢复,也可以归类到高可用的一个环节里面去,比如你的redis整个挂了,然后redis就不可用了,你要做的事情是让redis变得可用,尽快变得可用 大量的请...【详细内容】
2021-08-12  小李说IT    Tags:Redis   点击:(77)  评论:(0)  加入收藏
当查询Redis中没有的数据时,该查询会下沉到数据库层,同时数据库层也没有该数据,当这种情况大量出现或被恶意攻击时,接口的访问全部透过Redis访问数据库,而数据库中也没有这些数据...【详细内容】
2021-07-30  随便t    Tags:缓存穿透   点击:(90)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条