普通消息队列,消息一旦入队就会被消费者立刻消费,而延迟队列则需要指定固定时间后被延迟消费.
学习的话可以去官网下载rabbitmq并通过安装插件的方式安装延迟队列 插件地址:
https://www.rabbitmq.com/community-plugins.html
但我们这里使用现成的镜像就可以简单学习
docker pull lianlianyi/rabbitmq
rabbitmq:
image: lianlianyi/rabbitmq:3.9.13-management-alpine-delayed
ports:
- 15672:15672
- 5672:5672
environment:
- TIME_ZONE=Asia/ShanghAI
#用户
- RABBITMQ_DEFAULT_USER=admin
#密码
- RABBITMQ_DEFAULT_PASS=admin
volumes:
- /etc/localtime:/etc/localtime
- /etc/timezone:/etc/timezone
- ./data/rabbitmq/db:/var/lib/rabbitmq/mnesia/rabbit@my-rabbit
交换机有出现 x-delayed-message 字样表示插件运行成功
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.Application.xml
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: /
publisher-returns: true #开启发送失败返回
publisher-confirm-type: correlated #配置确认回调
listener:
simple:
acknowledge-mode: auto #开启ack
concurrency: 5 #指定最小的消费者数量.
max-concurrency: 10 #指定最大的消费者数量.
prefetch: 1 # 最多一次消费多少条数据 -限流
direct:
acknowledge-mode: auto
#支持消息的确认与返回
template:
mandatory: true
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, correlationDate -> {
//延迟3秒
correlationDate.getMessageProperties().setDelay(1000 * 3);
return correlationDate;
});
log.info("生产者发送时间:{},msg:{}", DateUtil.formatDateTime(new Date()),msg);
@RabbitListener(queuesToDeclare = @Queue(MsgProduct.DELAYED_QUEUE_NAME))
public void receive(@Payload MsgDemo msgDemo, Message message, Channel channel) {
log.info("调用消费者时间:{},MSG:{}¨", DateUtil.formatDateTime(new Date()),msgDemo);
}
https://Github.com/lianlianyi/toutiao_demo