今天闲聊下消息中间件的一些关键特性,对于消息中间件基础知识,包括各种开源消息中间件的比较选型文章,网上已经有很多,在这里就不再重复进行描述。
因此这篇文章仅仅选择一些消息中间件的一些关键特性和实践中常遇到的问题进行总结。
点对点队列模式消息模式
发布订阅模型Topic消息模型
消息中间件有一个关键能力,即1对多的消息发布订阅模式。因此对于消息中间件也常说两个能力,一个是Queue消息队列能力,一个是Topiic消息主题能力。对于消息Topic即消息主题模式,支持消息的发布订阅。
对于Queue模式消息被消费掉即从队列中清除,不管是哪个消息端消费掉。对于Topic模式,单个订阅端获取到消息,消息并不会清除,而是需要所有订阅端全部消费掉消息。
那么我们来举一个最简单的场景,即ERP系统需要将消息分发给CRM和SRM两个系统。因此启用了消息发布模式,建立了一个消息主题,比如会计科目分发消息。同时SRM和CRM系统是消息的订阅方,因此在订阅端写了相应的监听程序来订阅消息。如下:
但是CRM和SRM都是集群部署,每个应用都有三个集群节点,每个节点在启动的时候都会对消息主题进行订阅和监控。
那么当ERP分发会计科目的时候,如果常规模式就变成了该消息会被6个订阅端全部获取到。这个时候对于CRM系统中集群三个点,获取到三次消息显然是重复的。因此我们需要对于订阅端,在一个集群分组内部应该只要有一个节点获取到消息,消息就应该清除掉。但是在不同的集群分组间,仍然应该是Topic模式。
因此在消息中间件实现中,需要有一个针对集群节点的进一步分组能力,比如上面的CRM和SRM应该分为两个组,也就是常说的ClientID。有了ClientID分组,就可以实现组间的Topic能力,而组内启用Queue模式。
不同的消息中间件在这块的实现思路基本一致,即都需要有一个ClientID分组的概念。
比如在ActiveMQ中实现了虚拟Topic的功能。使用起来非常简单。对于消息发布者来说,就是一个正常的Topic,名称以VirtualTopic.开头。例如VirtualTopic.TEST。
对于消息接收端来说,是个队列,不同应用里使用不同的前缀作为队列的名称,即可表明自己的身份即可实现消费端应用分组。例如Consumer.A.VirtualTopic.TEST,说明它是名称为A的消费端,同理Consumer.B.VirtualTopic.TEST说明是一个名称为B的客户端。
可以在同一个应用里使用多个consumer消费此queue,则可以实现上面两个功能。又因为不同应用使用的queue名称不同(前缀不同),所以不同的应用中都可以接收到全部的消息。每个客户端相当于一个持久订阅者,而且这个客户端可以使用多个消费者共同来承担消费任务。
进一步来谈消息发布订阅,还是基于前面的场景,ERP分发会计科目消息,当前有SRM和CRM两个订阅端。但是仍然会出现一些特殊的场景:
比如CRM接收到消息,但是自己处理出现问题导致消息丢失,需要让ERP系统重发消息。其次就是某些会计科目可能只有SRM系统需要使用,需要定向只发送给SRM系统。那么在这种情况下一个Topic主题往往并不能支持该需求。当然对于不同的Route规则可以建立不同的Topic,但是如果路由规则复杂,这样会建立出大量的Topic本身也不现实。
而对于这种常见,类似主流的基于JMS的消息中间件往往并没有topic+路由规则的消息投递能力,对于这种情况只能够单独新实现接口来进行处理。或者是消息仍然按传统规则进行分发,对于订阅端拿到消息如果确实自己不需要或已经存在,就自己丢弃掉。
而对于RabbitMQ,可以看到专门有一个Topic Exchange模式。Topic Exchange是根据routing key和Exchange的类型将message发送到一个或者多个Queue中,可以通过它来实现pub/sub模式,即发布订阅。类似下图:
在整个技术架构中引入了消息中间件后,虽然达到了异步解耦的作用,但实质是增加了整个集成架构的复杂度,同时也影响了整体架构的可靠性。
对于消息中间件如何增加可靠性,一个方面是高可用集群架构,一个则是消息本身要支持持久化存储。通过消息持久化存储来确保消息不丢失。
对于消息持久化,一般来说分为文件和数据库两大类。对于文件本身又分为了本地磁盘文件和共享文件系统两个方式。
本地磁盘方式可以看到,如果当前Broker节点宕机,虽然消息可以在本地暂存,但是在当前节点没有手工恢复前,消费端无法及时获取消息。对于这个问题的实际解决思路是启用类似Master-Slave的部署架构方式,即消息会快速的实时同步到Slave节点。这样即使Master节点无法访问,订阅端也可以快速的从Slave节点获取消息。
对于共享文件系统或数据库方式,实际消息只持久化到一个集中的地方,因此不存在消息多点复制的操作,某个节点宕机,共享存储的消息也可以快速的恢复到其它冗余节点。
分布式事务
在前面谈到过基于消息中间件来实现消息最终一致性,具体如下:
场景:在采购系统中拟制采购订单,在提交单据申请的时候既需要将单据成功保存到本地,同时又需要启动远程流程平台提供的流程启动服务。在该场景中,第二个步骤属于必须要最终完成的操作,同时业务上也允许最终一致(不要因为流程平台本身问题导致单据提交不成功,启流程失败如何重试是系统内部的事情)
对于该场景,基于消息实现最终一致性逻辑如下:
可以看到,报账单提交和流程启动仍然需要控制到一个事务里面,这个本身也是一个分布式事务控制场景。特别是在消息中间件需要在获取到消息进行持久化的时候。
也就是说从报账单提交,到消息发送给消息中间件,两个活动需要控制在一个事务里面。而对于消息中间件本身又有两种做法。
一种是只要获取到消息,消息还在内存里面就认为成功并返回给发送端,即异步刷盘。还有一种就是拿到消息后必须持久化写入成功后才返回,即同步刷盘。
因此要确保无任何消息丢失,必须是同步刷盘模式,确保消息持久化成功才能够返回。
持久化还有什么作用?
当订阅端取消息的进度缓慢,或者说订阅端迟迟不取走消息,那么消息就一直在消息中间件中堆积,消息中间件内存容量优先,必须将更多的历史消息先进行持久化处理进行内存释放。否则消息中间件本身也很难应对。
那么消息大量积压如何办?常用的方式就是设置消息的过期机制,比如设置为5天,那么超过5天订阅端还没有取走的消息就自动废弃。但是这种方式本身也影响到消息发送的完整性。
当然对于挤压消息也可以手工清理。
比如当前已经产生1亿条消息,分别由ERP分发给SRM和CRM系统,但是CRM系统对这些消息没有消费而处于积压状态。我们需要手工对CRM订阅的这部分消息进行清理。这个时候可以看到和基于数据库进行大表的Delete条件删除差不多。
即性能极其缓慢。因为清理消息过程实际就是条件删除过程,同时这些数据还可能存在于多个磁盘文件中,清除后还需要对磁盘文件进行重整。对于消息的持久化存储前面谈到,写消息到磁盘文件性能极高,比数据库快很多,但是一遇到这种条件删除或消息清理场景,那么性能就会极差。
JMS(JAVA Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。
传统企业内部使用的消息集成中间件,比如IBM MQ,Weblogic JMS, 开源的ActiveMQ等基本都基于JMS标准规范和协议。支持点对点队列模式,也支持消息发布订阅。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
比如RabbitMQ即是通过AMQP协议实现。
在这里并不能说基于AMQP实现的消息中间件性能就一定好于JMS,对于阿里基于JMS实现的RocketMQ网上能够查到的性能测试数据实际是好于RabbitMQ的。
具体的测试数据可以参考网上的一个对比图:
对于集群可以分为两类,一类是应用中间件集群,一类是涉及到持久化中间件集群。
应用中间件集群
对于应用中间件集群,比如常见的weblogic,Tomcat应用中间件和Web容器。这类要实现高可用性本身相对容易,因为本身不涉及到数据的持久化存储方面的问题。
应用集群高可用,唯一就是服务器节点之间的Session状态信息同步,当前也有很多的方案,类似Session同步复制,采用数据库,redis等共享库来保持Sesion会话信息等。再简单点,你完全可以在上层接入到负载均衡设备,通过负载均衡设备配置Session会话保持来保持会话信息。在这种情况下仅是需要容忍集群节点出现故障,当前在该节点的会话不可用。
其次就是集群节点的心跳检测,一种是最简单的方法就是心跳ping节点,如果出现异常则认为节点出现故障。而这种情况下无法判断集群节点是否假死或Hang住了。因此也有进一步的做法,即采用OpenRestry来实现集群负载和动态的心跳监控。
持久化中间件集群
对于消息中间件,Redis缓存数据库等都可以看到,典型特点就是涉及到数据持久化的问题。而这些中间件本身的高可用集群思路基本完全一样。
集群节点配置问题
集群节点配置,可以看到有1主1从,多主,多(主+从)等多种模式来实现集群扩展。要看到主从的目的是实现Master宕机的时候消息还能够从Slave上获取,不影响消息实时性,Slave节点实际是实时在复制主节点数据,并不提供性能分担。
而多主的目的则是真正分散消息并发泄到集群时候的性能问题。即使是一个最简单的点对点消息模型,Producer也可以将消息分发到两个Master节点进行性能分担,以减轻单Master节点下的性能压力。
所以如果仅仅是高可靠,Master+Slave基本就能够满足。但是要满足高可靠+高性能,则必须搭建多个Master+多个Slave组成的集群。
管理或心跳监控节点
管理节点有很多方法,比如RocketMQ里面自己实现的Name Server集群,Redis集群实现的Sentinel哨兵集群,或者通用的Zookeeper集群来实现分布式协调等。
但是不论哪种方式都能看到管理节点往往是3台服务器配置模式。
首先Admin节点采用单点肯定不行,即存在单点故障。
因此你至少需要配置两台Server来作为Admin管理节点,但是配置2台服务器的时候又发现一个新问题,即如果2台服务器检测到的节点状态不一致那么听谁的?
正是这样原因引入了第3台服务器,即能够超过半数的投票机制来实现文档的集群管理节点。可以看到类似Kurbernetes集群同样也是需要3台服务器来实现多主的高可用配置。
持久化存储
前面已经谈到了持久化存储。对于消息中间件在集群下,实际每个集群节点都可能接收到大量的消息,消息是存储在本地磁盘还是共享文件存储是需要考虑的问题。
如果是本地磁盘,那么一般需考虑Master-Slave模式,实现消息的实时同步复制,以确保在Master宕机的情况下不影响到消息的实时获取。如果是共享存储模式,实际可以看到不一定必须配置Slave节点,即某个Master宕机后,另外一个节点可以从共享存储中重新装载消息,虽然稍微有一定延迟,但是基本不会影响到消息的实时性。
类似Weblogic JMS集群基本采用这种方式,即共享文件进行持久化存储,当节点出现故障的时候,请求信息会漂移到其它非故障节点完成高可用性。