您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

时间轮在Netty、Kafka中的应用

时间:2022-08-23 16:48:00  来源:  作者:鸨哥学Java

时间轮是一个高性能、低消耗的数据结构,它适合用非准实时,延迟的短平快任务,例如心跳检测。.NETty、Kafka、Zookeeper中都有使用。

时间轮可通过时间与任务存储分离的形式,轻松实现百亿级海量任务调度。

Netty中的时间轮

Netty动辄管理100w+的连接,每一个连接都会有很多超时任务。比如发送超时、心跳检测间隔等,如果每一个定时任务都启动一个Timer,不仅低效,而且会消耗大量的资源。

时间轮

时间轮的格子

格子里的任务

时间轮运转线程

HashedWheelTimer

HashedWheelBucket

HashedWheelTimeout

Worker

其他一些属性:

时间轮零点时间:startTime
当前指针所指格子:tick
格子长度(持续时间):tickDuration
时间轮运转轮次、回合:remAIningRounds
任务截止时间、触发时间(相对时间轮的startTime):deadline

概括时间轮工作流程

(阅读Netty3.10.6)

1、时间轮的启动并不是在构造函数中,而是在第一次提交任务的时候newTimeout()
2、启动时间轮第一件事就是初始化时间轮的零点时间startTime,以后时间轮上的任务、格子触发时间计算都相对这个时间
3、随着时间的推移第一个格子(tick)触发,在触发每个格子之前都是处于阻塞状态,并不是直接去处理这个格子的所有任务,而是先从任务队列timeouts中拉取最多100000个任务,根据每个任务的触发时间deadline放在不同的格子里(注意,Netty中会对时间轮上的每一个格子进行处理,即使这个格子没有任务)
4、时间轮运转过程中维护着一个指针tick,根据当前指针获取对应的格子里的所有任务进行处理
5、任务自身维护了一个剩余回合(remainingRounds),代表任务在哪一轮执行处理,只有该值为0时才进行处理

代码做了删减,只体现重点

时间轮构造器:

初始化了时间轮大小、每个格子大小、时间轮运转线程

public HashedWheelTimer(
    ThreadFactory threadFactory,
    ThreadNameDeterminer determiner,
    long tickDuration, TimeUnit unit, int ticksPerWheel) {
 

    // TODO : 创建时间轮底层存储任务的数据结构
    wheel = createWheel(ticksPerWheel);
    // TODO : 求某一个任务落到哪个格子时需要用到的编码
    mask = wheel.length - 1;

    // TODO : 每个格子的时间
    this.tickDuration = unit.toNanos(tickDuration);

    // TODO : 时间轮处理任务的线程
    workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
        worker, "Hashed wheel timer #" + id.incrementAndGet(),
        determiner));
}
// TODO : 时间轮真正存储数据的容器
private final HashedWheelBucket[] wheel;
// TODO : 存放任务的队列
private final Queue<HashedWheelTimeout> timeouts = new ConcurrentLinkedQueue<HashedWheelTimeout>();

外界提交任务的时候,代码如下

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
 
    // TODO : 启动时间轮运转线程
    start();

    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    // TODO : 任务放入到队列中,并没有一开始就放到时间轮上
    timeouts.add(timeout);
    return timeout;
}

时间轮运转执行任务,代码如下

