您当前的位置:首页 > 电脑百科 > 程序开发 > 编程百科

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

时间:2021-02-07 10:12:36  来源:  作者:

说明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根据基础知识去理解RabbitMQ是什么、提供了什么功能。

一、MQ的简单理解

1. 什么是MQ?

  • 消息队列(Message Queue),是基础数据结构中 “先进先出” 的一种数据结构。
  • 一般用来解决应用解耦、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构

2.MQ是怎么实现消息传递的?

  1. 生产者产生消息并把传输的数据(消息)放在队列中,用队列机制来实现消息传递。
  2. 消费者可以到指定的队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。

3.MQ的几个主要特性

  • 解耦:一个业务需要多个模块共同实现,或一条消息有多个系统对应处理,只需要在主业务完成以后,发送一条MQ,其余模块消费MQ消息,即可实现业务,降低模块之间的耦合。
  • 异步:主业务执行结束后,从属业务通过MQ异步处理,减少业务的响应时间,提高用户体验。
  • 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪。

4.MQ的缺点

  • 系统可用性降低。依赖服务越多,服务越容易挂掉。需要考虑MQ瘫痪的情况。
  • 系统的复杂性提高。需要考虑消息丢失、消息重复消费、消息传递的顺序性。
  • 业务一致性。主业务和从属业务一致性的处理。

二、RabbitMQ的简单介绍

1. 什么是RabbitMQ?

  RabbitMQ是消息代理,它接受并转发消息。

  • RabbitMQ可以理解为一个邮箱,或者一个邮局,或者是一个邮递员,保证 “张三” 的信件最终传递给 “李四”。
  • RabbitMQ与上述所描述的邮局(邮箱、邮递员)的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块消息。

2.RabbitMQ和消息传递的三个术语

  • 生产:生产只意味着发送。发送消息的程序是生产者(production)。
  • 队列:队列是位于RabbitMQ中的“邮箱”的名称。尽管消息流经RabbitMQ和应用程序,但他们只能存在于队列中。队列只受主机的内存和磁盘限制,它的本质上是一个打的消息缓冲区。许多生产者可以向一个队列发送消息,许多消费者可以尝试从一个队列接收数据。
  • 消费(接收):消费与接收具有相似的含义。一个消费者(consumer)是一个程序,主要是等待接收信息。

注意:生产者、消费者、代理不必部署在同一主机上,应用程序既可以是生产者,又可以是消费者

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 

三、RabbitMQ安装

3.1环境说明(本文以RabbitMQ3.8.11为例)

RabbitMQ对Erlang版本要求(Rabbit是基于Erlang编写的)

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 

RabbitMQ对JDK版本要求

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 

3.2 安装Erlang步骤(本文以windows版安装为例)

3.2.1 下载Erlang,或访问如下链接进行下载:

  http://erlang.org/download/otp_win64_23.2.exe

3.2.2 双击运行 otp_win64_23.2.exe ,点击下一步完成安装。

3.2.3 安装完成后配置环境变量,如下图所示

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 


什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 

3.2.4 运行窗口输入cmd,在dos窗口输入 erl ,返回如图中所示,则代表erlang安装完成。

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 

3.2 安装RibbitMQ步骤(本文以windows版安装为例)

3.2.1 点击下载RibbitMQ,或访问如下链接进行下载:

  https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.11/rabbitmq-server-3.8.11.exe

3.2.2 双击运行 rabbitmq-server-3.8.11.exe,点击下一步完成安装。

3.2.3 双击RabbitMQ Service - start 运行RabbitMQ

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 

出现如下提示,则代表服务启动成功:

什么是MQ?什么是RabbitMQ?能做什么?简单理解一下?

 

3.2.4 访问RabbitMQ控制台

  控制台地址:    http://localhost:15672/

  控制台用户名/密码 : guest/guest

四、RabbitMQ传递消息的方式(JAVA客户端)

  • Work queues(工作队列)
  • Publish/Subscribe(发布/订阅)
  • Routing(路由)
  • Topics(主题)
  • RPC(远程过程调用)
  • Publisher Confirms(发布者确认)

