redis的 list 数据结构常用来作为 异步消息队列 使用,使用 rpush/lpush 操作 入队 ,使用 lpop/rpop 来操作 出队
> rpush my-queue Apple banana pear (integer) 3 > llen my-queue (integer) 3 > lpop my-queue "apple" > llen my-queue (integer) 2 > lpop my-queue "banana" > llen my-queue (integer) 1 > lpop my-queue "pear" > llen my-queue (integer) 0 > lpop my-queue (nil)
空队列
空闲连接
锁冲突处理
延时队列
public class RedisDelayingQueue<T> { @Data @AllArgsConstructor @NoArgsConstructor private static class TaskItem<T> { private String id; private T msg; } private Type taskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<>(UUID.randomUUID().toString(), msg); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, JSON.toJSONString(task)); } public void loop() { // 可以进一步优化,通过Lua脚本将zrangeByScore和zrem统一挪到Redis服务端进行原子化操作,减少抢夺失败出现的资源浪费 while (!Thread.interrupted()) { // 只取一条 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // zrem是多线程多进程争夺任务的关键 TaskItem<T> task = JSON.parseobject(s, taskType); this.handleMsg(task.msg); } } } private void handleMsg(T msg) { try { System.out.println(msg); } catch (Throwable ignored) { // 一定要捕获异常,避免因为个别任务处理问题导致循环异常退出 } } public static void main(String[] args) { final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(new Jedis("localhost", 16379), "q-demo"); Thread producer = new Thread() { @Override public void run() { for (int i = 0; i < 10; i++) { queue.delay("zhongmingmao" + i); } } }; Thread consumer = new Thread() { @Override public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException ignored) { } } }