您当前的位置:首页 > 电脑百科 > 程序开发 > 架构

好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读

时间:2022-08-03 17:08:55  来源:  作者:杂文论

一、背景

之前在做电商相关业务的时候,有一个常见的需求场景是:用户下单之后,超过半小时不支付,就取消订单。现在我们在淘宝京东买东西,或者通过美团点外卖,下单之后,如果不在指定时间内支付,订单也会取消。 那么,如何实现这样的超时取消逻辑呢,通过消息队列的延时消息,是一个非常稳定的实现方案。

RocketMQ 就提供了这样的延时消息功能,producer 端在发送消息时,设置延迟级别,从秒级到分钟小时等等。消息在发送之后,会在消息队列的服务器进行存储。等过了设定的延迟时间之后,消息才会被consumer端消费到。

 

如果我们在下单的时候,发送一条设置延时30分钟的消息,这条消息会在30分钟之后被下游系统消费,然后判断订单有没有支付,如果没有支付,则取消订单。那么这样,通过消息队列就完成了一个延迟取消的逻辑了。

二、原理

设置延时

先来看一下如何设置消息的延时 消息体可以通过setDelayTimeLevel方法来设置延时级别

public void produce() {
    Message msg = new Message("TopicTest",
        "TagA",
        "OrderID188",
        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    msg.setDelayTimeLevel(1)
    SendResult sendResult = producer.send(msg);
}

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

其实是将延迟信息存到 Message 的 property 中(property是一个保存meta信息的hashmap)

public void setDelayTimeLevel(int level) {
    this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
}

void putProperty(final String name, final String value) {
    if (null == this.properties) {
        this.properties = new HashMap<String, String>();
    }

    this.properties.put(name, value);
}

之后,broker收到 message之后,会根据 message 中设置的延时级别进行处理 可以看看延时级别的具体情况: 一共分为18个级别(1-18),对应时间从1s到2h

public class MessageStoreConfig {
    
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

}

那么整个系统是怎么做到让consumer在设定的延时之后,开始消费指定消息的呢?

不得不说,RocketMQ 的设计还是挺巧妙的,我们接着往下看。

消息预存

对于broker收到的延时消息,并不是和普通消息一样,进入发送端指定的topic中, 而是进入专门的延时topic中,延时topic有18条队列(queueId 编号0-17),queueId 和 delayLevel 的关系是 queueId + 1 = delayLevel,是一一对应的。所以计算延时消息的待执行时间deliverTimestamp之后,会将消息存入对应延时级别的队列中。

// 如果是延迟消息
if (msg.getDelayTimeLevel() > 0) {
    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    }
    
    // 重设延迟消息的topic和queueId,topic为指定的RMQ_SYS_SCHEDULE_TOPIC
    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

    ...
    // 将实际的指定topic和queueId进行存入property,进行备份
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));        
    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

    msg.setTopic(topic);
    msg.setQueueId(queueId);
}

之后,会由ScheduleMessageService来进行任务处理。ScheduleMessageService是broker启动时就开始执行的,用来处理延迟队列中的消息,处理的逻辑如下所示。

public class ScheduleMessageService extends ConfigManager {

    // key: delayLevel | value: delayTimeMillis 
    private final ConcurrentMap<Integer, Long> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);

    public void start() {
        // 创建一个Timer,用于执行定时任务
        this.timer = new Timer("ScheduleMessageTimerThread", true);
            
        // 这里对每个delayLevel的queue都创建一个DeliverDelayedMessageTimerTask,
        // 用来处理对应queue中的消息
        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }
    }
    
}

ScheduleMessageService启动之后,会根据延时队列的数目创建一一对应的
DeliverDelayedMessageTimerTask,然后周期执行。该类继承自TimerTask,是JDK的工具类,用于执行定时任务,原理可以参考这篇文章 如何实现定时任务- JAVA Timer/TimerTask 源码原理解析

