前置掌握:SpringBoot基础使用、RocketMQ和SpringBoot的整合使用
源码地址:gitee.com/tianxincode… 文章只会说明核心代码,其他的基础整合配置和多环境自动隔离参考源码即可
为了不产生歧义,文章中提到的二次封装均是基于原始使用方式的封装,而非源码级别的二次封装
换句话说:如果都需要对源码进行封装了,那么说明公司业务规模都到一定程度了,二次封装这种东西已经不需要讨论了,封装已经是一个共识
让我们以一个生活中的蛋炒饭开个头
原始框架好比提供了原材料:厨具、鸡蛋,米饭等食材、菜谱
问题:哪种方案更好? 答案:两种各有各的优势(在说废话,哈哈~)
2.1.1 封装主要讨论点
2.1.2 发送/消费的几种消息实体
2.2.1 封装基础实体类
package com.codecoord.rocketmq.domAIn;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 基础消息实体,包含基础的消息
* 根据自己的业务消息设置更多的字段
*
* @author tianxincoord@163.com
* @since 2022/6/16
*/
@Data
public abstract class BaseMqMessage {
/**
* 业务键,用于RocketMQ控制台查看消费情况
*/
protected String key;
/**
* 发送消息来源,用于排查问题
*/
protected String source = "";
/**
* 发送时间
*/
protected LocalDateTime sendTime = LocalDateTime.now();
/**
* 跟踪id,用于slf4j等日志记录跟踪id,方便查询业务链
*/
protected String traceId = UUID.randomUUID().toString();
/**
* 重试次数,用于判断重试次数,超过重试次数发送异常警告
*/
protected Integer retryTimes = 0;
}
2.2.2 RocketMQTemplate
package com.codecoord.rocketmq.template;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.Apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* RocketMQ模板类
*
* @author tianxincoord@163.com
* @since 2022/4/15
*/
@Component
public class RocketMqTemplate {
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
@Resource(name = "rocketMQTemplate")
private RocketMQTemplate template;
/**
* 获取模板,如果封装的方法不够提供原生的使用方式
*/
public RocketMQTemplate getTemplate() {
return template;
}
/**
* 构建目的地
*/
public String buildDestination(String topic, String tag) {
return topic + RocketMqSysConstant.DELIMITER + tag;
}
/**
* 发送同步消息
*/
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
// 注意分隔符
return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message) {
// 设置业务键,此处根据公共的参数进行处理
// 更多的其它基础业务处理...
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage);
// 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
LOGGER.info("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
return sendResult;
}
/**
* 发送延迟消息
*/
public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
}
public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
return sendResult;
}
}
package com.codecoord.rocketmq.template;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* 订单类发送消息模板工具类
*
* @author tianxincode@163.com
* @since 2022/6/16
*/
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
/// 如果不采用继承也可以直接注入使用
/* @Resource
private RocketMqTemplate rocketMqTemplate; */
/**
* 入参只需要传入是哪个订单号和业务体消息即可,其他操作根据需要处理
* 这样对于调用者而言,可以更加简化调用
*/
public SendResult sendOrderPaid(@NotNull String orderId, String body) {
RocketMqEntityMessage message = new RocketMqEntityMessage();
message.setKey(orderId);
message.setSource("订单支付");
message.setMessage(body);
// 这两个字段只是为了测试
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
}
}
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;
/**
* 抽象消息监听器,封装了所有公共处理业务,如
* 1、基础日志记录
* 2、异常处理
* 3、消息重试
* 4、警告通知
* 5、....
*
* @author tianxincoord@163.com
* @since 2022/4/17
*/
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
/**
* 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化
*/
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Resource
private RocketMqTemplate rocketMqTemplate;
/**
* 消息者名称
*
* @return 消费者名称
*/
protected abstract String consumerName();
/**
* 消息处理
*
* @param message 待处理消息
* @throws Exception 消费异常
*/
protected abstract void handleMessage(T message) throws Exception;
/**
* 超过重试次数消息,需要启用isRetry
*
* @param message 待处理消息
*/
protected abstract void overMaxRetryTimesMessage(T message);
/**
* 是否过滤消息,例如某些
*
* @param message 待处理消息
* @return true: 本次消息被过滤,false:不过滤
*/
protected boolean isFilter(T message) {
return false;
}
/**
* 是否异常时重复发送
*
* @return true: 消息重试,false:不重试
*/
protected abstract boolean isRetry();
/**
* 消费异常时是否抛出异常
*
* @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)
*/
protected abstract boolean isThrowException();
/**
* 最大重试次数
*
* @return 最大重试次数,默认10次
*/
protected int maxRetryTimes() {
return 10;
}
/**
* isRetry开启时,重新入队延迟时间
*
* @return -1:立即入队重试
*/
protected int retryDelayLevel() {
return -1;
}
/**
* 由父类来完成基础的日志和调配,下面的只是提供一个思路
*/
public void dispatchMessage(T message) {
MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
// 基础日志记录被父类处理了
logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
if (isFilter(message)) {
logger.info("消息不满足消费条件,已过滤");
return;
}
// 超过最大重试次数时调用子类方法处理
if (message.getRetryTimes() > maxRetryTimes()) {
overMaxRetryTimesMessage(message);
return;
}
try {
long start = Instant.now().toEpochMilli();
handleMessage(message);
long end = Instant.now().toEpochMilli();
logger.info("消息消费成功,耗时[{}ms]", (end - start));
} catch (Exception e) {
logger.error("消息消费异常", e);
// 是捕获异常还是抛出,由子类决定
if (isThrowException()) {
throw new RuntimeException(e);
}
if (isRetry()) {
// 获取子类RocketMQMessageListener注解拿到topic和tag
RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
if (Objects.nonNull(annotation)) {
message.setSource(message.getSource() + "消息重试");
message.setRetryTimes(message.getRetryTimes() + 1);
SendResult sendResult;
try {
// 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
// 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
// 发送失败的处理就是不进行ACK,由RocketMQ重试
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
throw new RuntimeException("重试消息发送失败");
}
}
}
}
}
}
4.封装消费最终类 注意:收到的消息是先委派给父类,父类进行调度管理
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 实体类消费监听器,在实现RocketMQListener中间还加了一层BaseMqMessageListener来处理基础业务消息
*
* @author tianxincoord@163.com
* @since 2022/5/12
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_TAG,
// 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
implements RocketMQListener<RocketMqEntityMessage> {
/**
* 此处只是说明封装的思想,更多还是要根据业务操作决定
* 内功心法有了,无论什么招式都可以发挥最大威力
*/
@Override
protected String consumerName() {
return "RocketMQ二次封装消息消费者";
}
@Override
public void onMessage(RocketMqEntityMessage message) {
// 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型
super.dispatchMessage(message);
}
@Override
protected void handleMessage(RocketMqEntityMessage message) throws Exception {
// 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
System.out.println("业务消息处理");
}
@Override
protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
// 当超过指定重试次数消息时此处方法会被调用
// 生产中可以进行回退或其他业务操作
}
@Override
protected boolean isRetry() {
return false;
}
@Override
protected int maxRetryTimes() {
// 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用
return 5;
}
@Override
protected boolean isThrowException() {
// 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常
return false;
}
}
5.封装后对于子类来说,只需要告诉父类要不要做就拥有了最开始说的所有功能,简化了使用,此时子类消费者只需要专注于自己的业务核心处理就可以了
package com.codecoord.rocketmq.listener;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 广播消息
* 应用场景:多租户或者服务有内部缓存需要刷新情况下如果需要刷新租户信息或者缓存信息
* 也就是需要所有服务节点都需要同事做某一件事情的时候
* 此时可以借助广播消息发送消息到所有节点刷新,无需一个节点一个节点的处理
*
* 特别说明:广播消息默认会在家目录下创建消费进度文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876
* 这种地址形式生成文件路径,但是由于带有:符号,windows下是不允许此符号作为文件夹名称的
* 所以如果rocketMQ的链接地址不是连接串(不带有端口)可以取消下面的messageModel注释
* 否则启动的时候就会提示目标卷或者路径不存在,其实是因为这个问题
*
* @author tianxincoord@163.com
* @since 2022/5/12
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = RocketMqBizConstant.SOURCE_TOPIC,
consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
// messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {
/**
* MessageExt:内置的消息实体,生产中根据需要自己封装实体
*/
@Override
public void onMessage(MessageExt message) {
log.info("收到广播消息【{}】", new String(message.getBody()));
}
}
封装测试大家可以直接参考RocketMqController即可
package com.codecoord.rocketmq.controller;
import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMApping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 消息发送
*
* @author tianxin01@huice.com
* @since 2022/6/16
*/
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
/**
* 注意此处注入的是封装的RocketMqTemplate
*/
@Resource
private RocketMqTemplate rocketMqTemplate;
/**
* 注入对应业务的模板类
*/
@Resource
private OrderMessageTemplate orderMessageTemplate;
/**
* 通过实体类发送消息,发送注意事项请参考实体类
* 说明:也可以在RocketMqTemplate按照业务封装发送方法,这样只需要调用方法指定基础业务消息接口
*/
@RequestMapping("/entity/message")
public Object sendMessage() {
RocketMqEntityMessage message = new RocketMqEntityMessage();
// 设置业务key
message.setKey(UUID.randomUUID().toString());
// 设置消息来源,便于查询we年
message.setSource("封装测试");
// 业务消息内容
message.setMessage("当前消息发送时间为:" + LocalDateTime.now());
// Java时间字段需要单独处理,否则会序列化失败
message.setBirthday(LocalDate.now());
message.setTradeTime(LocalDateTime.now());
return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
}
/**
* 此时对于调用者而且,无需创建任何类
* 如果某天需要调整消息发送来源,如果不封装,所有原来产生message的地方全部改
* 如果封装了,只需要改sendOrderPaid就可以切换
*/
@RequestMapping("/order/paid")
public Object sendOrderPaidMessage() {
return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客户下单了...,快快备货");
}
/**
* 直接将对象进行传输,也可以自己进行json转化后传输
*/
@RequestMapping("/messageExt/message")
public SendResult convertAndSend() {
// 生产中不推荐使用jsonObject传递,不看发送者无法知道传递的消息包含什么信息
JSONObject jsonObject = new JSONObject();
jsonObject.put("type", "messageExt");
String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
// 如果要走内部方法发送则必须要按照标准来,否则就使用原生的消息发送
return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
}
}
>作者:TianXinCoord
链接:
https://juejin.cn/post/7110782118547947550
来源:稀土掘金