环境要求:

  • JDK版本为15(1.8+即可)
  • amqp-client 5.10.0

添加依赖:

<!--ribbitMq-->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

4.1 Work queues(工作队列)

官方描述:

  工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个worker时,任务将在它们之间共享。

代码示例:

生产者:

 1 public class NewTask {
 2 
 3     private static final String TASK_QUEUE_NAME = "task_queue";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         try (Connection connection = factory.newConnection();
16              Channel channel = connection.createChannel()){
17             channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
18 
19             String message = String.join(" ", "four");
20 
21             channel.basicPublish("", TASK_QUEUE_NAME,
22                     MessageProperties.PERSISTENT_TEXT_PLAIN,
23                     message.getBytes(StandardCharsets.UTF_8));
24 
25             System.out.println(" [x] Sent '" + message + "'");
26         }
27     }
28 }

消费者:

 1 public class Worker {
 2 
 3     private static final String TASK_QUEUE_NAME = "task_queue";
 4 
 5     public static void main(String[] args )throws Exception {
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         final Connection connection = factory.newConnection();
16         final Channel channel = connection.createChannel();
17 
18         channel.queueDeclare(TASK_QUEUE_NAME, true, false,false,null);
19         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
20 
21         channel.basicQos(1);
22 
23         DeliverCallback deliverCallback = (comsumerTag, delivery) ->{
24             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25 
26             System.out.println(" [x] Received '" + message + "'");
27 
28             try {
29                 doWork(message);
30             } finally {
31                 System.out.println("[x] Done");
32                 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
33             }
34         };
35         channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, comsumerTag -> {});
36     }
37 
38     private static void doWork(String task){
39         for (char ch : task.toCharArray()){
40             if(ch == '.'){
41                 try {
42                     Thread.sleep(1000);
43                 } catch (InterruptedException e) {
44                     Thread.currentThread().interrupt();
45                 }
46             }
47         }
48     }
49 }

4.3 Publish/Subscribe(发布/订阅)

官方描述:

  RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。

简而言之:

  相当于我们关注了一个微信公众号,公众号每次推文我们都能及时的收到。我们就相当于消费者,公众号相当于消息中转站,文章作者相当于生产者。

代码示例:

生产者:

 1 public class EmitLog {
 2 
 3     private static final String ExCHANGE_NAME = "logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         try (Connection connection = factory.newConnection();
16              Channel channel = connection.createChannel()){
17             channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18 
19             String message = args.length < 1 ? "info: Hello World!" : String.join(" ", args);
20 
21             channel.basicPublish(ExCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
22 
23             System.out.println(" [x] Sent '" + message + "'");
24 
25         }
26 
27     }
28 
29 }

消费者:

 1 public class ReceiveLogs {
 2 
 3     private static final String ExCHANGE_NAME = "logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6         ConnectionFactory factory = new ConnectionFactory();
 7 
 8         // 设置IP
 9         factory.setHost("127.0.0.1");
10 
11         // 设置端口号
12         factory.setPort(5672);
13 
14         Connection connection = factory.newConnection();
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18         String queueName = channel.queueDeclare().getQueue();
19         channel.queueBind(queueName, ExCHANGE_NAME, "");
20 
21         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
22 
23         DeliverCallback deliverCallback = (sonsumerTag, delivery) -> {
24             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25             System.out.println(" [x] Received '" + message + "'");
26         };
27         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
28     }
29 } 

4.4 Routing(路由)

官方描述:

  接上例,我们可能希望将日志消息写入磁盘的程序仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。

简而言之:

  如果我们只想接收某些信息,比如日志级别有INFO、ERROR、DEBUG等,我们只愿接收INFO日志。可以使用Routing进行过滤。

代码示例:

生产者:

 1 public class EmitLogDirect {
 2 
 3     private static final String EXCHANGE_NAME = "direct_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         try (Connection connection = factory.newConnection();
16              Channel channel = connection.createChannel()){
17             channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
18 
19             String severity = getServerity(args);
20             String message = getMessage(args);
21 
22             channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
23             System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
24 
25         }
26 
27     }
28 
29     private static String getServerity(String[] strings){
30         if (strings.length < 1){
31             return "info";
32         }
33         return strings[0];
34 
35     }
36 
37     private static String getMessage(String[] strings){
38         if (strings.length < 2) {
39             return "Hello World!";
40         }
41         return joinStrings(strings, " ", 1);
42     }
43 
44     private static String joinStrings(String[] strings, String delimiter, int startIndex){
45         int length = strings.length;
46         if(length == 0){
47             return "";
48         }
49         if(length <= startIndex){
50             return "";
51         }
52         StringBuilder words = new StringBuilder(strings[startIndex]);
53         for (int i = startIndex + 1; i < length; i++){
54             words.Append(delimiter).append(strings[i]);
55         }
56         return words.toString();
57 
58     }
59 }

消费者:

 1 public class ReceiveLogsDirect {
 2 
 3     private static final String EXCHANGE_NAME = "direct_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8 
 9         // 设置IP
10         factory.setHost("127.0.0.1");
11 
12         // 设置端口号
13         factory.setPort(5672);
14 
15         Connection connection = factory.newConnection();
16 
17         Channel channel = connection.createChannel();
18 
19         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
20 
21         String queueName = channel.queueDeclare().getQueue();
22 
23         if(args.length < 1){
24             System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
25             System.exit(1);
26         }
27 
28         for (String severity : args){
29             channel.queueBind(queueName, EXCHANGE_NAME, severity);
30         }
31         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
32 
33         DeliverCallback deliverCallback = (consumerTag, delivery)->{
34             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
35             System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
36         };
37 
38         channel.basicConsume(queueName, true, deliverCallback, comsumerTag ->{});
39     }
40 }

4.5 Topics(主题)

官方描述:

  发送到主题交换机的消息不能具有任意的 routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的路由关键示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由关键字中可以包含任意多个单词,最多255个字节。

  绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换-用特定路由键发送的消息将传递到所有用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:

  • *(星)只能代替一个单词。#(散列)可以代替零个或多个单词。

简而言之:

  Topic会根据消息自身所携带的路由键(Routing Key)在所有的绑定关系中寻找,与消息相匹配的队列推送该消息。

注意:

  当在绑定中不使用特殊字符“ * ”(星号)和“ # ”(哈希)时,主题交换的行为就像直接的一样。

代码示例:

生产者:

 1 public class EmitLogTopic {
 2 
 3     private static final String EXCHANGE_NAME = "topic_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8         // 设置IP
 9         factory.setHost("127.0.0.1");
10 
11         // 设置端口号
12         factory.setPort(5672);
13 
14         try(Connection connection = factory.newConnection();
15             Channel channel = connection.createChannel()){
16 
17             channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18 
19             String routingKey = getRouting(args);
20             String message = getMessage(args);
21 
22             channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
23             System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
24         }
25     }
26 
27     private static String getRouting(String[] strings){
28         if (strings.length < 1){
29             return "anonymous.info";
30         }
31         return strings[0];
32     }
33 
34     private static String getMessage(String[] strings){
35         if (strings.length < 2){
36             return "hello world";
37         }
38         return joinStrings(strings, " ", 1);
39     }
40 
41     private static String joinStrings(String[] strings, String delimiter, int startIndex){
42         int length = strings.length;
43         if(length == 0){
44             return "";
45         }
46         if(length < startIndex){
47             return "";
48         }
49         StringBuilder words = new StringBuilder(strings[startIndex]);
50         for (int i = startIndex + 1; i < length; i++){
51             words.append(delimiter).append(strings[i]);
52         }
53         return words.toString();
54     }
55 }