消息转投

可以看到
DeliverDelayedMessageTimerTask实现的 run 方法,主要逻辑都在executeOnTimeup方法中,从对应的延迟队列中取出时间已到的 message,发送到 message 对应原始topic的队列中。只要队列没有发生消费积压,message 就会马上被消费了。(这部分的代码实现比较复杂,感兴趣可以去看对应的源码)

class DeliverDelayedMessageTimerTask extends TimerTask {
    private final int delayLevel;
    private final long offset;

    public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
        this.delayLevel = delayLevel;
        this.offset = offset;
    }

    @Override
    public void run() {
        try {
            if (isStarted()) {
                this.executeOnTimeup();
            }
        } catch (Exception e) {
            // XXX: warn and notify me
            log.error("ScheduleMessageService, executeOnTimeup exception", e);
            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
        }
    }

    public void executeOnTimeup() {
        ConsumeQueue cq =
            ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                delayLevel2QueueId(delayLevel));

        long fAIlScheduleOffset = offset;

        if (cq != null) {
            
            // 这部分是核心逻辑,实现的是 从延时消息队列中取出 deliverTimestamp - now <= 0 的消息,
            // 将消息从延时queue移到原本指定Topic的queue中,这些消息就马上会被consumer消费。
        } 

        ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
            failScheduleOffset), DELAY_FOR_A_WHILE);
    }
}

总体的原理示意图,如下所示:

 

broker 在接收到延时消息的时候,会将延时消息存入到延时TOPIC的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始TOPIC,这些消息就会被各自的 producer 消费了。

三、拓展-消费重试

平常在使用RocketMQ的时候,一般会依赖consumer的消费重试功能。 而consumer端的消费重试,其实也是通过这个和延时队列差不多的原理来实现的。

