您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

一张图带你理解和实现RabbitMQ的延迟队列功能

时间:2019-09-02 14:20:05  来源:  作者:

先熟悉下面会用到的一些名词~

  • exchange: 交换机
  • routingkey: 路由key
  • queue: 队列

exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

一张图带你理解和实现RabbitMQ的延迟队列功能

exchange分四种

Default Exchange

这种是特殊的Direct Exchange,是rabbitmq内部默认的一个交换机。该交换机的name是空字符串,所有queue都默认binding 到该交换机上。所有binding到该交换机上的queue,routing-key都和queue的name一样。

注意: 这就是为什么你直接创建一个queue也能正常的生产与消费,因为对应的exchange是RabbitMQ默认的,routingkey就是该队列的名字

Topic Exchange

通配符交换机,exchange会把消息发送到一个或者多个满足通配符规则的routing-key的queue。其中表号匹配一个word,#匹配多个word和路径,路径之间通过.隔开。如满足a..c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

Fanout Exchange

扇形交换机,该交换机会把消息发送到所有binding到该交换机上的queue。这种是publisher/subcribe模式。用来做广播最好。

所有该exchagne上指定的routing-key都会被ignore掉。

Header Exchange

设置header attribute参数类型的交换机。

简单的了解之后,下面就是延迟队列的实现方式

延迟队列的实现

延迟分两种

  • 在msg上设置过期时间
  • 在队列上设置过期时间
一张图带你理解和实现RabbitMQ的延迟队列功能

一定要看懂这张图!!!

如上图创建三个exchange和三个队列

@Bean

public DirectExchange delayExchange() {

return new DirectExchange(DELAY_EXCHANGE_NAME);

}

@Bean

public DirectExchange processExchange() {

return new DirectExchange(PROCESS_EXCHANGE_NAME);

}

@Bean

public DirectExchange delayQueueExchange() {

return new DirectExchange(DELAY_QUEUE_EXCHANGE_NAME);

}

/**

* 存放延迟消息的队列 最后将会转发给exchange(实际消费队列对应的)

* @return

*/

@Bean

Queue delayQueue4Msg(){

return QueueBuilder.durable(DELAY_QUEUE_MSG)

.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME)

.withArgument("x-dead-letter-routing-key", ROUTING_KEY)

.build();

}

@Bean

public Queue processQueue() {

return QueueBuilder.durable(PROCESS_QUEUE)

.build();

}

/**

* 存放消息的延迟队列 最后将会转发给exchange(实际消费队列对应的)

* @return

*/

@Bean

public Queue delayQueue4Queue() {

return QueueBuilder.durable(DELAY_QUEUE_NAME)

.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE_NAME) // DLX

.withArgument("x-dead-letter-routing-key", ROUTING_KEY)

.withArgument("x-message-ttl", 3000) // 设置队列的过期时间 单位毫秒

.build();

}

接下来将每个exchange和对应的mq绑定

@Bean

Binding delayBinding() {

return BindingBuilder.bind(delayQueue4Msg())

.to(delayExchange())

.with(ROUTING_KEY);

}

@Bean

Binding queueBinding() {

return BindingBuilder.bind(processQueue())

.to(processExchange())

.with(ROUTING_KEY);

}

@Bean

Binding delayQueueBind() {

return BindingBuilder.bind(delayQueue4Queue())

.to(delayQueueExchange())

.with(ROUTING_KEY);

}

发送消息的方式

public void sendDelayMsg(Msg msg) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println(msg.getId() + " 延迟消息发送时间:" + sdf.format(new Date()));

rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME, "delay", msg, new MessagePostProcessor() {

@Override

public Message postProcessMessage(Message message) throws AmqpException {

message.getMessageProperties().setExpiration(msg.getTtl() + "");

return message;

}

});

}

public void sendDelayQueue(Msg msg) {

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

System.out.println(msg.getId() + " 延迟队列消息发送时间:" + sdf.format(new Date()));

rabbitTemplate.convertAndSend(RabbitConfig.DELAY_QUEUE_EXCHANGE_NAME,"delay", msg);

}

验证结果

为每个消息设置过期时间

一张图带你理解和实现RabbitMQ的延迟队列功能

为队列设置过期时间

一张图带你理解和实现RabbitMQ的延迟队列功能

如果你把设置了过期时间的消息发送到设置了过期时间的队里中的时候,以最短的时间为准。

最后

其实我在实现的过程中也花了很长的时间,主要就是被exchange和queue搞乱掉了,最后索性自己画了个图,按照图来一个一个创建与绑定。之后就很清晰很容易的实现了。

强调!!! 如果在开发的过程中发现exchange和queue绑定错误了,建议从管理界面将queue和exchange unbind或者删除重新创建!