public void run() {
 
    // TODO : 初始化时间轮的
    startTime = System.nanoTime();

    do {
 
        // TODO : 这个方法会阻塞,随着时间的推动会触发新的任务(tick),返回当前时间
        final long deadline = waitForNextTick();
        if (deadline > 0) {
 
            // TODO : 将队列中的任务最多取100000放到时间轮上
            transferTimeoutsToBuckets();
            // TODO : 获取当前格子
            HashedWheelBucket bucket = wheel[(int) (tick & mask)];
            // TODO : 执行时间轮上当前格子上的任务
            bucket.expireTimeouts(deadline);
            // TODO : 指针走动
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}

run内部方法解析

waitForNextTick等待下一个格子触发,代码如下

private long waitForNextTick() {
 
    // TODO : 截止时间、触发时间
    // TODO : 获取当前格子的触发时间,因为时间轮底层是使用数组存储任务数据,所以tick需要+1
    long deadline = tickDuration * (tick + 1);
    /**
             * tick : 时间轮上的格子
             * tickDuration : 每个格子的长度,持续时间
             * deadline : 这里表示下一个格子的触发时间(触发一个格子的任务)相对时间轮起点时间(startTime)的时长
             */
    
    for (;;) {
 
        // TODO : 相对时间轮起点的当前时间
        final long currentTime = System.nanoTime() - startTime;
        // TODO : 当当前时间大于等于deadline的时候,就会跳出循环
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

        if (sleepTimeMs <= 0) {
 
            if (currentTime == Long.MIN_VALUE) {
 
                return -Long.MAX_VALUE;
            } else {
 
                return currentTime;
            }
        }
        try {
 
            // TODO : 并不是一直循环
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException e) {
 
            if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
 
                return Long.MIN_VALUE;
            }
        }
    }
}

transferTimeoutsToBuckets将队列中任务存储到时间轮上,代码如下

private void transferTimeoutsToBuckets() {
 
    for (int i = 0; i < 100000; i++) {
 
        // TODO : 从队列中取出任务
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
 
            // all processed 已全部处理
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED
            || !timeout.compareAndSetState(HashedWheelTimeout.ST_INIT, HashedWheelTimeout.ST_IN_BUCKET)) {
 
            // 期间被取消。所以只需从队列中删除它并继续下一个 HashedWheelTimeout
            timeout.remove();
            continue;
        }
        // TODO : 计算这个任务要走多少个格子
        long calculated = timeout.deadline / tickDuration;
        // TODO : 计算触发当前这个任务还要走多少轮,剩余回合!
        /**
                 * calculated:触发该任务一共要走的格子数
                 * tick:当前已经走的格子数
                 * wheel.length:时间轮的长度
                 */
        long remainingRounds = (calculated - tick) / wheel.length;
        // TODO : 任务自身携带了触发自己的轮次
        timeout.remainingRounds = remainingRounds;
        final long ticks = Math.max(calculated, tick); 
        // TODO : mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);

        // TODO : 将任务放到时间轮的对应格子中
        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

expireTimeouts执行处理任务,代码如下

public void expireTimeouts(long deadline) {
 
    HashedWheelTimeout timeout = head;

    while (timeout != null) {
 
        boolean remove = false;
        // TODO : 根据剩余回合判断是否要处理该任务,如果大于0说明还没轮到该任务
        if (timeout.remainingRounds <= 0) {
 
            // TODO : 如果时间已经到了,则执行任务
            /**
                     * deadline 是相对时间轮startTime的当前时间,也是当前格子的触发时间
                     * timeout.deadline 是任务的触发时间
                     */
            if (timeout.deadline <= deadline) {
 
                // TODO :
                timeout.expire();
            } else {
 
                // The timeout was placed into a wrong slot. This should never hAppen.
                throw new IllegalStateException(String.format(
                    "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
            remove = true;
        } else if (timeout.isCancelled()) {
 
            remove = true;
        } else {
 
            timeout.remainingRounds --;
        }
        // store reference to next as we may null out timeout.next in the remove block.
        HashedWheelTimeout next = timeout.next;
        if (remove) {
 
            remove(timeout);
        }
        timeout = next;
    }
}

Kafka中的时间轮

Produce 时等待 ISR 副本复制成功、延迟删除主题、会话超时检查、延迟创建主题或分区等,会被封装成不同的 DelayOperation 进行延迟处理操作,防止阻塞 Kafka请求处理线程。

名称

时间轮

时间轮的格子(桶)

格子(桶)里的任务

时间轮运转线程

处理过期任务线程

类名

TimingWheel

TimerTaskList

TimerTaskEntry

ShutdownableThread

ExecutorService

属性名

timingWheel

bucket

rootheadtail

expirationReaper

taskExecutor

其他一些属性:

时间轮零点时间:startMs
当前时间:currentTime
格子长度(持续时间):tickMs
时间轮大小:wheelSize
时间轮的当前层时间跨度:interval = tickMs * wheelSize
到期时间:expiration
溢出轮、升层的时间轮:overflowWheel: TimingWheel

概括时间轮工作流程

(阅读Kafka-3.1.0)

Kafka 中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。

1、Kafka启动的时候就启动了时间轮
2、ExpiredOperationReaper.doWork() 循环执行,首先从全局的delayQueue中获取一个bucket,如果不为空则上锁处理
3、根据bucket的到期时间尝试推进,然后会刷一次bucket中的所有任务,这些任务要么是需要立即执行的(即到期时间在 currentTime 和 currentTime + tickMs 之间),要么是需要换桶的,往前移位(即到期时间大于等于 currentTime + tickMs);立即计算的直接提交给专门的线程处理
4、最后拉取delayQueue中下一个bucket处理,一直循环下去
5、添加一个任务,首先是根据任务的到期时间expiration来判断自己会落到哪一个bucket,如果expiration不小于currentTime + tickMs,则可能是当前时间轮的任一个bucket,也可能是溢出轮中的任一个bucket
6、当任务添加到某一个bucket后会判断是否跟新了桶的到期时间,如果更新了则需要入队处理delayQueue.offer

代码做了删减,只体现重点

1、Kafka中自己封装了一个可关闭的线程类 Shutdown’able’Thread ,也就是实现了该类的 ExpiredOperationReaper 内部实现了 doWork() 方法,维护着时间轮的运转

private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d-%s".format(brokerId, purgatoryName),
    false) {
 

    override def doWork(): Unit = {
 
        advanceClock(200L)
    }
}

2、推进时钟的内部实现

def advanceClock(timeoutMs: Long): Boolean = {
 
    // TODO : 阻塞 timeoutMs = 200 毫秒,拉取一个桶:有直接返回,没有则阻塞200毫秒
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
        if (bucket != null) {
 
            writeLock.lock()
                try {
 
                    while (bucket != null) {
 
                        // TODO : 传入当前桶的过期时间,尝试推进时间
                        timingWheel.advanceClock(bucket.getExpiration)
                        // TODO : 无论推进时间是否成功,当前桶的这些任务要么是需要立即执行的(即到期时间在 currentTime 和 currentTime + tickMs 之间),
                        //  要么是需要换桶的,往前移位(即到期时间大于等于 currentTime + tickMs);立即计算的直接提交给专门的线程处理
                        bucket.flush(addTimerTaskEntry)
                        // TODO : 进行下一个桶处理
                        bucket = delayQueue.poll()
                    }
                } finally {
 
                    writeLock.unlock()
                }
            true
        } else {
 
            false
        }
}

3、尝试推进时钟

def advanceClock(timeMs: Long): Unit = {
 
    /**
     * currentTime + tickMs :当前桶过期时间的截止时间
     * timeMs :下一个桶的过期时间
     */
    if (timeMs >= currentTime + tickMs) {
 
      // currentTime 是 tickMs 的整数倍
      currentTime = timeMs - (timeMs % tickMs)
      // TODO : 尝试推进溢出轮的时间
      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }

4、bucket.flush(addTimerTaskEntry) 传入的是一个方法之后桶内的每一个任务都会走一次该方法

// TODO : 添加或处理任务
  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
 
    // TODO : 只有到期时间在 currentTime 和 currentTime + tickMs 之间的任务才会被直接处理
    if (!timingWheel.add(timerTaskEntry)) {
 
      // Already expired or cancelled
      if (!timerTaskEntry.cancelled) {
 
        // TODO : 只处理过期时间到达且不是被取消的任务
        taskExecutor.submit(timerTaskEntry.timerTask)
      }
    }
  }

5、添加任务到时间轮的入口也是地4步的方法,其中timingWheel.add(timerTaskEntry) 方法中会判断每一个任务是立即处理还是入队

/**
   * 添加一个任务
   * 添加任务的过程比较复杂,首先是根据任务的到期时间来判断自己会落到哪一个bucket,可能是当前时间轮任一个bucket,也可能是溢出轮中的任一个bucket
   */
  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
 
    // TODO : 任务到期时间
    val expiration = timerTaskEntry.expirationMs
    if (timerTaskEntry.cancelled) {
 
      false
    } else if (expiration < currentTime + tickMs) {
 
      // TODO : 距离该任务到期仅剩最多 tickMs 毫秒了
      // TODO : currentTime当前指向的时间格也属于到期部分,表示刚好到期
      false
    } else if (expiration < currentTime + interval) {
 
      // TODO : 距离该任务到期小于一整轮的时间,大于一个格子的时间,说明它就在当前层,不需要升层
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)
      // TODO : 如果该任务的到来改变了他所进入的桶的过期时间,即轮子已经前进并且之前的桶被重用了
      // TODO : 桶是同一个桶,但是数据可能不是同一轮的,这时需要重新入队 DelayQueue
      if (bucket.setExpiration(virtualId * tickMs)) {
 
        queue.offer(bucket)
      }
      true
    } else {
 
      // TODO : 需要升层 过期时间超过了 interval
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

需要升层的情况:其实每一个时间轮对象内都有一个溢出轮的指针 overflowWheel ,他会指向父级时间轮。

Kafka 使用时间轮来实现延时队列,因为其底层是任务的添加和删除是基于链表实现的,是 O(1) 的时间复杂度,满足高性能的要求;

对于时间跨度大的延时任务,Kafka 引入了层级时间轮,能更好控制时间粒度,可以应对更加复杂的定时任务处理场景;

对于如何实现时间轮的推进和避免空推进影响性能,Kafka 采用空间换时间的思想,通过 DelayQueue 来推进时间轮,算是一个经典的 trade off。



Tags:Netty   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
Netty入门实践:模拟IM聊天
我们使用的框架几乎都有网络通信的模块,比如常见的Dubbo、RocketMQ、ElasticSearch等。它们的网络通信模块使用Netty实现,之所以选择Netty,有两个主要原因: Netty封装了复杂的JD...【详细内容】
2023-12-08  Search: Netty  点击:(241)  评论:(0)  加入收藏
Netty入门实践-模拟IM聊天
我们使用的框架几乎都有网络通信的模块,比如常见的Dubbo、RocketMQ、ElasticSearch等。它们的网络通信模块使用Netty实现,之所以选择Netty,有2个主要原因: Netty封装了复杂的JDK...【详细内容】
2023-12-05  Search: Netty  点击:(108)  评论:(0)  加入收藏
如何使用Netty模拟一个Web服务端
Netty作为Web服务端具有以下好处:高性能Netty是一个基于事件驱动和异步非阻塞的网络编程框架,它使用了高效的NIO(非阻塞输入输出)模型。这使得Netty在处理大量并发连接时表现出...【详细内容】
2023-09-11  Search: Netty  点击:(272)  评论:(0)  加入收藏
玩转Netty,从“Hello World”开始!
为什么要用Netty?首先当然是NIO的使用,本身比较复杂,而且还存在一些问题。除此之外,如果在项目的开发中,要实现稳定的网络通信,就得考虑网络的闪断、客户端的重复接入、客户端的...【详细内容】
2023-05-16  Search: Netty  点击:(417)  评论:(0)  加入收藏
Netty和原生Java的性能比较
Netty是一种基于异步事件循环的网络应用编程方法。本文对比Netty与Java的本地服务器。虽然目前本地服务器使用的人不多,但我仍要找出netty比本地服务器的好处有多少。让我们...【详细内容】
2023-05-16  Search: Netty  点击:(88)  评论:(0)  加入收藏
基于Netty搭建WebSocket,模仿微信聊天页面
作者:小傅哥 博客:https://bugstack.cn 沉淀、分享、成长,让自己和他人都能有所收获!前言介绍 本章节我们模仿微信聊天页面,开发一个基于Netty搭建WebSocket通信案例。Netty的应...【详细内容】
2023-03-17  Search: Netty  点击:(237)  评论:(0)  加入收藏
Netty:遇到TCP发送缓冲区满了 写半包操作该如何处理
什么是写半包写半包:一份数据,一次发送没有把他全部发送,需要循环发送,那么第一次的操作称为写半包什么情况下会出现写半包:发送方发送200byte,但是接收方只能接受100byte,因此发送...【详细内容】
2023-03-13  Search: Netty  点击:(209)  评论:(0)  加入收藏
Java使用Netty框架自建DNS代理服务器教程
前言DNS协议作为着互联网客户端-服务器通信模式得第一关,在当下每天都有成千上亿上网记录产生得当今社会,其重要性自然不可言喻。在国内比较有名得DNS服务器有电信得114.114.1...【详细内容】
2023-02-09  Search: Netty  点击:(149)  评论:(0)  加入收藏
Netty基础介绍(使用场景、组件、模型、代码示例等)
Netty简介Netty 对 JDK 自带的 NIO 的 API 进行了良好的封装,解决了如客户端面临断线重连、 网络闪断、心跳处理、半包读写、 网络拥塞和异常流的处理等等问题。且Netty拥有...【详细内容】
2023-01-10  Search: Netty  点击:(146)  评论:(0)  加入收藏
从Redis、HTTP协议,看Nett协议设计,我发现了个惊天大秘密
1. 协议的作用TCP/IP 中消息传输基于流的方式,没有边界协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则2. Redis 协议如果我们要向 Redis 服务器发送一条 set...【详细内容】
2023-01-03  Search: Netty  点击:(308)  评论:(0)  加入收藏
▌简易百科推荐
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(2)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(7)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(13)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(9)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(11)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(9)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
站内最新
站内热门
站内头条