public void consume() {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.subscribe("TopicTest", "TagA");
    consumer.registerMessageListener(new MessageListenerConcurrently() {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // 这里如果返回RECONSUME_LATER,就会重试消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}

RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

  • 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER
  • 业务消费方返回null
  • 业务消费方主动/被动抛出异常

业务代码中,一般会利用重试功能去做下游逻辑的重试。而RocketMQ的重试并不是固定时间间隔重复进行,二是采取的退避式重试,重试的时间间隔会不断变长。 这个时间间隔,和设置delayLevel的延时类似。

Consumer客户端会通过processConsumeResult方法处理每一条消息的消费结果,如果判断需要进行重试,则会通过sendMessageBack方法将消息发送到broker,重试消息会带上已重试次数的信息。

broker收到消息之后,SendMessageProcessor会对重试消息进行处理,设置topic为RETRY_TOPIC,具体逻辑如下:

public class SendMessageProcessor
    extends AbstractSendMessageProcessor
    implements.NETtyRequestProcessor {

    private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {

        // 给重试消息设置新的topic
        String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
        int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

        // 根据已经发生重试的次数确定delayLevel
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);

        // 重试次数+1
        msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);

        // 存储消息
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

        // ...
    }   
}

存储消息的时候,CommitLog.putMessage方法内部判断如果设置了delayLevel,就会重设topic为SCHEDULE_TOPIC,然后将消息存储到延时队列中,后续就是和ScheduleMessageService的逻辑相同。

整个消息重试的逻辑示意图如下所示:

 

如图所示

  1. Consumer在消费的时候,都会订阅指定的TOPIC-NORMAL_TOPIC和该ConsumerGroup对应的重试TOPIC-RETRY_GROUP1_TOPIC,同时消费来自这两个topic中的消息。
  2. 当发生消费失败后,Client端会调用sendMessageBack方法将失败消息发送回broker。
  3. broker端的SendMessageProcessor会根据当前的重试次数确定延时级别,将消息存入延时队列-SCHEDULE_TOPIC中。
  4. ScheduleMessageService会将到期的消息重新发送到重试TOPIC-RETRY_GROUP1_TOPIC中,这个时候消息被Consumer消费,就完成了整个重试过程。

可以对比之前的延时消息流程去看,其实重试消息就是将失败的消息当作延时消息进行处理,只不过最后投入的是专门的重试消息队列中。

四、总结

延时消息都是非常日常业务使用中很重要的功能,而RocketMQ通过时间片分级+多队列+定时任务,就实现了这样的功能,设计上是很巧妙的。并且消费重试采用退避式的策略,重试时间的梯度刚好与延时消息策略一致,这样就可以直接利用延时队列去完成消息重试的功能,从策略上来说非常合理(消息消费重复失败,在短时间内重试成功的可能性比较低),并且复用了底层代码,这些是值得去学习和借鉴的。



Tags:RocketMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
大白话设计RocketMQ延迟消息
延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单 释放库存 等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间...【详细内容】
2023-12-27  Search: RocketMQ  点击:(110)  评论:(0)  加入收藏
九个问答牢记RocketMQ架构
RocketMQ是Java兄弟们常用的消息中间件,虽说常用,但对于RocketMQ架构经常忘记。究其原因就2点:忙于业务开发然后长时间不看则忘了、不理解架构设计的根本原因记不牢。本文用大...【详细内容】
2023-12-27  Search: RocketMQ  点击:(114)  评论:(0)  加入收藏
如何应对 RocketMQ 消息堆积
这篇文章,我们聊聊如何应对 RocketMQ 消息堆积。图片1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消...【详细内容】
2023-12-21  Search: RocketMQ  点击:(73)  评论:(0)  加入收藏
解锁RocketMQ秘籍:如何保障消息顺序性?
嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题&mdash;&mdash;“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武...【详细内容】
2023-12-15  Search: RocketMQ  点击:(101)  评论:(0)  加入收藏
Apache RocketMQ 5.0腾讯云落地实践
Apache RocketMQ 发展历程回顾RocketMQ 最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给 Apache 社区,成为 Apache 社区的顶级项目,并在国...【详细内容】
2023-12-13  Search: RocketMQ  点击:(148)  评论:(0)  加入收藏
聊聊 RocketMQ 5.0 的 POP 消费模式!
大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下...【详细内容】
2023-05-16  Search: RocketMQ  点击:(305)  评论:(0)  加入收藏
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。为什么需要保证幂等性呢?是因为消息会重复消费。为什么消息会重复消费?明明已经消费了,为什么消息会...【详细内容】
2023-04-13  Search: RocketMQ  点击:(240)  评论:(0)  加入收藏
SpringBoot整合RocketMQ,老鸟们都是这么玩的!
今天我们来讨论如何在项目开发中优雅地使用RocketMQ。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决...【详细内容】
2023-04-12  Search: RocketMQ  点击:(431)  评论:(0)  加入收藏
SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
Spring Boot 是一个基于 Spring 框架的快速开发框架,而 RabbitMQ 和 RocketMQ 则是常用的消息队列中间件。下面是它们常用的一些用法和场景。 订单处理在电商等系统中,下单后...【详细内容】
2023-03-09  Search: RocketMQ  点击:(206)  评论:(0)  加入收藏
通过源码分析RocketMQ主从复制原理
作者:京东物流 宫丙来一、主从复制概述 RocketMQ Broker的主从复制主要包括两部分内容:CommitLog的消息复制和Broker元数据的复制。 CommitLog的消息复制是发生在消息写入时,当...【详细内容】
2023-03-02  Search: RocketMQ  点击:(70)  评论:(0)  加入收藏
▌简易百科推荐
对于微服务架构监控应该遵守的原则
随着软件交付方式的变革,微服务架构的兴起使得软件开发变得更加快速和灵活。在这种情况下,监控系统成为了微服务控制系统的核心组成部分。随着软件的复杂性不断增加,了解系统的...【详细内容】
2024-04-03  步步运维步步坑    Tags:架构   点击:(7)  评论:(0)  加入收藏
大模型应用的 10 种架构模式
作者 | 曹洪伟在塑造新领域的过程中,我们往往依赖于一些经过实践验证的策略、方法和模式。这种观念对于软件工程领域的专业人士来说,已经司空见惯,设计模式已成为程序员们的重...【详细内容】
2024-03-27    InfoQ  Tags:架构模式   点击:(20)  评论:(0)  加入收藏
哈啰云原生架构落地实践
一、弹性伸缩技术实践1.全网容器化后一线研发的使用问题全网容器化后一线研发会面临一系列使用问题,包括时机、容量、效率和成本问题,弹性伸缩是云原生容器化后的必然技术选择...【详细内容】
2024-03-27  哈啰技术  微信公众号  Tags:架构   点击:(13)  评论:(0)  加入收藏
DDD 与 CQRS 才是黄金组合
在日常工作中,你是否也遇到过下面几种情况: 使用一个已有接口进行业务开发,上线后出现严重的性能问题,被老板当众质疑:“你为什么不使用缓存接口,这个接口全部走数据库,这怎么能扛...【详细内容】
2024-03-27  dbaplus社群    Tags:DDD   点击:(16)  评论:(0)  加入收藏
高并发架构设计(三大利器:缓存、限流和降级)
软件系统有三个追求:高性能、高并发、高可用,俗称三高。本篇讨论高并发,从高并发是什么到高并发应对的策略、缓存、限流、降级等。引言1.高并发背景互联网行业迅速发展,用户量剧...【详细内容】
2024-03-13    阿里云开发者  Tags:高并发   点击:(12)  评论:(0)  加入收藏
如何判断架构设计的优劣?
架构设计的基本准则是非常重要的,它们指导着我们如何构建可靠、可维护、可测试的系统。下面是这些准则的转换表达方式:简单即美(KISS):KISS原则的核心思想是保持简单。在设计系统...【详细内容】
2024-02-20  二进制跳动  微信公众号  Tags:架构设计   点击:(41)  评论:(0)  加入收藏
详解基于SpringBoot的WebSocket应用开发
在现代Web应用中,实时交互和数据推送的需求日益增长。WebSocket协议作为一种全双工通信协议,允许服务端与客户端之间建立持久性的连接,实现实时、双向的数据传输,极大地提升了用...【详细内容】
2024-01-30  ijunfu  今日头条  Tags:SpringBoot   点击:(23)  评论:(0)  加入收藏
PHP+Go 开发仿简书,实战高并发高可用微服务架构
来百度APP畅享高清图片//下栽のke:chaoxingit.com/2105/PHP和Go语言结合,可以开发出高效且稳定的仿简书应用。在实现高并发和高可用微服务架构时,我们可以采用一些关键技术。首...【详细内容】
2024-01-14  547蓝色星球    Tags:架构   点击:(125)  评论:(0)  加入收藏
GraalVM与Spring Boot 3.0:加速应用性能的完美融合
在2023年,SpringBoot3.0的发布标志着Spring框架对GraalVM的全面支持,这一支持是对Spring技术栈的重要补充。GraalVM是一个高性能的多语言虚拟机,它提供了Ahead-of-Time(AOT)编...【详细内容】
2024-01-11    王建立  Tags:Spring Boot   点击:(135)  评论:(0)  加入收藏
Spring Boot虚拟线程的性能还不如Webflux?
早上看到一篇关于Spring Boot虚拟线程和Webflux性能对比的文章,觉得还不错。内容较长,抓重点给大家介绍一下这篇文章的核心内容,方便大家快速阅读。测试场景作者采用了一个尽可...【详细内容】
2024-01-10  互联网架构小马哥    Tags:Spring Boot   点击:(135)  评论:(0)  加入收藏
站内最新
站内热门
站内头条