您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因

时间:2023-04-13 13:56:38  来源:微信公众号  作者:三友

在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。

为什么需要保证幂等性呢?是因为消息会重复消费。

为什么消息会重复消费?

明明已经消费了,为什么消息会被再次被消费呢?

不同的MQ产生的原因可能不一样

本文就以RocketMQ为例,来扒一扒RocketMQ中会导致消息重复消息的原因,最终你会发现,其实消息重复消费算是RocketMQ无奈的“bug”。


 

消息发送异常时重复发送

首先,我们来瞅瞅RocketMQ发送消息和消费消息的基本原理。


 

如图,简单说一下上图中的概念:

 

  • Broker,就是RocketMQ的服务端,如上图就有两个服务实例
  • Topic就是一类消息集合的名字
  • Queue就是Topic的对应的队列,消息都存在Queue上,每个Topic都会有自己的几个Queue

 

所以,整个消息发送和消费过程大致如下:

 

  • 生产者在发送消息之前根据负载均衡策略(默认是轮询)选择一个Queue,然后跟这个Queue所在的机器建立连接,把消息发送到这个Queue上
  • 消费者只要消费这个Queue,那么就能消费到消息

 

在正常情况下,生产者的确是按照这个方式来发送消息的

但是当出现了异常时,这种异常包括消息发送超时、响应超时等等,RocketMQ为了保证消息成功发送,会进行消息发送的重试操作,默认情况下会最多会重试两次


 

重试操作比较简单,就是选择另一台机器的Queue来发送。

虽然重试操作可以很大程度保证消息能够发送成功,但是同时也会带来消息重复发送的问题。

举个例子,假设生产者向A机器发送消息,发生了异常,响应超时了,但是就一定代表消息没发成功么?

不一定,有可能会出现服务端的确接收到并处理了消息,但是由于网络波动等等,导致生产者接收不到服务端响应的情况,此时消息处理成功了,但是生成者还是以为发生了异常

此时如果发生重试操作,那么势必会导致消息被发送了两次甚至更多次,导致服务端存了多条相同的消息,那么就一定会导致消费者重复消费消息

消费消息抛出异常

在RocketMQ的并发消费消息的模式下,需要用户实现MessageListenerConcurrently接口来处理消息


 

当消费者获取到消息之后会调用MessageListenerConcurrently的实现,传入需要消费的消息集合msgs,这里提到的msgs很重要


 

如上代码,当消息消费出现异常的时候,status就会为null,后面就会将status设置成为RECONSUME_LATER。

RECONSUME_LATER翻译成功中文就是稍后重新消费的意思

所以从这可以看出,一旦抛出异常,那么消息之后就可以被重复消息。

到这其实可能有小伙伴觉得消息消费失败重新消费很正常,保证消息尽可能消费成功。

对,这句话不错,的确可以在一定程度上保证消费异常的消息可以消费成功。

但是坑不在这,而是前面提到的消费时传入的整个集合中的消息都需要被重新消费。

具体的原因我们接着往下看

当消息处理之后,不论是成功还是异常,都需要对结果进行处理,代码如下


 

当处理结果为RECONSUME_LATER的时候(异常会设置为RECONSUME_LATER),此时ackIndex会设置成-1,后面循环遍历的时候就会遍历到所有这次消费的消息,然后调用sendMessageBack方法,sendMessageBack方式是用来实现消息重新消费的逻辑,这里就不展开说了。

所以,一旦被消费的一批消息中出现一个消费异常的情况,那么就会导致整批消息被重新消费,从而会导致在出现异常之前的成功处理的消息都会被重复消费,非常坑。

不过好在消费时传入的消息集合中的消息数量是可以设置的,并且默认就是1


 

也就说默认情况下那个集合中就一条消息,所以默认情况下不会出现消费成功的消息被重复消费的情况。

所以这个参数不要轻易设置,一旦设置大了,就可能导致消息被重新消费。

除了并发消费消息的模式以外,RocketMQ还支持顺序消费消息的模式,也会造成重复消费,逻辑其实差不多,但是在实现消息重新消费的逻辑不一样。

消费者提交offset失败

首先来讲一讲什么是offset。

前面说过,消息在发送的时候需要指定发送到,消息最后会被放到Queue中,其实真正的消息不是在Queue中,Queue存的是每个消息的位置,但是你可以理解为Queue存的是消息。

而消息在Queue中是有序号的,这个序号就被称为offset,从0开始,单调递增1。


 

比如说,如上图,消息1的offset就是0,消息2的offset就是1,依次类推。

