大家好呀,我是楼仔。
RabbitMQ 的文章之前写过,但是当时给的示例是 Demo 版的,这篇文章主要是结合之前写的理论知识,将 RabbitMQ 集成到技术派项目中。
不 BB,上文章目录:
下面我们先回顾一下理论知识,如果对这块知识已经清楚的同学,可以直接跳到实战部分。
消息队列目前主要 2 种模式,分别为“点对点模式”和“发布/订阅模式”。
一个具体的消息只能由一个消费者消费,多个生产者可以向同一个消息队列发送消息,但是一个消息在被一个消息者处理的时候,这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。
需要额外注意的是,如果消费者处理一个消息失败了,消息系统一般会把这个消息放回队列,这样其他消费者可以继续处理。
单个消息可以被多个订阅者并发的获取和处理。一般来说,订阅有两种类型:
RabbitMQ 2007 年发布,是使用 Erlang 语言开发的开源消息队列系统,基于 AMQP 协议来实现。
提到RabbitMQ,就不得不提AMQP协议。AMQP协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
先了解一下AMQP协议中间的几个重要概念:
AMQP 协议模型由三部分组成:生产者、消费者和服务端,执行流程如下:
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种:
因为我用的是mac,所以直接可以参考官网:
需要注意的是,一定需要先执行:
brew update
然后再执行:
brew install rabbitmq
之前没有执行brew update,直接执行brew install rabbitmq时,会报各种各样奇怪的错误,其中“403 Forbidde”居多。
但是在执行“brew install rabbitmq”,会自动安装其它的程序,如果你使用源码安装Rabbitmq,因为启动该服务依赖erlang环境,所以你还需手动安装erlang,但是目前官方已经一键给你搞定,会自动安装Rabbitmq依赖的所有程序,是不是很棒!
最后执行成功的输出如下:
启动服务:
# 启动方式1:后台启动
brew services start rabbitmq
# 启动方式2:当前窗口启动
cd /usr/local/Cellar/rabbitmq/3.8.19
rabbitmq-server
在浏览器输入:
http://localhost:15672/
会出现RabbitMQ后台管理界面(用户名和密码都为guest):
通过brew安装,一行命令搞定,真香!
添加账号:
## 添加账号
./rabbitmqctl add_user admin admin
## 添加访问权限
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
## 设置超级权限
./rabbitmqctl set_user_tags admin administrator
pom 引入依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.5.1</version>
</dependency>
先整一个 ConnectionFactory 单例,每台机器都有自己的 ConnectionFactory,防止每次都初始化(在后面的迭代中,我会把这个去掉,整成连接池)。
/**
* @author LouzAI
* @date 2023/5/10
*/
public class RabbitmqUtil {
/**
* 每个key都有自己的工厂
*/
private static Map<String, ConnectionFactory> executors = new ConcurrentHashMap<>();
/**
* 初始化一个工厂
*
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory init(String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(passport);
factory.setVirtualHost(virtualhost);
return factory;
}
/**
* 工厂单例,每个key都有属于自己的工厂
*
* @param key
* @param host
* @param port
* @param username
* @param passport
* @param virtualhost
* @return
*/
public static ConnectionFactory getOrInitConnectionFactory(String key,
String host,
Integer port,
String username,
String passport,
String virtualhost) {
ConnectionFactory connectionFactory = executors.get(key);
if (null == connectionFactory) {
synchronized (RabbitmqUtil.class) {
connectionFactory = executors.get(key);
if (null == connectionFactory) {
connectionFactory = init(host, port, username, passport, virtualhost);
executors.put(key, connectionFactory);
}
}
}
return connectionFactory;
}
}
获取 RabbitmqClient:
/**
* @author Louzai
* @date 2023/5/10
*/
@Component
public class RabbitmqClient {
@Autowired
private RabbitmqProperties rabbitmqProperties;
/**
* 创建一个工厂
* @param key
* @return
*/
public ConnectionFactory getConnectionFactory(String key) {
String host = rabbitmqProperties.getHost();
Integer port = rabbitmqProperties.getPort();
String userName = rabbitmqProperties.getUsername();
String password = rabbitmqProperties.getPassport();
String virtualhost = rabbitmqProperties.getVirtualhost();
return RabbitmqUtil.getOrInitConnectionFactory(key, host, port, userName,password, virtualhost);
}
}
重点!敲黑板!!!这里就是 RabbmitMQ 的核心逻辑了。
我们使用的交换机类型是 Direct Exchange,此交换机需要绑定一个队列,要求该消息与一个特定的路由键完全匹配,简单点说就是一对一的,点对点的发送。
至于为什么不用广播和主题交换机模式,因为技术派的使用场景就是发送单个消息,点到点发送和消费的模式完全可以满足我们的需求。
下面 3 个方法都很简单:
@Component
public class RabbitmqServiceImpl implements RabbitmqService {
@Autowired
private RabbitmqClient rabbitmqClient;
@Autowired
private NotifyService notifyService;
@Override
public void publishMsg(String exchange,
BuiltinExchangeType exchangeType,
String toutingKey,
String message) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(toutingKey);
// TODO: 这种并发量起不来,需要改造成连接池
//创建连接
Connection connection = factory.newConnection();
//创建消息通道
Channel channel = connection.createChannel();
// 声明exchange中的消息为可持久化,不自动删除
channel.exchangeDeclare(exchange, exchangeType, true, false, null);
// 发布消息
channel.basicPublish(exchange, toutingKey, null, message.getBytes());
System.out.println("Publish msg:" + message);
channel.close();
connection.close();
}
@Override
public void consumerMsg(String exchange,
String queue,
String routingKey) throws IOException, TimeoutException {
ConnectionFactory factory = rabbitmqClient.getConnectionFactory(routingKey);
// TODO: 这种并发量起不来,需要改造成连接池
//创建连接
Connection connection = factory.newConnection();
//创建消息信道
final Channel channel = connection.createChannel();
//消息队列
channel.queueDeclare(queue, true, false, false, null);
//绑定队列到交换机
channel.queueBind(queue, exchange, routingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Consumer msg:" + message);
// 获取Rabbitmq消息,并保存到DB
// 说明:这里仅作为示例,如果有多种类型的消息,可以根据消息判定,简单的用 if...else 处理,复杂的用工厂 + 策略模式
notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 取消自动ack
channel.basicConsume(queue, false, consumer);
}
@Override
public void processConsumerMsg() {
System.out.println("Begin to processConsumerMsg.");
Integer stepTotal = 1;
Integer step = 0;
// TODO: 这种方式非常 Low,后续会改造成阻塞 I/O 模式
while (true) {
step ++;
try {
System.out.println("processConsumerMsg cycle.");
consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE,
CommonConstants.QUERE_KEY_PRAISE);
if (step.equals(stepTotal)) {
Thread.sleep(10000);
step = 0;
}
} catch (Exception e) {
}
}
}
}
这里只是给个示例,如果要真正用到生产环境,你觉得有哪些问题呢? 你自己先想想,文末再告诉你。
其实之前我们是通过 JAVA 的内置异步调用方式,为了方便验证,我把文章点赞的功能迁移到 RabbitMQ 中,只要是点赞,就走 RabbitMQ 模式。
// 点赞消息走 RabbitMQ,其它走 Java 内置消息机制
if (notifyType.equals(NotifyTypeEnum.PRAISE) && rabbitmqProperties.getSwitchFlag()) {
rabbitmqService.publishMsg(
CommonConstants.EXCHANGE_NAME_DIRECT,
BuiltinExchangeType.DIRECT,
CommonConstants.QUERE_KEY_PRAISE,
JsonUtil.toStr(foot));
} else {
Optional.ofNullable(notifyType).ifPresent(notify -> SpringUtil.publishEvent(new NotifyMsgEvent<>(this, notify, foot)));
}
那消费入口放哪里呢?其实是在程序启动的时候,我们就启动 RabbitMQ 进行消费,然后整个进程一直在程序中跑。
@Override
public void run(ApplicationArguments args) {
// 设置类型转换, 主要用于MyBatis读取varchar/json类型数据据,并写入到json格式的实体Entity中
JacksonTypeHandler.setObjectMapper(new ObjectMapper());
// 应用启动之后执行
GlobalViewConfig config = SpringUtil.getBean(GlobalViewConfig.class);
if (webPort != null) {
config.setHost("http://127.0.0.1:" + webPort);
}
// 启动 RabbitMQ 进行消费
if (rabbitmqProperties.getSwitchFlag()) {
taskExecutor.execute(() -> rabbitmqService.processConsumerMsg());
}
log.info("启动成功,点击进入首页: {}", config.getHost());
}
我们多次点击“点赞”按钮,触发 RammitMQ 消息发送。
可以通过日志,也可以看到发送和消费过的消息。
我靠!好多没有关闭的链接。。。
还有一堆没有关闭的 channel。。。
估计再多跑一会,内存全部吃光,机器就死机了,怎么破?答案是连接池!
为了方便大家学习功能演变的过程,每个模块都会单独开个分支,包括后面的升级版:
如果需要运行 RabbitMQ,下面的配置需要改成 true,因为代码默认是 false。
这篇文章,让大家知道 RabbitMQ 的基本原理,以及如何去集成 RabbitMQ,但是还不能用到实际生产环境,但是这个确实是我写的第一个版本,存粹是搞着玩的,因为里面存在的问题还非常多。
我简单列举一下:
如果你对上面的问题也非常感兴趣,可以直接基于分支 feature/add_rabbitmq_20230506,然后给我提 PR,技术嘛,我喜欢边玩边学。