消费者:

 1 public class ReceiveLogTopic {
 2 
 3     private static final String EXCHANGE_NAME = "topic_logs";
 4 
 5     public static void main(String[] args) throws Exception{
 6 
 7         ConnectionFactory factory = new ConnectionFactory();
 8         // 设置IP
 9         factory.setHost("127.0.0.1");
10 
11         // 设置端口号
12         factory.setPort(5672);
13 
14         Connection connection = factory.newConnection();
15         Channel channel = connection.createChannel();
16 
17         channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18 
19         String queueName = channel.queueDeclare().getQueue();
20 
21         if(args.length < 1){
22             System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
23             System.exit(1);
24         }
25 
26         for (String bindingKey : args){
27             channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
28         }
29 
30         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
31 
32         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
33             String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
34             System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
35         };
36         channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
37     }
38 }

4.6 RPC(远程过程调用)

官方描述:

  尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。

代码示例:

生产者

 1 public class RPCServer {
 2 
 3     private static final String RPC_QUEUE_NAME = "rpc_queue";
 4 
 5     private static int fib(int n){
 6         if(n == 0){
 7             return 0;
 8         }
 9         if(n == 1){
10             return 1;
11         }
12         return fib(n - 1) + fib(n - 2);
13     }
14 
15     public static void main(String[] args) throws Exception{
16 
17         // 创建服务器的连接
18         ConnectionFactory factory = new ConnectionFactory();
19 
20         // 设置IP
21         factory.setHost("127.0.0.1");
22 
23         // 设置端口号
24         factory.setPort(5672);
25 
26         try (Connection connection = factory.newConnection();
27              Channel channel = connection.createChannel()) {
28             channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
29             channel.queuePurge(RPC_QUEUE_NAME);
30 
31             channel.basicQos(1);
32 
33             System.out.println(" [x] Awaiting RPC requests");
34 
35             Object monitor = new Object();
36             DeliverCallback deliverCallback = (consumerTag, delivery) ->{
37                 AMQP.BasicProperties replyProps = new AMQP.BasicProperties
38                         .Builder()
39                         .correlationId(delivery.getProperties().getCorrelationId())
40                         .build();
41 
42                 String response = "";
43 
44                 try{
45                     String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
46                     int n = Integer.parseInt(message);
47 
48                     System.out.println(" [.] fib(" + message + ")");
49                     response += fib(n);
50                 }catch (RuntimeException e){
51                     System.out.println(" [.] " + e.toString());
52                 }finally {
53                     channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
54                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
55 
56                     // RabbitMq consumer worker thread notifies the RPC server owner thread
57                     // RabbitMq使用者工作线程通知RPC服务器所有者线程
58                     synchronized (monitor){
59                         monitor.notify();
60                     }
61                 }
62             };
63             channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
64             // Wait and be prepared to consume the message from RPC client.
65             // 等待并准备使用来自RPC客户端的消息。
66             while(true){
67                 synchronized (monitor){
68                     try {
69                         monitor.wait();
70                     }catch (InterruptedException e){
71                         e.printStackTrace();
72                     }
73                 }
74             }
75         }
76     }
77 }

消费者:

 1 public class RPCClient {
 2 
 3     private Connection connection;
 4     private Channel channel;
 5     private String requestQueueName = "rpc_queue";
 6 
 7     public RPCClient() throws IOException, TimeoutException {
 8         // 创建服务器的连接
 9         ConnectionFactory factory = new ConnectionFactory();
10 
11         // 设置IP
12         factory.setHost("127.0.0.1");
13 
14         // 设置端口号
15         factory.setPort(5672);
16 
17         connection = factory.newConnection();
18         channel = connection.createChannel();
19     }
20 
21     public static void main(String[] args) throws Exception{
22         RPCClient fibonacciRpc = new RPCClient();
23             for (int i = 0; i < 32; i++) {
24                 String i_str = Integer.toString(i);
25                 System.out.println(" [x] Requesting fib(" + i_str + ")");
26                 String response = fibonacciRpc.call(i_str);
27                 System.out.println(" [.] Got '" + response + "'");
28             }
29 
30     }
31 
32     public String call(String message) throws IOException, InterruptedException {
33         final String corrId = UUID.randomUUID().toString();
34 
35         String replyQueueName = channel.queueDeclare().getQueue();
36         AMQP.BasicProperties props = new AMQP.BasicProperties
37                 .Builder()
38                 .correlationId(corrId)
39                 .replyTo(replyQueueName)
40                 .build();
41 
42         channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
43 
44         final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
45 
46         String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
47             if (delivery.getProperties().getCorrelationId().equals(corrId)) {
48                 response.offer(new String(delivery.getBody(), "UTF-8"));
49             }
50         }, consumerTag -> {
51         });
52 
53         String result = response.take();
54         channel.basicCancel(ctag);
55         return result;
56     }
57 
58     public void close() throws IOException {
59         connection.close();
60     }
61 }

