您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

RabbitMQ 简单工作模式

时间:2022-09-24 11:58:14  来源:  作者:互联共商

准备工作纯JAVA项目依赖

创建一个maven-java项目在pom.xml中添加以下依赖:

com.RabbitMQ:amqp-client:5.15.0org.projectlombok:lombok:1.18.24junit:junit:4.13.2org.Apache.logging.log4j:log4j-api:2.18.0org.apache.logging.log4j:log4j-core:2.18.0org.apache.logging.log4j:log4j-slf4j-impl:2.18.0org.slf4j:slf4j-api:1.7.36cn.hutool:hutool-core:5.8.5日志配置文件

创建log4j2.xml文件,用于输出日志:

连接工具类

  • 每一次连接时,地址、端口等信息都相同,所以将这些相同的代码写成一个工具类。
  • 这儿没有将Connection声明成静态的成员变量,因为,对于生产者与消费者,应该是分开部署的,也不可能使用同一个Connection对象。
package wj.mq.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnUtils {public static Connection newConnection(){ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.56.61");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("123456");Connection connection = null;try {connection = factory.newConnection();} catch (IOException e) {throw new RuntimeException(e);} catch (TimeoutException e) {throw new RuntimeException(e);return connection;

 

最后的项目结构如下:


 

SpringBoot项目依赖


 

配置文件server.port=8888spring.Application.name=helloworld# 配置mqspring.rabbitmq.host=192.168.56.61spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=admin# 配置日志logging.level.root=INFOlogging.file.name=./logs/log.log相关注解

@RabbitListener(queues = {"HelloSpring"}) : 可以注解在类上,也可以注解在方法上,消费消息。

@RabbitHandler() : 注解在方法上,与@RabbitListener共同使用。

简单模式


 

java项目生产者Publisher代码package wj.rabbitmq.demo01;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;import java.nio.charset.StandardCharsets;@Slf4jpublic class Publisher {public static void mAIn(String[] args) throws Exception {String queueName = "HelloQueue";//获取一个连接try(Connection connection = ConnUtils.newConnection()){//获取信道Channel channel = connection.createChannel();//声明一个队列名称为channel.queueDeclare(queueName, true, false,false, null);//发送消息:Exchange空默认使用/交换机channel.basicPublish("",queueName,null,"HelloWorld".getbytes(StandardCharsets.UTF_8));log.info("送信息完成");


 

查看UI

发送完成以后,查看RabbitMQ的UI管理界面中的Queue选项卡:


 

点HelloQueue这个队列名称,进入详细界面:


 

查看Bindings,可以看到目前是默认交换机(即AMQP Default)绑定到当前这个队列:


 

消费者Consumer代码package wj.rabbitmq.demo01;import com.rabbitmq.client.*;import lombok.extern.slf4j.Slf4j;import wj.mq.utils.ConnUtils;@Slf4jpublic class Consumer {public static void main(String[] args) throws Exception {String queueName = "HelloQueue";try (Connection con = ConnUtils.newConnection()) {Channel channel = con.createChannel();channel.queueDeclare(queueName, true, false,false, null);//接收到数据以后的回调函数Delivercallback callback = (consumerTag, message) -> {byte[] body = message.getBody();String str = new String(body);log.info("接收到信息:{}", str);//取消处理信息的回调CancelCallback cancelCallback = consumerTag -> {//ignore//消费监听channel.basicConsume(queueName, false, callback, cancelCallback);

截图:


 

运行后效果,即输出从队列中读取的数据:

09:40:22.886 [pool-2-thread-4] 接收到信息:HelloWorld

SpringBoot项目创建Springboot项目并添加以下依赖

  • org.springframework.boot:spring-boot-starter-amqp:2.7.3
  • org.springframework.boot:spring-boot-starter-thymeleaf:2.7.3
  • org.springframework.boot:Spring-boot-starter-web:2.7.3
  • org.springframework.boot:spring-boot-devtools:2.7.3
  • org.springframework.boot:spring-boot-configuration-processor:2.7.3
  • org.projectlombok:lombok:1.18.24
  • org.springframework.boot:spring-boot-starter-test:2.7.3
  • org.springframework.amqp:spring-rabbit-test:2.4.6
  • cn.hutool:hutool-core:5.8.6
配置类

 

配置类的主要功能是配置Queue,Exchange等。以下我这儿仅需要一个Queue所以仅配置了一个Queue。

package wj.mq.config;import java.util.HashMap;import java.util.Map;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import lombok.extern.slf4j.Slf4j;@Slf4j@Configurationpublic class RabbitMQConfig {* 在使用队列之前,必须要先声明这个队列,如果不在这儿声明,就必须要在@RabbitListener时添加queuesToDeclare如下:@RabbitListener(queues = {"HelloSpring"},queuesToDeclare = @Queue("HelloSpring"))* @return@Beanpublic Queue helloSpringQueue() {Map args = new HashMap<>();args.put("name", "HelloSpring");Queue queue = new Queue("HelloSpring",true, false, false,args);log.info("队列创建成功:{}",queue.getName());return queue;


 

启动类

与普通的启动的类类似,只是为了使用调度功能,添加了@EnableScheduling注解。

package wj.mq;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.springframework.scheduling.annotation.EnableScheduling;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.ResponseBody;import lombok.extern.slf4j.Slf4j;@Slf4j@Controller@EnableScheduling@SpringBootApplicationpublic class HelloWorldApplication {public static void main(String[] args) {SpringApplication.run(HelloWorldApplication.class, args);log.info("Started..");@ResponseBody@RequestMapping(value = {"/",""})public String index() {return "index";

添加EnableScheduling注解,后面用到定时任务,让生产者定时发布消息:


 

生产者

convertAndSend可以发布任意的对象,默认会被转换成指定的格式

package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Componentpublic class HelloWorldSender {@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {String msg = ""+times;rabbitTemplate.convertAndSend("HelloSpring",msg);log.info("Send {} ok",msg);times++;

添加@Component注解为Spring Bean组件。

注入rabbitTemplate。

通过convertAndSend(QueueName,Object)发送数据给批定的队列。


 

消费者package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {@RabbitHandlerpublic void onMessage(String message) {log.info("消费者1,接收到信息: {}",message);

在类上添加注解@RabbitListener(queues=..)

在接收信息的方法上添加@RabbitHandler


 

消费者2(另一种使用注解的方法)

package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;* 第二种接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {* 并设置为手工确认@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(Message message,Channel channel) throws Exception {log.info("消费者2,接收到信息: {}",new String(message.getBody()));long id = message.getMessageProperties().getDeliveryTag();//手工确认channel.basicAck(id, false);

直接将注解写在方法上,并设置为手工确认。

注意接收的参数为Message对象和Channel对象。

使用channel.ack进行手工确认。


 

生产者-发布一个对象声明JavaBean必须实现序列化接口package wj.mq.domain;import java.io.Serializable;import lombok.Getter;import lombok.Setter;@Getter@Setterpublic class Person implements Serializable {private static final long serialVersionUID = 1L;private String name;private Integer age;生产者

注意以下通过convertAndSend直接发布了一个对象。

package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import wj.mq.domain.Person;@Componentpublic class HelloWorldSender {@Autowiredprivate Queue helloSpringQueue;@Autowiredprivate RabbitTemplate rabbitTemplate;private static int times = 1;@Scheduled(fixedDelay = 1000, initialDelay = 500)public void send() {Person person = new Person();//声明对象并直接发布person.setName("Jack");person.setAge(times);rabbitTemplate.convertAndSend(helloSpringQueue.getName(),person);times++;

查看RabbitMQ UI可见编码类型为序列化再转base64的方式:


 

消费者-消费一个对象直接接收消费对象package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.alibaba.fastJSON2.JSONObject;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"HelloSpring"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson对象即可@RabbitHandler()public void onMessage(Person message) {int sleep = 1000*random.nextInt(20);ThreadUtil.sleep(sleep);log.info("消费者1,休眠{}ms后处理完成信息:{},共处理{}消息",sleep,JSONObject.toJSONString(message),count++);接收Message对象,然后自己做转换package wj.mq.rabbitmq.helloworld;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.io.ByteArrayInputStream;import java.io.ObjectInputStream;* 第二种接收 信息的方法,就是直接在方法上添加RabbitListener@Slf4j@Componentpublic class HelloWorldReceiver2 {int count = 1;* 并设置为手工确认@RabbitListener(queues = "HelloSpring",ackMode ="MANUAL")public void onMessage(@Payload()Message message,Channel channel) throws Exception {int sleep = 0;//通过ObjectReader读取对象ObjectInputStream objIn = new ObjectInputStream(new ByteArrayInputStream(message.getBody()));Object readObject = objIn.readObject();Person p = (Person) readObject;objIn.close();log.info("消费者2,休眠{}ms后处理完成信息: {},共处理{}消息",sleep,JSONObject.toJSONString(p),count++);long id = message.getMessageProperties().getDeliveryTag();channel.basicAck(id, false);//手工确认

最后显示的结果,与之前相同


 

最终项目结构


 

运行测试

两个消费者,依次接收RabbitMQ发送的消息:


 

关于springboot中的更多设置设置MessageConvert

默认情况下,通过rabbitTemplate.sendAndConvert(..)发送的object对象会使用Java序列化方式传输。可以在UI界面上,通过读取一个Queue中的消息的方式,查看队列中的数据格式:java-serialized。


 

我们可以修改这种格式,如JSON

以下注意给RabbitTemplate.setMessageConverter(..)即可以设置为JSON转换格式。

package wj.mq.config.helloworld;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class HelloWorldConfig {@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic Queue helloSpringQueue() {return new Queue("HelloSpringQueue", true, false,false);@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rt =new RabbitTemplate();rt.setConnectionFactory(connectionFactory);rt.setMessageConverter(messageConverter());return rt;

设置后,查看Message的格式


 

接收 deliverTag

仅通过@RabbitListener的方法上,可以通过添加@Header接收具体指定的数据,如下。

注意@header的部分的代码。

package wj.mq.rabbitmq.helloworld;import cn.hutool.core.thread.ThreadUtil;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Payload;import org.springframework.stereotype.Component;import com.alibaba.fastjson2.JSONObject;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import wj.mq.domain.Person;import java.util.Random;@Slf4j@Component@RabbitListener(queues = {"#{helloSpringQueue.name}"})public class HelloWorldReceiver {private Random random = new Random();private int count =1;* 直接接收Peson对象即可@RabbitHandler()public void onMessage(@Payload() Person message,@Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag,Channel channel) {int sleep = 1000*random.nextInt(20)*0;ThreadUtil.sleep(sleep);log.info("消费者1,休眠{}ms后处理完成信息:{},共处理{}消息,消息tag={}",sleep,JSONObject.toJSONString(message),count++,deliveryTag);


 

或,直接将@RabbitListener注解在方法上


 

可也可以接收一个Message对象


 

最后显示的效果如下:


 



Tags:RabbitMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
RabbitMQ 如何实现延迟队列?
延迟队列是指当消息被发送以后,并不是立即执行,而是等待特定的时间后,消费者才会执行该消息。延迟队列的使用场景有以下几种: 未按时支付的订单,30 分钟过期之后取消订单。 给...【详细内容】
2023-09-05  Search: RabbitMQ  点击:(261)  评论:(0)  加入收藏
RabbitMQ 简单工作模式
准备工作纯Java项目依赖创建一个maven-java项目在pom.xml中添加以下依赖:com.RabbitMQ:amqp-client:5.15.0org.projectlombok:lombok:1.18.24junit:junit:4.13.2org.apache.l...【详细内容】
2022-09-24  Search: RabbitMQ  点击:(279)  评论:(0)  加入收藏
新来个技术总监,把 RabbitMQ 讲得那叫一个透彻,佩服
整体阅读时间,在 40 分钟左右。常见的消息队列很多,主要包括 RabbitMQ、Kafka、RocketMQ 和 ActiveMQ,相关的选型可以看我之前的系列,这篇文章只讲 RabbitMQ,先讲原理,后搞实战。...【详细内容】
2022-08-19  Search: RabbitMQ  点击:(386)  评论:(0)  加入收藏
RabbitMQ 使用指南
1 MQ 简 介消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展...【详细内容】
2020-03-12  Search: RabbitMQ  点击:(305)  评论:(0)  加入收藏
▌简易百科推荐
Netflix 是如何管理 2.38 亿会员的
作者 | Surabhi Diwan译者 | 明知山策划 | TinaNetflix 高级软件工程师 Surabhi Diwan 在 2023 年旧金山 QCon 大会上发表了题为管理 Netflix 的 2.38 亿会员 的演讲。她在...【详细内容】
2024-04-08    InfoQ  Tags:Netflix   点击:(2)  评论:(0)  加入收藏
即将过时的 5 种软件开发技能!
作者 | Eran Yahav编译 | 言征出品 | 51CTO技术栈(微信号:blog51cto) 时至今日,AI编码工具已经进化到足够强大了吗?这未必好回答,但从2023 年 Stack Overflow 上的调查数据来看,44%...【详细内容】
2024-04-03    51CTO  Tags:软件开发   点击:(7)  评论:(0)  加入收藏
跳转链接代码怎么写?
在网页开发中,跳转链接是一项常见的功能。然而,对于非技术人员来说,编写跳转链接代码可能会显得有些困难。不用担心!我们可以借助外链平台来简化操作,即使没有编程经验,也能轻松实...【详细内容】
2024-03-27  蓝色天纪    Tags:跳转链接   点击:(13)  评论:(0)  加入收藏
中台亡了,问题到底出在哪里?
曾几何时,中台一度被当做“变革灵药”,嫁接在“前台作战单元”和“后台资源部门”之间,实现企业各业务线的“打通”和全域业务能力集成,提高开发和服务效率。但在中台如火如荼之...【详细内容】
2024-03-27  dbaplus社群    Tags:中台   点击:(9)  评论:(0)  加入收藏
员工写了个比删库更可怕的Bug!
想必大家都听说过删库跑路吧,我之前一直把它当一个段子来看。可万万没想到,就在昨天,我们公司的某位员工,竟然写了一个比删库更可怕的 Bug!给大家分享一下(不是公开处刑),希望朋友们...【详细内容】
2024-03-26  dbaplus社群    Tags:Bug   点击:(5)  评论:(0)  加入收藏
我们一起聊聊什么是正向代理和反向代理
从字面意思上看,代理就是代替处理的意思,一个对象有能力代替另一个对象处理某一件事。代理,这个词在我们的日常生活中也不陌生,比如在购物、旅游等场景中,我们经常会委托别人代替...【详细内容】
2024-03-26  萤火架构  微信公众号  Tags:正向代理   点击:(11)  评论:(0)  加入收藏
看一遍就理解:IO模型详解
前言大家好,我是程序员田螺。今天我们一起来学习IO模型。在本文开始前呢,先问问大家几个问题哈~什么是IO呢?什么是阻塞非阻塞IO?什么是同步异步IO?什么是IO多路复用?select/epoll...【详细内容】
2024-03-26  捡田螺的小男孩  微信公众号  Tags:IO模型   点击:(9)  评论:(0)  加入收藏
为什么都说 HashMap 是线程不安全的?
做Java开发的人,应该都用过 HashMap 这种集合。今天就和大家来聊聊,为什么 HashMap 是线程不安全的。1.HashMap 数据结构简单来说,HashMap 基于哈希表实现。它使用键的哈希码来...【详细内容】
2024-03-22  Java技术指北  微信公众号  Tags:HashMap   点击:(11)  评论:(0)  加入收藏
如何从头开始编写LoRA代码,这有一份教程
选自 lightning.ai作者:Sebastian Raschka机器之心编译编辑:陈萍作者表示:在各种有效的 LLM 微调方法中,LoRA 仍然是他的首选。LoRA(Low-Rank Adaptation)作为一种用于微调 LLM(大...【详细内容】
2024-03-21  机器之心Pro    Tags:LoRA   点击:(12)  评论:(0)  加入收藏
这样搭建日志中心,传统的ELK就扔了吧!
最近客户有个新需求,就是想查看网站的访问情况。由于网站没有做google的统计和百度的统计,所以访问情况,只能通过日志查看,通过脚本的形式给客户导出也不太实际,给客户写个简单的...【详细内容】
2024-03-20  dbaplus社群    Tags:日志   点击:(4)  评论:(0)  加入收藏
站内最新
站内热门
站内头条