Tags:RabbitMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
RabbitMQ 介绍RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。支持多种语言,包括java、Python、ruby、PHP、C/C++等。1.1.AMQP模型...【详细内容】
2021-11-17  Tags: RabbitMQ  点击:(16)  评论:(0)  加入收藏
下载Erlang和RabbitMQ官网下载地址Erlang下载地址: http://www.erlang.org/downloadsRabbitMQ下载地址: http://www.rabbitmq.com/download.html版本:( Erlang23+RabbitMQ3.8.4...【详细内容】
2021-08-31  Tags: RabbitMQ  点击:(49)  评论:(0)  加入收藏
环境:Spring Boot2.3.10 + RabbitMQ 3.8.12 + Erlang 23.2.51.1 RabbitMQ介绍RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务...【详细内容】
2021-04-22  Tags: RabbitMQ  点击:(337)  评论:(0)  加入收藏
RabbitMQ环境搭建erlang和RabbitMQ版本对应关系:https://www.rabbitmq.com/which-erlang.htmlerlang环境安装yum方式安装 yum源配置[root@iyeed RabbitMQ]# curl -s https://...【详细内容】
2021-04-14  Tags: RabbitMQ  点击:(281)  评论:(0)  加入收藏
Direct 模式# 所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。 Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exch...【详细内容】
2021-04-13  Tags: RabbitMQ  点击:(220)  评论:(0)  加入收藏
一、关于 RabbitMQ说到 RabbitMQ,相信大家都不会陌生,微服务开发中必不可少的中间件。 在上篇关于消息队列的文章中,我们了解到 RabbitMQ 本质其实是用 Erlang 开发的 AMQP(Adva...【详细内容】
2021-03-11  Tags: RabbitMQ  点击:(229)  评论:(0)  加入收藏
说明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根据基础知识去理解RabbitMQ是什么、提供了什么功能。一、MQ的简单理解1. 什么是MQ? 消息队列(Message Queue),是基础数据...【详细内容】
2021-02-07  Tags: RabbitMQ  点击:(156)  评论:(0)  加入收藏
1、 查找Docker容器中的RabbitMQ镜像docker ps -a[root@linux ~]# docker ps -aCONTAINER ID IMAGE COMMAND CREATED...【详细内容】
2020-11-27  Tags: RabbitMQ  点击:(222)  评论:(0)  加入收藏
一、Maven依赖添加 <!-- rabbitmq相关依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId>...【详细内容】
2020-08-01  Tags: RabbitMQ  点击:(53)  评论:(0)  加入收藏
一、简单的发送与接收消息 HelloWorld1. 发送消息发送消息首先要获取与rabbitmq-server的连接,然后从渠道(chann)中指定的queue发送消息 , 不能定义两个queue名字相同,但属性...【详细内容】
2020-04-03  Tags: RabbitMQ  点击:(61)  评论:(0)  加入收藏
▌简易百科推荐
本文分为三个等级自顶向下地分析了glibc中内存分配与回收的过程。本文不过度关注细节,因此只是分别从arena层次、bin层次、chunk层次进行图解,而不涉及有关指针的具体操作。前...【详细内容】
2021-12-28  linux技术栈    Tags:glibc   点击:(3)  评论:(0)  加入收藏
摘 要 (OF作品展示)OF之前介绍了用python实现数据可视化、数据分析及一些小项目,但基本都是后端的知识。想要做一个好看的可视化大屏,我们还要学一些前端的知识(vue),网上有很多比...【详细内容】
2021-12-27  项目与数据管理    Tags:Vue   点击:(2)  评论:(0)  加入收藏
程序是如何被执行的&emsp;&emsp;程序是如何被执行的?许多开发者可能也没法回答这个问题,大多数人更注重的是如何编写程序,却不会太注意编写好的程序是如何被运行,这并不是一个好...【详细内容】
2021-12-23  IT学习日记    Tags:程序   点击:(9)  评论:(0)  加入收藏
阅读收获✔️1. 了解单点登录实现原理✔️2. 掌握快速使用xxl-sso接入单点登录功能一、早期的多系统登录解决方案 单系统登录解决方案的核心是cookie,cookie携带会话id在浏览器...【详细内容】
2021-12-23  程序yuan    Tags:单点登录(   点击:(8)  评论:(0)  加入收藏
下载Eclipse RCP IDE如果你电脑上还没有安装Eclipse,那么请到这里下载对应版本的软件进行安装。具体的安装步骤就不在这赘述了。创建第一个标准Eclipse RCP应用(总共分为六步)1...【详细内容】
2021-12-22  阿福ChrisYuan    Tags:RCP应用   点击:(7)  评论:(0)  加入收藏
今天想简单聊一聊 Token 的 Value Capture,就是币的价值问题。首先说明啊,这个话题包含的内容非常之光,Token 的经济学设计也可以包含诸多问题,所以几乎不可能把这个问题说的清...【详细内容】
2021-12-21  唐少华TSH    Tags:Token   点击:(10)  评论:(0)  加入收藏
实现效果:假如有10条数据,分组展示,默认在当前页面展示4个,点击换一批,从第5个开始继续展示,到最后一组,再重新返回到第一组 data() { return { qList: [], //处理后...【详细内容】
2021-12-17  Mason程    Tags:VUE   点击:(14)  评论:(0)  加入收藏
什么是性能调优?(what) 为什么需要性能调优?(why) 什么时候需要性能调优?(when) 什么地方需要性能调优?(where) 什么时候来进行性能调优?(who) 怎么样进行性能调优?(How) 硬件配...【详细内容】
2021-12-16  软件测试小p    Tags:性能调优   点击:(20)  评论:(0)  加入收藏
Tasker 是一款适用于 Android 设备的高级自动化应用,它可以通过脚本让重复性的操作自动运行,提高效率。 不知道从哪里听说的抖音 app 会导致 OLED 屏幕烧屏。于是就现学现卖,自...【详细内容】
2021-12-15  ITBang    Tags:抖音防烧屏   点击:(25)  评论:(0)  加入收藏
11 月 23 日,Rust Moderation Team(审核团队)在 GitHub 上发布了辞职公告,即刻生效。根据公告,审核团队集体辞职是为了抗议 Rust 核心团队(Core team)在执行社区行为准则和标准上...【详细内容】
2021-12-15  InfoQ    Tags:Rust   点击:(25)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条