时间轮
基于排序链表的定时器(Linux定时器 - 定时的方法)存在一个问题:添加定时器的效率偏低。下面我们要讨论的时间轮解决了这个问题。下图是一种简单的时间轮:
上图时间轮内,(实现)指针指向轮子上的一个槽(slot)。它以恒定的速度顺时针转动,每转动一步就指向下一个槽(虚线指针指向的槽),每次转动称为一个滴答(tick)。一个滴答的时间称为时间轮的槽间隔si(slot interval),它实际上就是心跳时间。该时间轮共有N个槽,因此它运转一周的时间是N*si。每个槽指向一条定时器链表,每条链表上的定时器具有相同的特征:它们的定时时间相差N*si的整数倍。时间轮正是利用这个关系将定时器散列到不同的链表中。假如现在指针指向槽cs,我们要添加一个定时时间为ti的定时器,则该定时器被插入的槽ts(time slot)对应的链表中:
ts = (cs + ti/si) % N
基于排序链表的定时器使用唯一的一条链表来管理所有定时器,所以插入操作的效率随着定时器数目的增多而降低。而时间轮使用哈希表的思想,将定时器散列到不同的链表上。这样每条链表上的定时器数目都将明显少于原来的排序链表上的定时器数目,插入操作的效率基本不受定时器数目的影响。
很显然,对时间轮而言,要提高定时精度,就要使si值足够小;要提高执行效率,则要求N值足够大。
上图描述的是一种简单的时间轮,因为它只有一个轮子。而复杂的时间轮可能有多个轮子,不同的轮子拥有不同的粒度。相邻的两个轮子,精度高的转一圈,精度低的仅往前移动一槽,就像水表一样。
下面的示例代码展示了一个较为简单的时间轮实现代码:
#ifndef TIME_WHEEL_TIMER_H
#define TIME_WHEEL_TIMER_H
#include <time.h>
#include <netinet/in.h>
#include <stdio.h>
#define BUFFER_SIZE 64
class tw_timer;
/*解绑socket和定时器*/
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
tw_timer* timer;
};
/*定时器类*/
class tw_timer
{
public:
tw_timer(int rot, int ts)
:next(NULL), prev(NULL), rotation(rot), time_slot(ts){}
public:
int rotation; /*记录定时器在时间轮转多少圈生效*/
int time_slot; /*记录定时器属于时间轮上哪个槽(对应的链表)*/
void (*cb_func)(client_data*); /*定时器回调函数*/
client_data* user_data; /*客户数据*/
tw_timer* next;
tw_timer* prev;
};
class time_wheel
{
public:
time_wheel():cur_slot(0)
{
for(int i = 0; i < N; ++i){
slots[i] = NULL;
}
}
~time_wheel()
{
for(int i = 0; i < N; ++i)
{
tw_timer* tmp = slots[i];
while(tmp){
slots[i] = tmp->next;
delete tmp;
tmp = slots[i];
}
}
}
tw_timer* add_timer(int timeout)
{
if(timeout < 0){
return NULL;
}
int ticks = 0;
/*根据插入定时器的超时时间计算它将在时间轮转动多少个滴答后被触发
*并将该滴答数存于变量ticks。
*如果待输入定时器的超时值小于时间轮的槽间隔SI,则将ticks向上折为1,
*否则就将ticks向下折为timeout/SI*/
if (timeout < SI){
ticks = 1;
}
else{
ticks = timeout / SI;
}
/*计算待插入的定时器在时间轮转动多少圈后被触发*/
int rotation = ticks/ N;
/*计算待插入的定时器应该被插入哪个槽中*/
int ts = (cur_slot + ticks%N) % N;
/*创建新的定时器,它在时间论转动rotation圈之后触发,且位于ts个槽上*/
tw_timer* timer = new tw_timer(rotation, ts);
/*如果第ts个槽上尚无任何定时器,则把新建的定时器插入其中,
*并将该定时器设置为该槽的头节点*/
if(!slots[ts]){
printf("add timer, rotation is %d, ts is %d, cur_slot is %dn",
rotation, ts, cur_slot);
slots[ts] = timer;
}
else{
timer->next = slots[ts];
slots[ts]->prev = timer;
slots[ts] = timer;
}
return timer;
}
void del_timer(tw_timer* timer)
{
if(!timer){
return;
}
int ts = timer->time_slot;
if(timer == slots[ts]){
slots[ts] = slots[ts]->next;
if(slots[ts]){
slots[ts]->prev = NULL;
}
delete timer;
}
else{
timer->prev->next = timer->next;
if(timer->next){
timer->next->prev = timer->prev;
}
delete timer;
}
}
void tick()
{
tw_timer* tmp = slots[cur_slot]; /*取得时间轮上当前槽点的头节点*/
printf("current slot is %dn", cur_slot);
while(tmp){
printf("tick the timer oncen");
/*如果定时器rotation值大于0, 则它在这一轮不起作用*/
if(tmp->rotation > 0){
tmp->rotation--;
tmp = tmp->next;
}
/*否则定时器已经到期于是执行任务,然后删除该定时器*/
else{
tmp->cb_func(tmp->user_data);
if(tmp == slots[cur_slot]){
printf("delete header in cur_slot");
slots[cur_slot] = tmp->next;
delete tmp;
if(slots[cur_slot]){
slots[cur_slot]->prev = NULL;
}
tmp = slots[cur_slot];
}
else{
tmp->prev->next = tmp->next;
if(tmp->next){
tmp->next->prev = tmp->prev;
}
tw_timer* tmp2 = tmp->next;
delete tmp;
tmp = tmp2;
}
}
}
cur_slot = ++cur_slot % N;
}
private:
/*时间轮上槽的数目*/
static const int N = 60;
/*每1秒时间轮转动一次,即槽间隔1s*/
static const int SI = 1;
/*时间轮的槽,其中每个元素只想一个定时器链表,链表无序*/
tw_timer *slots[N];
int cur_slot; /*时间轮的当前槽*/
};
#endif
时间堆
前面讨论的定时方案都是以固定的频率调用心跳函数tick,并在其中依次检测到期的定时器,然后执行到期定时器上的回调函数。
设计定时器的另外一种思路是:将所有定时器中超时时间最小的一个定时器的超时值作为心跳间隔。这样一旦心跳函数tick被调用,超时时间最小的定时器必然到期,我们就可以在tick函数中处理该定时器。然后,再次从剩余的定时器中找出较小的一个,并将这段最小时间设置为下一次心跳间隔,如此反复,就实现了较为精确的定时。
最小堆很适合处理这种方案。最小堆是值每个节点的值都小于或等于其字节点的值的完全二叉树。
下图给了一个具有6个元素的最小堆。
树的基本操作是插入节点和删除节点。对最小堆而言,它们都很简单。为了将一个元素X插入最小堆,我们可以在树的下一个空闲位置创建一个空穴。如果X可以放在空穴而不破坏堆序,则完成插入。都则就执行上滤操作,即交换空穴和它的父节点上的元素。不断执行上述过程,直到X可以被放入空穴,则插入操作完成。
比如我们要在最小堆中插入值为14的元素,则按如下步骤操作:
最小堆的删除操作指的是删除其根节点上的元素,并且不破坏堆序性质。执行删除操作时,我们需要先在根节点处创建一个空穴。由于堆现在少了一个元素,因此我们可以把堆的最后一个元素X移动到该堆的某个地方。如果X可以可以放入空穴,则删除操作完成,否则就执行下滤操作,即交换空穴和它的两个儿子节点中的较小者,不断进行上述过程,直到X可以放入空穴,则删除操作完成。
下图展示了对最小堆执行删除操作的步骤:
由于最小堆是一种完全二叉树,所以我们可以用数组来组织其中的元素,最先的最小堆图可以如下图所示:
对于数组中的任意位置i上的元素,其左儿子节点在位置2i+1上,其右儿子节点则在位置2i+2上,其父节点在[(i-2)/2](i>0)上。与用链表来表示堆相比,用数组表示堆不仅节省空间,而且更实现堆的插入、删除等操作。
假设我们已经有一个包含N个元素的数组,现在要把它初始化为一个最小堆。那么最简单的方法是:初始化一个空堆,然后将数组中的每个元素插入该堆中。不过这样做的效率偏低。实际上,我们只需要对数组中第[(N-1)/2]~0个元素执行下滤操作,即可确保该数组构成一个最小堆。这是因为对包含N个元素的完全二叉树而言,它具有[(N-1)/2]个非叶子节点。这些非叶子节点正式完全二叉树的第0~[(N-1)/2]和节点。我们只需要这些非叶子节点构成的子树都具有堆序性质,整个树就具有堆序性质。
下面是示例代码:
#ifndef MIN_HEAP
#define MIN_HEAP
#include <IOStream>
#include <netinet/in.h>
#include <time.h>
using std::exception;
#define BUFFER_SIZE 64
class heap_timer;
struct client_data
{
sockaddr_in address;
int sockfd;
char buf[BUFFER_SIZE];
heap_timer* timer;
};
class heap_timer
{
public:
heap_timer(int delay)
{
expire = time(NULL) + delay;
}
public:
time_t expire; /*定时器生效的绝对时间*/
void (*cb_func)(client_data*); /*定时器的回调函数*/
client_data* user_data; /*用户数据*/
};
class time_heap
{
public:
/*构造函数之一, 初始化一个大小为cap的空栈*/
time_heap(int cap) throw(std::exception): capacity(cap), cur_size(0)
{
array = new heap_timer*[capacity];
if(!array){
throw std::exception();
}
for(int i = 0; i < capacity ; ++i){
array[i] = NULL;
}
}
/*构造函数之二,用已有数组来初始化堆*/
time_heap(heap_timer** init_array, int size, int capacity) throw
(std::exception): cur_size(size), capacity(capacity)
{
if(capacity < size){
throw std::exception();
}
array = new heap_timer*[capacity];
if(!array){
throw std::exception();
}
for(int i = 0; i < capacity; ++i){
array[i] = NULL;
}
if(size != 0){
for(int i = 0; i < size; ++i){
array[i] = init_array[i];
}
for(int i = (cur_size - 1)/2; i >= 0; --i){
/*对数组的第[(cur_size -1)/2] ~ 0和元素执行下滤操作*/
percolate_down(i);
}
}
}
/*销毁时间堆*/
~time_heap()
{
for(int i = 0; i < cur_size; ++i){
delete array[i];
}
delete [] array;
}
void add_timer(heap_timer* timer) throw(std::exception)
{
if(!timer){
return;
}
if(cur_size >= capacity){
resize();
}
int hole = cur_size++;
int parent = 0;
/*对从空穴到根节点的路径上的所有节点执行上滤操作*/
for(; hole >0; hole = parent){
parent = (hole-1)/2;
if(array[parent]->expire <= timer->expire){
break;
}
array[hole] = array[parent];
}
array[hole] = timer;
}
void del_timer(heap_timer* timer)
{
if(!timer){
return;
}
/*仅仅将目标定时器的回调函数设置为空,即所谓的延迟销毁。
*这将节省真正删除该定时器造成的开销,但这样做容易市堆数组膨胀*/
timer->cb_func = NULL;
}
heap_timer* top() const
{
if(empty()){
return NULL;
}
return array[0];
}
void pop_timer()
{
if(empty()){
return;
}
if(array[0]){
delete array[0];
array[0] = array[--cur_size];
percolate_down(0);
}
}
bool empty() const{
return cur_size == 0;
}
void tick(){
heap_timer* tmp = array[0];
time_t cur = time(NULL);
while(!empty()){
if(!tmp){
break;
}
if(tmp->expire > cur){
break;
}
if(array[0]->cb_func){
array[0]->cb_func(array[0]->user_data);
}
pop_timer();
tmp = array[0];
}
}
public:
private:
/*最小堆下率操作,确保堆数组中以第hole个节点为根的子树拥有最小堆性质*/
void percolate_down(int hole)
{
heap_timer *temp = array[hole];
int child = 0;
for( ; (hole*2 + 1) <= cur_size-1 ; hole = child){
child = hole*2+1;
if(child < (cur_size -1) && array[child+1]->expire < array[child]->expire){
++child;
}
if(array[child]->expire < temp->expire){
array[hole] = array[child];
}
else{
break;
}
}
array[hole] = temp;
}
/*将堆数组容量扩大一倍*/
void resize() throw (std::exception)
{
heap_timer ** temp = new heap_timer* [2*capacity];
for(int i = 0; i < 2*capacity; ++i){
temp[i] = NULL;
}
if(!temp){
throw std::exception();
}
capacity = 2 * capacity;
for(int i = 0; i < cur_size; ++i){
temp[i] = array[i];
}
delete [] array;
array = temp;
}
heap_timer** array; /*堆数组*/
int capacity; /*堆数组的容量*/
int cur_size; /*堆数组当前包含元素的个数*/
};
#endif