在金融系统中MQ消息的消息丢失是不允许的,消息的丢失会导致支付状态订单状态出现混乱。接下来聊一下如何保证MQ消息不丢失,以笔者公司使用的RocketMQ为例。
MQ的消息生成到消费主要经历三个阶段:MQ消息生产、RocketMQ Broker存储消息、消费者消息对应的消息。如下图:
从上图可以知道消息丢失主要会发生在下面几个地方:
解决消息丢失从消息丢失的地方入手。
RocketMQ消息生产方式有三种:同步发送消息、异步发送消息、One-Way发送消息。不同的发送方式使用不同的场景:
同步发送消息: 重要的通知(订单状态的更新)、短信系统。
异步发送消息: 通常用于响应时间敏感的业务场景。
One-way: 主要用于对可靠性要求不高的场景,在金融的场景下不适用。一般是用于日志收集。
根据上面三种发送方式的特点, one-way 消息发送模式本身就是对消息的丢失无法保证。所以如果你的系统对消息丢失零容忍不能使用 one-way 的方式发送。同步发送消息和异步发送消息 都可以判断消息的发送状态判断消息是否已经发送到Broker。这里是选择同步发送还是异步发送消息看业务的需要,同步发送比较关心发送后返回的结果对时间的要求不是那么敏感。异步发送对消息返回时间敏感。
SendResult定义说明(来自RocketMQ官方)
首先了解一下Broker集群部署模式(官方方案)。
单Master模式
这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上环境使用,可以用于本地测试。
多Master模式
一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如下
多Master多Slave模式-异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下
多Master多Slave模式-同步双写
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
如果是想不存在消息丢失的情况,那么在多Master的情况下要配置消息同步刷盘,而在 多Master多Slave模式-同步双写 的情况下配置同步刷盘。
消息消费示例代码如下:
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// Specify name server addresses.
consumer.setNamesrvAddr("localhost:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicTest", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//处理消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
在处理消息的时候,如果消息处理失败返回的状态不应该是
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 。如果消息处理失败返回的是
ConsumeConcurrentlyStatus.CONSUME_SUCCESS 消息就不能再次被消息。在Broker看来就是已经消费完成。
MQ消息的丢失主要发生在发送、存储、消费消息的三个阶段,所以需要防止消息丢失也要从这三个方面着手。
我是蚂蚁背大象,文章对你有帮助点赞关注我,文章有不正确的地方请您斧正留言评论~谢谢!