这个offset的一个作用就是用来管理消费者的消费进度。

当消费者在成功消费消息之后,需要将所消费的消息的offset提交给RocketMQ服务端,告诉RocketMQ,这个Queue的消息我已经消费到了这个位置了。

提交offset的代码就在上述第二节提到的处理结果的后面


 

这样有一个好处,那么一旦消费者重启了或者其它啥的要从这个Queue拉取消息的时候,此时他只需要问问RocketMQ服务端上次这个Queue消息消费到哪个位置了,之后消费者只需要从这个位置开始消费消息就行了,这样就解决了接着消费的问题。


 

但是RocketMQ在设计的时候,当消费完消息的时候并不是同步告诉RocketMQ服务端offset,而是定时发送。


 

如图,当消费者消费完消息的时候,会将offset保存到内存中的一个Map数据结构中,所以上面截图的那段代码其实是更新内存中的offset


 

而在消费者启动的时候会开启一个定时任务,默认是5s一次,会通过网络请求将内存中的每个Queue的消费进度offset发送给RocketMQ服务端。


 

由于是定时任务,所以就可能出现服务器一旦宕机,导致最新消费的offset没有成功告诉RocketMQ服务端的情况

此时,消费进度offset就丢了,那么消费者重启的时候只能从RocketMQ中获取到上一次提交的offset,从这里开始消费,而不是最新的offset,出现明明消费到了第8个消息,RocketMQ却告诉他只消费到了第5个消息的情况,此时必然会导致消息又出现重复消费的情况。

服务端持久化offset失败

上一节说到,消费者会有一个每隔5s钟的定时任务将每个队列的消费进度offset提交到RocketMQ服务端

当RocketMQ服务端接收到提交请求之后,会将这个消费进度offset保存到内存中


 

同时为了保证RocketMQ服务端重启消费进度不会丢失,也会开启一个定时任务,默认也是5s一次,将内存中的消费进度持久化到磁盘文件中


 

所以,整个消费进度offset的数据流转过程如下


 

当RocketMQ服务端重启之后,会从磁盘中读取文件的数据加载到内存中。

跟消费者产生的问题一样,一旦RocketMQ发生宕机,那么offset就有可能丢失5s钟的数据,RocketMQ服务端一旦重启,消费者从RocketMQ服务端获取到的消息消费进度就比实际消费的进度低,同样也会导致消息重复消费。

主从同步offset失败

在RocketMQ的高可用模式中,有一种名叫主从同步的模式,当主节点挂了之后,从节点可以手动升级为主节点对外提供访问,保证高可用。

在主从同步模式下,从节点默认每隔10s会向主节点发送请求,同步一些元数据,这些元数据就包括消费进度


 

当从节点获取到主节点的消费进度之后,会将主节点的消费进度设置到自己的内存中,同时也会持久化到磁盘。

所以整个消费进度offset的数据的流转过程就会变成如下


 

同样,由于也是定时任务,那么一旦主节点挂了,从节点就会丢10s钟的消费进度,此时如果从节点升级为主节点对外提供访问,就会出现跟上面提到的一样的情况,消费者从这个新的主节点中拿到的消费进度比实际的低,自然而然就会重复消费消息。

所以,总的来说,在消费进度数据流转的过程中,只要某个环节出现了问题,都有很有可能会导致消息重复消费。

重平衡

先来讲一讲什么是重平衡,其实重平衡很好理解,我说一下你就明白了。

前面说到,消费者是从队列中获取消息的


 

在RocketMQ中,有个消费者组的概念,一个消费者组中可以有多个消费者,不同消费者组之间消费消息是互不干扰的,所以前面提到的消费者其实都在消费组下


 

在同一个消费者组中,消息消费有两种模式:

 

  • 集群消费模式
  • 广播消费模式

 

由于RocketMQ默认是集群消费模式,并且绝大多数业务场景都是使用集群消费模式,所以这里就不讨论广播消费模式了,感兴趣的同学可以看看RocketMQ消息短暂而又精彩的一生 这篇文章。

集群消费模式是指同一条消息只能被这个消费者组消费一次,这就叫集群消费。

并且前面提到提交消费进度给RocketMQ服务端的情况只会集群消费模式下才会有,在广播消费模式不会提给到RocketMQ服务端,仅仅持久化到本地磁盘

同时前面说的消费者提交消费进度真正提交的是消费者组对于这个Queue的消费进度,而不是指具体的某个消费者对于Queue消费进度。

虽然说这里将前面提到的一些含义更深一步,但是并不妨碍前面的理解。

