在任务调度的场景中,经常遇到以下需求:
1、某个操作失败后,每隔1、2、5、10、20秒去重试
2、一篇公众号文章发布之后,要在5分钟之后推送到粉丝终端
这种场景当然可以通过 RocketMQ 这类支持延迟消息的中间件来做,如果从任务调度的角度该怎么做,任务队列怎么选择呢?从题干可知以下三要素:触发条件,延迟任务存储,时间到达之后的操作。这是一个非常典型的生产者消费者模型,生产者往任务队列放任务,消费者从队列中消费任务。先抛开生产者、消费者不讲,以下只讨论任务队列的选择。
JAVA DelayQueue、redis Sorted Set 都是延迟任务很好的选择,除此之外时间轮队列也是一种非常好,也非常适合的设计。本文对 DelayQueue 和 时间轮做特点分析和对比。
DelayQueue 本质是一个无界的优先级阻塞队列,内部封装了 PriorityQueue,提供了 put 接口放入任务,take 接口获取任务。添加元素时需要设置一个延迟时间,在有任务到达执行时间前,take 接口是阻塞的。所以 DelayQueue 实现延时,核心功能有:
1. 有序性,延时短的先出列2. put、take 后依然有序
3. 没有元素到达时间,take被阻塞
4. 线程安全
第1、2点是 PriorityQueue 实现的,第 3、4 点是 DelayQueue 自己实现的。
PriorityQueue 内部使用小顶堆来维持顺序,最小堆并没有通过二叉树实现,而是使用了数组存储 Object[] queue。每次取出最小元素的时间复杂度是 O(1),使用数组寻址非常快,但是增删元素时需要重排序,时间复杂度是O(logN),最终其每次存取时间复杂度是 O(logN)。
take 元素取第 0 个
每次 put 时,需要排序达到新平衡。比较下标通过 int parent = (k - 1) >>> 1; 计算,take 也是类似的计算。
OK,明白了,当队列为空或者到达时间之前,通过 ReentrantLock + condition 配合使用实现 take 时线程安全和阻塞,put 时同样使用同一把 ReentrantLock,进而 condition signal 缓存等待的线程,和AQS基本类似。
DelayQueue 是一个很优秀的设计,精巧、漂亮,使用小顶堆 O(logN) 时间复杂度维持队列有序,ReentrantLock + condition 组合保证线程安全、阻塞,高效而也没有队列空轮询。这样优秀的设计存在什么问题?小任务量是没有问题,当任务非常多,且高频时就不太好了,为什么?
i. 锁
put、take 共用一把锁,在 take lock 生效时,是不能 put 的。什么是take lock 生效呢?在 take 过程中,有几处 await ,一旦调用了 condition.await() 即便没有调用 lock.unlock(),也会暂时释放锁,等待唤醒。此时是不会阻塞put操作的,但是,当有需要出列的元素时,put、take就会产生互斥,任务量大时,这俩就会产生性能问题。
ii. 数据结构
PriorityQueue 使用数组来存储元素,这是一种高效的结构,但是要求连续空间,任务量很大时是否一定有足够大的连续空间?如果没有就需要 GC,或者进入老年代了。
iii. 不支持去重
同一个时间点的同一个任务,不应该在队列出现两次。
微信平台上有多少公众号不清楚,应该是一个很大的数字,每天早上和晚上都会产生大量的公众号推送,如果使用 DelayQueue 能否满足需要呢?恐怕很难。
时间轮也被广泛用于延时任务调度,比如Kafka、Netty,同样 XXL-JOB 也使用时间轮做任务调度。时间轮是一种非常巧妙的思想,没有查到其出处,个人感觉应该跟「CPU 时间片轮转调度算法」有关。基本结构如下
时间轮基本结构
Kafka 为了实现多时间跨度调度,实现了多级时间轮
Kafka 时间轮
时间轮类似于钟表,上面有时分秒的刻度,秒针一秒一秒的跳,对应到秒级任务,如果刻度跨度为毫秒则可以实现毫秒调度。每一个刻度后面都连接着当前秒应该执行的任务,系统每次都调度当前时间指针下的任务。其数据结构就可以看成「数组+链表」的实现,时间轮是一种算法思想,在各开发语言中没有对应的实现,HashMap、Dict 也可以满足要求。XXL-JOB 就是使用的 ConcurrentHashMap<Integer,List<Integer>>。时间轮相比于 DelayQueue 怎么样呢,又存在哪些问题,该怎么解决呢?
时间轮靠时间指针调度,不存在任务排序,每次都是取出当前刻度下的任务。每次都是O(1),假如当前时刻下有多个任务,当前队列需要全部弹出。
与 DelayQueue 不同,时间轮每个刻度下都是一个独立的队列,各队列的存取互不影响,可以通过线程安全的队列来实现,所以存取的并发性能比 DelayQueue 高很多。
时间轮的本质是随着时间的推移,不断的轮询当前时间刻度下的任务,与 DelayQueue 不同,即使时间轮为空时,依然轮询,会产生空轮询。在 XXL-JOB 里就存在这种问题,幸好一般情况下,都是秒级调度,空轮询也没有大影响。但如果是毫秒级调度,空轮询是会影响系统性能的,比如 epoll CPU100% 的bug 和 Netty 空轮询bug。
需要保证在一个时间刻度下,一个任务不能出现两次,不然可能会多次调度。
理想中的时间轮是能够满足以上 4 点要求,revolver 通过跳跃表数组实现基本结构ConcurrentSkipListSet<Integer>[],ConcurrentSkipListSet 是一个基于跳跃表实现的无锁的线程安全的、有序列、去重的列表表,向列表put元素并排序的时间复杂度是O(logN),ReentrantLock + condition + AtomicInteger 计数器避免空轮询。
当任务数=0 时,调用 condition.await() take 被阻塞,当元素个数从无变成有的时候,调用 condition.signal(),也是一个典型的生产者消费者模型。
某资料上就用这种设计做出了监控10亿级定时任务检测系统,时间轮是一种设计方式,非特定的数据结构,除了性能好之外其扩展性也是非常好的。
欢迎关注公众号:看起来很美(kanqilaihenmei_)
-- 完 --