4.7 Publisher Confirms(发布者确认)

官方描述:

  在某些应用程序中,确保将发布的消息发送到代理非常重要。发布者确认是RabbitMQ功能,可以帮助满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有确定的方法可以实现发布者确认,这通常归结为应用程序和整个系统中的约束。典型的技术有:

  • 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
  • 批量发布消息,同步等待批量确认:简单,合理的吞吐量,但是很难推断出什么时候出了问题。
  • 异步处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是可以正确实施。

代码示例:

  1 public class PublisherConfirms {
  2 
  3     static final int MESSAGE_COUNT = 50_000;
  4 
  5     static Connection createConnection() throws Exception{
  6 
  7         ConnectionFactory cf = new ConnectionFactory();
  8 
  9         // 设置IP
 10         cf.setHost("127.0.0.1");
 11 
 12         // 设置端口号
 13         cf.setPort(5672);
 14 
 15         // 设置用户名
 16         cf.setUsername("guest");
 17 
 18         // 设置密码
 19         cf.setPassword("guest");
 20 
 21         return cf.newConnection();
 22     }
 23 
 24     public static void main(String[] args) throws Exception{
 25         publishMessagesIndividually();
 26         publishMessagesInBatch();
 27         handlePublishConfirmsAsynchronously();
 28     }
 29 
 30     static void publishMessagesIndividually() throws Exception{
 31         try(Connection connection = createConnection()){
 32             Channel ch = connection.createChannel();
 33 
 34             String queue = UUID.randomUUID().toString();
 35             ch.queueDeclare(queue, false, false, true, null);
 36 
 37             ch.confirmSelect();
 38 
 39             long start = System.nanoTime();
 40             for (int i = 0; i < MESSAGE_COUNT; i++){
 41                 String body = String.valueOf(i);
 42                 ch.basicPublish("", queue, null, body.getBytes(StandardCharsets.UTF_8));
 43                 ch.waitForConfirmsOrDie(5_000);
 44             }
 45             long end = System.nanoTime();
 46             System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
 47         }
 48     }
 49 
 50     static void publishMessagesInBatch() throws Exception {
 51         try (Connection connection = createConnection()) {
 52             Channel ch = connection.createChannel();
 53 
 54             String queue = UUID.randomUUID().toString();
 55             ch.queueDeclare(queue, false, false, true, null);
 56 
 57             ch.confirmSelect();
 58 
 59             int batchSize = 100;
 60             int outstandingMessageCount = 0;
 61             long start = System.nanoTime();
 62             for (int i = 0; i < MESSAGE_COUNT; i++) {
 63                 String body = String.valueOf(i);
 64                 ch.basicPublish("", queue, null, body.getBytes());
 65                 outstandingMessageCount++;
 66 
 67                 if (outstandingMessageCount == batchSize) {
 68                     ch.waitForConfirmsOrDie(5_000);
 69                     outstandingMessageCount = 0;
 70                 }
 71             }
 72 
 73             if (outstandingMessageCount > 0) {
 74                 ch.waitForConfirmsOrDie(5_000);
 75             }
 76             long end = System.nanoTime();
 77             System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
 78 
 79         }
 80 
 81     }
 82 
 83     static void handlePublishConfirmsAsynchronously() throws Exception {
 84         try (Connection connection = createConnection()) {
 85             Channel ch = connection.createChannel();
 86 
 87             String queue = UUID.randomUUID().toString();
 88             ch.queueDeclare(queue, false, false, true, null);
 89 
 90             ch.confirmSelect();
 91 
 92             ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
 93 
 94             ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
 95                 if (multiple) {
 96                     ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
 97                             sequenceNumber, true
 98                     );
 99                     confirmed.clear();
100                 } else {
101                     outstandingConfirms.remove(sequenceNumber);
102                 }
103             };
104 
105             ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
106                 String body = outstandingConfirms.get(sequenceNumber);
107                 System.err.format(
108                         "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
109                         body, sequenceNumber, multiple
110                 );
111                 cleanOutstandingConfirms.handle(sequenceNumber, multiple);
112             });
113 
114             long start = System.nanoTime();
115             for (int i = 0; i < MESSAGE_COUNT; i++) {
116                 String body = String.valueOf(i);
117                 outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
118                 ch.basicPublish("", queue, null, body.getBytes());
119             }
120 
121             if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
122                 throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
123             }
124 
125             long end = System.nanoTime();
126             System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
127         }
128     }
129 
130     static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
131         int waited = 0;
132         while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
133             Thread.sleep(100L);
134             waited = +100;
135         }
136         return condition.getAsBoolean();
137     }
138 }