集群消费的实现就是将队列按照一定的算法分配给消费者,默认是按照平均分配的。


 

如图所示,假设某个topic有4个Queue,有个消费者组订阅了这个topic,这个消费者组有两个消费者1和消费者2,此时每个消费者就可以被分配两个队列,这样就能保证消息正常情况下只会被消费一次。如果只有一个消费者,那么这个消费者就会消费所有队列,很好理解。

接着后面又启动了一个消费者3,此时为了保证刚上线的消费者3能够消费消息,就要进行重平衡操作,重新分配每个消费者消费的队列。

在重平衡之后就可能会出现下面这种情况


 

如上图,原本被消费者2消费的Queue4被分配给消费者3,此时消费者3就能消费到消息了,这就是重平衡

除了新增消费者会导致重平衡之外,消费者数量减少,队列的数量增加或者减少都会触发重平衡。

在了解了重平衡概念之后,接下来分析一下为什么重平衡会导致消息的重复消费。

假设在进行重平衡时,还未重平衡完之前,消费者2此时还是会按照上面第二节提到的消费消息的逻辑来消费Queue4的消息

当消费者2已经重平衡完成了,发现Queue4自己已经不能消费了,那么此时就会把这个Queue4设置为dropped,就是丢弃的意思


 

但是由于重平衡进行时消费者2仍然在消费Queue4的消息,但是当消费完之后,发现队列被设置成dropped,那么此时被消费者2消费消息的offset就不会被提交,原因如下代码


 

这段代码前面已经出现过,一旦dropped被设置成true,这个if条件就通不过,消费进度就不会被提交。

成功消费消息了,但是却不提交消费进度,这就非常坑了。。

于是当消费者3开始消费Queue4的消息的时候,他就会问问RocketMQ服务端,我消费者3所在的消费者组对于Queue4这个队列消费到哪了,我接着消费就行了。

此时由于没有提交消费进度,RocketMQ服务端告诉消费者3的消费进度就会比实际的低,这就造成了消息重复消费的情况。

清理长时间消费的消息

在RocketMQ中有这么一个机制,会定时清理长时间正在消费的消息。

如图,假设有5条消息现在正在被消费者处理,这5条消息会被存在一个集合中,并且是按照offset的大小排序,消息1的offset最小,消息5的offset最大。

RocketMQ消费者启动时会开启一个默认15分钟执行一次的定时任务


 

这个定时任务会去检查正在处理的消息的第一条消息,也就是图中的消息1,一旦发现消息1已经处理了超过15分钟了,那么此时就会将消息1从集合中移除,之后会隔一定时间再次消费消息1。

这也会有坑,虽然消息1从集合中被移除了,但是消息1并没有消失,仍然被消费者继续处理,但是消息1隔一定时间就会再次被消费,就会出现消息1被重复消费的情况。

这就是清理长时间消费的消息导致重复消费的原因。

但此时又会引出一个新的疑问,为什么要移除这个处理超过15分钟的消息呢?

这就又跟前面提到的消费进度提交有关!

前面说过消息被消费完成之后会提交消费进度,提交的消费进度实际会有两种情况:

第一种就是某个线程消费了所有的消息,当把所有的消息都消费完成之后,就会把消息从集合中全部移除,此时提交的消费进度offset就是图中消息5的offset+1

加1的操作是为了保证如果发生重启,那么消费者下次消费的起始位置就是消息5后面的消息,保证消息5不被重复消费

第二种情况就不太一样了

假设现在有两个线程来处理这5条消息,线程1处理前2条,线程2处理后3条,如图


 

现在线程1出现了长时间处理消息的情况。

此时线程2处理完消息之后,移除后面三条消息,准备提交offset的时候发现集合中还有元素,就是线程1正在处理的前两条消息,此时线程2提交的offset并不是消息5对应的offset,而是消息1的offset,代码如下


 

这么做的主要原因就是保证消息1和消息2至少被消费一次。

因为一旦提交了消息5对应的offset,如果消费者重启了,下次消费就会接着从消息5的后面开始消费,而对于消息1和消息2来说,并不知道有没有被消费成功,就有可能出现消息丢失的情况。

所以,一旦集合中最前面的消息长时间处理,那么就会导致后面被消费的消息进度无法提交,那么重启之后就会导致大量消息被重复消费。

为了解决这个问题,RocketMQ引入了定时清理的机制,定时清理长时间消费的消息,这样消费进度就可以提交了。

最后

总得来说,RocketMQ中还是存在很多种导致消息重读消费的情况,并且官方也说了,只是在大多数情况下消息不会重复


 