五、总结

 总的来说,RabbitMQ还是比较简单的。目前文章只是简单记录一下,后期会更深入学习。

作者: 学海无涯519

原文链接:https://www.cnblogs.com/wgx519/p/14371511.html



Tags:RabbitMQ   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
RabbitMQ 介绍RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。支持多种语言,包括java、Python、ruby、PHP、C/C++等。1.1.AMQP模型...【详细内容】
2021-11-17  Tags: RabbitMQ  点击:(16)  评论:(0)  加入收藏
下载Erlang和RabbitMQ官网下载地址Erlang下载地址: http://www.erlang.org/downloadsRabbitMQ下载地址: http://www.rabbitmq.com/download.html版本:( Erlang23+RabbitMQ3.8.4...【详细内容】
2021-08-31  Tags: RabbitMQ  点击:(49)  评论:(0)  加入收藏
环境:Spring Boot2.3.10 + RabbitMQ 3.8.12 + Erlang 23.2.51.1 RabbitMQ介绍RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务...【详细内容】
2021-04-22  Tags: RabbitMQ  点击:(336)  评论:(0)  加入收藏
RabbitMQ环境搭建erlang和RabbitMQ版本对应关系:https://www.rabbitmq.com/which-erlang.htmlerlang环境安装yum方式安装 yum源配置[root@iyeed RabbitMQ]# curl -s https://...【详细内容】
2021-04-14  Tags: RabbitMQ  点击:(281)  评论:(0)  加入收藏
Direct 模式# 所有发送到 Direct Exchange 的消息被转发到 RouteKey 中指定的 Queue。 Direct 模式可以使用 RabbitMQ 自带的 Exchange: default Exchange,所以不需要将 Exch...【详细内容】
2021-04-13  Tags: RabbitMQ  点击:(220)  评论:(0)  加入收藏
一、关于 RabbitMQ说到 RabbitMQ,相信大家都不会陌生,微服务开发中必不可少的中间件。 在上篇关于消息队列的文章中,我们了解到 RabbitMQ 本质其实是用 Erlang 开发的 AMQP(Adva...【详细内容】
2021-03-11  Tags: RabbitMQ  点击:(229)  评论:(0)  加入收藏
说明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根据基础知识去理解RabbitMQ是什么、提供了什么功能。一、MQ的简单理解1. 什么是MQ? 消息队列(Message Queue),是基础数据...【详细内容】
2021-02-07  Tags: RabbitMQ  点击:(155)  评论:(0)  加入收藏
1、 查找Docker容器中的RabbitMQ镜像docker ps -a[root@linux ~]# docker ps -aCONTAINER ID IMAGE COMMAND CREATED...【详细内容】
2020-11-27  Tags: RabbitMQ  点击:(221)  评论:(0)  加入收藏
一、Maven依赖添加 <!-- rabbitmq相关依赖 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId>...【详细内容】
2020-08-01  Tags: RabbitMQ  点击:(53)  评论:(0)  加入收藏
一、简单的发送与接收消息 HelloWorld1. 发送消息发送消息首先要获取与rabbitmq-server的连接,然后从渠道(chann)中指定的queue发送消息 , 不能定义两个queue名字相同,但属性...【详细内容】
2020-04-03  Tags: RabbitMQ  点击:(61)  评论:(0)  加入收藏
▌简易百科推荐
摘 要 (OF作品展示)OF之前介绍了用python实现数据可视化、数据分析及一些小项目,但基本都是后端的知识。想要做一个好看的可视化大屏,我们还要学一些前端的知识(vue),网上有很多比...【详细内容】
2021-12-27  项目与数据管理    Tags:Vue   点击:(1)  评论:(0)  加入收藏
程序是如何被执行的&emsp;&emsp;程序是如何被执行的?许多开发者可能也没法回答这个问题,大多数人更注重的是如何编写程序,却不会太注意编写好的程序是如何被运行,这并不是一个好...【详细内容】
2021-12-23  IT学习日记    Tags:程序   点击:(9)  评论:(0)  加入收藏
阅读收获✔️1. 了解单点登录实现原理✔️2. 掌握快速使用xxl-sso接入单点登录功能一、早期的多系统登录解决方案 单系统登录解决方案的核心是cookie,cookie携带会话id在浏览器...【详细内容】
2021-12-23  程序yuan    Tags:单点登录(   点击:(8)  评论:(0)  加入收藏
下载Eclipse RCP IDE如果你电脑上还没有安装Eclipse,那么请到这里下载对应版本的软件进行安装。具体的安装步骤就不在这赘述了。创建第一个标准Eclipse RCP应用(总共分为六步)1...【详细内容】
2021-12-22  阿福ChrisYuan    Tags:RCP应用   点击:(7)  评论:(0)  加入收藏
今天想简单聊一聊 Token 的 Value Capture,就是币的价值问题。首先说明啊,这个话题包含的内容非常之光,Token 的经济学设计也可以包含诸多问题,所以几乎不可能把这个问题说的清...【详细内容】
2021-12-21  唐少华TSH    Tags:Token   点击:(9)  评论:(0)  加入收藏
实现效果:假如有10条数据,分组展示,默认在当前页面展示4个,点击换一批,从第5个开始继续展示,到最后一组,再重新返回到第一组 data() { return { qList: [], //处理后...【详细内容】
2021-12-17  Mason程    Tags:VUE   点击:(14)  评论:(0)  加入收藏
什么是性能调优?(what) 为什么需要性能调优?(why) 什么时候需要性能调优?(when) 什么地方需要性能调优?(where) 什么时候来进行性能调优?(who) 怎么样进行性能调优?(How) 硬件配...【详细内容】
2021-12-16  软件测试小p    Tags:性能调优   点击:(19)  评论:(0)  加入收藏
Tasker 是一款适用于 Android 设备的高级自动化应用,它可以通过脚本让重复性的操作自动运行,提高效率。 不知道从哪里听说的抖音 app 会导致 OLED 屏幕烧屏。于是就现学现卖,自...【详细内容】
2021-12-15  ITBang    Tags:抖音防烧屏   点击:(23)  评论:(0)  加入收藏
11 月 23 日,Rust Moderation Team(审核团队)在 GitHub 上发布了辞职公告,即刻生效。根据公告,审核团队集体辞职是为了抗议 Rust 核心团队(Core team)在执行社区行为准则和标准上...【详细内容】
2021-12-15  InfoQ    Tags:Rust   点击:(24)  评论:(0)  加入收藏
一个项目的大部分API,测试用例在参数和参数值等信息会有很多相似的地方。我们可以复制API,复制用例来快速生成,然后做细微调整既可以满足我们的测试需求1.复制API:在菜单发布单...【详细内容】
2021-12-14  AutoMeter    Tags:AutoMeter   点击:(20)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条