所以如果你的业务场景中需要保证消息不能重复消费,那么就需要根据业务场景合理的设计幂等技术方案。

 

原文:https://mp.weixin.qq.com/s/XtIZbObkDcDzcwttSDslZg 作者:三友

 

如果感觉本文对你有帮助,点赞关注支持一下



Tags:RocketMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
大白话设计RocketMQ延迟消息
延迟消息一般用于:提前发送消息,延迟一段时间后才需要被处理的场景。比如:下单半小时后还未支付,则取消订单 释放库存 等。RocketMQ的延迟消息使用上非常便捷,但是不支持任意时间...【详细内容】
2023-12-27  Search: RocketMQ  点击:(102)  评论:(0)  加入收藏
九个问答牢记RocketMQ架构
RocketMQ是Java兄弟们常用的消息中间件,虽说常用,但对于RocketMQ架构经常忘记。究其原因就2点:忙于业务开发然后长时间不看则忘了、不理解架构设计的根本原因记不牢。本文用大...【详细内容】
2023-12-27  Search: RocketMQ  点击:(111)  评论:(0)  加入收藏
如何应对 RocketMQ 消息堆积
这篇文章,我们聊聊如何应对 RocketMQ 消息堆积。图片1 基础概念消费者在消费的过程中,消费的速度跟不上服务端的发送速度,未处理的消息会越来越多,消息出现堆积进而会造成消息消...【详细内容】
2023-12-21  Search: RocketMQ  点击:(71)  评论:(0)  加入收藏
解锁RocketMQ秘籍:如何保障消息顺序性?
嗨,小伙伴们!小米在这里啦!今天我们要聊的话题是社招面试中一个经典而又百思不得其解的问题——“RocketMQ如何保证顺序性?”不用担心,小米来给你揭秘RocketMQ的秘密武...【详细内容】
2023-12-15  Search: RocketMQ  点击:(98)  评论:(0)  加入收藏
Apache RocketMQ 5.0腾讯云落地实践
Apache RocketMQ 发展历程回顾RocketMQ 最早诞生于淘宝的在线电商交易场景,经过了历年双十一大促流量洪峰的打磨,2016年捐献给 Apache 社区,成为 Apache 社区的顶级项目,并在国...【详细内容】
2023-12-13  Search: RocketMQ  点击:(132)  评论:(0)  加入收藏
聊聊 RocketMQ 5.0 的 POP 消费模式!
大家都知道,RocketMQ 消费模式有 PULL 模式和 PUSH 模式,不过本质上都是 PULL 模式,而在实际使用时,一般使用 PUSH 模式。不过,RocketMQ 的 PUSH 模式有明显的不足,主要体现在以下...【详细内容】
2023-05-16  Search: RocketMQ  点击:(303)  评论:(0)  加入收藏
深扒RocketMQ源码之后,我找出了RocketMQ消息重复消费的7种原因
在众多关于MQ的面试八股文中有这么一道题,“如何保证MQ消息消费的幂等性”。为什么需要保证幂等性呢?是因为消息会重复消费。为什么消息会重复消费?明明已经消费了,为什么消息会...【详细内容】
2023-04-13  Search: RocketMQ  点击:(238)  评论:(0)  加入收藏
SpringBoot整合RocketMQ,老鸟们都是这么玩的!
今天我们来讨论如何在项目开发中优雅地使用RocketMQ。本文分为三部分,第一部分实现SpringBoot与RocketMQ的整合,第二部分解决在使用RocketMQ过程中可能遇到的一些问题并解决...【详细内容】
2023-04-12  Search: RocketMQ  点击:(429)  评论:(0)  加入收藏
SpringBoot 与RabbitMQ、RocketMQ高可靠、高性能、分布式应用实践
Spring Boot 是一个基于 Spring 框架的快速开发框架,而 RabbitMQ 和 RocketMQ 则是常用的消息队列中间件。下面是它们常用的一些用法和场景。 订单处理在电商等系统中,下单后...【详细内容】
2023-03-09  Search: RocketMQ  点击:(204)  评论:(0)  加入收藏
通过源码分析RocketMQ主从复制原理
作者:京东物流 宫丙来一、主从复制概述 RocketMQ Broker的主从复制主要包括两部分内容:CommitLog的消息复制和Broker元数据的复制。 CommitLog的消息复制是发生在消息写入时,当...【详细内容】
2023-03-02  Search: RocketMQ  点击:(63)  评论:(0)  加入收藏
▌简易百科推荐
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(0)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(6)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(13)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(9)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(11)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(9)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
站内最新
站内热门
站内头条