说明:想要理解RabbitMQ,需要先理解MQ是什么?能做什么?然后根据基础知识去理解RabbitMQ是什么、提供了什么功能。
RabbitMQ是消息代理,它接受并转发消息。
注意:生产者、消费者、代理不必部署在同一主机上,应用程序既可以是生产者,又可以是消费者
RabbitMQ对Erlang版本要求(Rabbit是基于Erlang编写的)
RabbitMQ对JDK版本要求
3.2.1 下载Erlang,或访问如下链接进行下载:
http://erlang.org/download/otp_win64_23.2.exe
3.2.2 双击运行 otp_win64_23.2.exe ,点击下一步完成安装。
3.2.3 安装完成后配置环境变量,如下图所示
3.2.4 运行窗口输入cmd,在dos窗口输入 erl ,返回如图中所示,则代表erlang安装完成。
3.2.1 点击下载RibbitMQ,或访问如下链接进行下载:
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.11/rabbitmq-server-3.8.11.exe
3.2.2 双击运行 rabbitmq-server-3.8.11.exe,点击下一步完成安装。
3.2.3 双击RabbitMQ Service - start 运行RabbitMQ
出现如下提示,则代表服务启动成功:
3.2.4 访问RabbitMQ控制台
控制台地址: http://localhost:15672/
控制台用户名/密码 : guest/guest
环境要求:
添加依赖:
<!--ribbitMq-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
官方描述:
工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个worker时,任务将在它们之间共享。
代码示例:
生产者:
1 public class NewTask {
2
3 private static final String TASK_QUEUE_NAME = "task_queue";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
18
19 String message = String.join(" ", "four");
20
21 channel.basicPublish("", TASK_QUEUE_NAME,
22 MessageProperties.PERSISTENT_TEXT_PLAIN,
23 message.getBytes(StandardCharsets.UTF_8));
24
25 System.out.println(" [x] Sent '" + message + "'");
26 }
27 }
28 }
消费者:
1 public class Worker {
2
3 private static final String TASK_QUEUE_NAME = "task_queue";
4
5 public static void main(String[] args )throws Exception {
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 final Connection connection = factory.newConnection();
16 final Channel channel = connection.createChannel();
17
18 channel.queueDeclare(TASK_QUEUE_NAME, true, false,false,null);
19 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
20
21 channel.basicQos(1);
22
23 DeliverCallback deliverCallback = (comsumerTag, delivery) ->{
24 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25
26 System.out.println(" [x] Received '" + message + "'");
27
28 try {
29 doWork(message);
30 } finally {
31 System.out.println("[x] Done");
32 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
33 }
34 };
35 channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, comsumerTag -> {});
36 }
37
38 private static void doWork(String task){
39 for (char ch : task.toCharArray()){
40 if(ch == '.'){
41 try {
42 Thread.sleep(1000);
43 } catch (InterruptedException e) {
44 Thread.currentThread().interrupt();
45 }
46 }
47 }
48 }
49 }
官方描述:
RabbitMQ消息传递模型中的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者经常甚至根本不知道是否将消息传递到任何队列。相反,生产者只能将消息发送到交换机。交流是一件非常简单的事情。一方面,它接收来自生产者的消息,另一方面,将它们推入队列。交易所必须确切知道如何处理收到的消息。是否应将其附加到特定队列?是否应该将其附加到许多队列中?还是应该丢弃它。规则由交换类型定义 。
简而言之:
相当于我们关注了一个微信公众号,公众号每次推文我们都能及时的收到。我们就相当于消费者,公众号相当于消息中转站,文章作者相当于生产者。
代码示例:
生产者:
1 public class EmitLog {
2
3 private static final String ExCHANGE_NAME = "logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18
19 String message = args.length < 1 ? "info: Hello World!" : String.join(" ", args);
20
21 channel.basicPublish(ExCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
22
23 System.out.println(" [x] Sent '" + message + "'");
24
25 }
26
27 }
28
29 }
消费者:
1 public class ReceiveLogs {
2
3 private static final String ExCHANGE_NAME = "logs";
4
5 public static void main(String[] args) throws Exception{
6 ConnectionFactory factory = new ConnectionFactory();
7
8 // 设置IP
9 factory.setHost("127.0.0.1");
10
11 // 设置端口号
12 factory.setPort(5672);
13
14 Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel();
16
17 channel.exchangeDeclare(ExCHANGE_NAME, "fanout");
18 String queueName = channel.queueDeclare().getQueue();
19 channel.queueBind(queueName, ExCHANGE_NAME, "");
20
21 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
22
23 DeliverCallback deliverCallback = (sonsumerTag, delivery) -> {
24 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
25 System.out.println(" [x] Received '" + message + "'");
26 };
27 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
28 }
29 }
官方描述:
接上例,我们可能希望将日志消息写入磁盘的程序仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。
简而言之:
如果我们只想接收某些信息,比如日志级别有INFO、ERROR、DEBUG等,我们只愿接收INFO日志。可以使用Routing进行过滤。
代码示例:
生产者:
1 public class EmitLogDirect {
2
3 private static final String EXCHANGE_NAME = "direct_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 try (Connection connection = factory.newConnection();
16 Channel channel = connection.createChannel()){
17 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
18
19 String severity = getServerity(args);
20 String message = getMessage(args);
21
22 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes(StandardCharsets.UTF_8));
23 System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
24
25 }
26
27 }
28
29 private static String getServerity(String[] strings){
30 if (strings.length < 1){
31 return "info";
32 }
33 return strings[0];
34
35 }
36
37 private static String getMessage(String[] strings){
38 if (strings.length < 2) {
39 return "Hello World!";
40 }
41 return joinStrings(strings, " ", 1);
42 }
43
44 private static String joinStrings(String[] strings, String delimiter, int startIndex){
45 int length = strings.length;
46 if(length == 0){
47 return "";
48 }
49 if(length <= startIndex){
50 return "";
51 }
52 StringBuilder words = new StringBuilder(strings[startIndex]);
53 for (int i = startIndex + 1; i < length; i++){
54 words.Append(delimiter).append(strings[i]);
55 }
56 return words.toString();
57
58 }
59 }
消费者:
1 public class ReceiveLogsDirect {
2
3 private static final String EXCHANGE_NAME = "direct_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8
9 // 设置IP
10 factory.setHost("127.0.0.1");
11
12 // 设置端口号
13 factory.setPort(5672);
14
15 Connection connection = factory.newConnection();
16
17 Channel channel = connection.createChannel();
18
19 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
20
21 String queueName = channel.queueDeclare().getQueue();
22
23 if(args.length < 1){
24 System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
25 System.exit(1);
26 }
27
28 for (String severity : args){
29 channel.queueBind(queueName, EXCHANGE_NAME, severity);
30 }
31 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
32
33 DeliverCallback deliverCallback = (consumerTag, delivery)->{
34 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
35 System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
36 };
37
38 channel.basicConsume(queueName, true, deliverCallback, comsumerTag ->{});
39 }
40 }
官方描述:
发送到主题交换机的消息不能具有任意的 routing_key-它必须是单词列表,以点分隔。这些词可以是任何东西,但通常它们指定与消息相关的某些功能。一些有效的路由关键示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由关键字中可以包含任意多个单词,最多255个字节。
绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换-用特定路由键发送的消息将传递到所有用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况:
简而言之:
Topic会根据消息自身所携带的路由键(Routing Key)在所有的绑定关系中寻找,与消息相匹配的队列推送该消息。
注意:
当在绑定中不使用特殊字符“ * ”(星号)和“ # ”(哈希)时,主题交换的行为就像直接的一样。
代码示例:
生产者:
1 public class EmitLogTopic {
2
3 private static final String EXCHANGE_NAME = "topic_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8 // 设置IP
9 factory.setHost("127.0.0.1");
10
11 // 设置端口号
12 factory.setPort(5672);
13
14 try(Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel()){
16
17 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18
19 String routingKey = getRouting(args);
20 String message = getMessage(args);
21
22 channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
23 System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
24 }
25 }
26
27 private static String getRouting(String[] strings){
28 if (strings.length < 1){
29 return "anonymous.info";
30 }
31 return strings[0];
32 }
33
34 private static String getMessage(String[] strings){
35 if (strings.length < 2){
36 return "hello world";
37 }
38 return joinStrings(strings, " ", 1);
39 }
40
41 private static String joinStrings(String[] strings, String delimiter, int startIndex){
42 int length = strings.length;
43 if(length == 0){
44 return "";
45 }
46 if(length < startIndex){
47 return "";
48 }
49 StringBuilder words = new StringBuilder(strings[startIndex]);
50 for (int i = startIndex + 1; i < length; i++){
51 words.append(delimiter).append(strings[i]);
52 }
53 return words.toString();
54 }
55 }
消费者:
1 public class ReceiveLogTopic {
2
3 private static final String EXCHANGE_NAME = "topic_logs";
4
5 public static void main(String[] args) throws Exception{
6
7 ConnectionFactory factory = new ConnectionFactory();
8 // 设置IP
9 factory.setHost("127.0.0.1");
10
11 // 设置端口号
12 factory.setPort(5672);
13
14 Connection connection = factory.newConnection();
15 Channel channel = connection.createChannel();
16
17 channel.exchangeDeclare(EXCHANGE_NAME, "topic");
18
19 String queueName = channel.queueDeclare().getQueue();
20
21 if(args.length < 1){
22 System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
23 System.exit(1);
24 }
25
26 for (String bindingKey : args){
27 channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
28 }
29
30 System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
31
32 DeliverCallback deliverCallback = (consumerTag, delivery) -> {
33 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
34 System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
35 };
36 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
37 }
38 }
官方描述:
尽管RPC是计算中非常普遍的模式,但它经常受到批评。当程序员不知道函数调用是本地的还是缓慢的RPC时,就会出现问题。这样的混乱会导致系统变幻莫测,并给调试增加了不必要的复杂性。滥用RPC可能会导致无法维护的意大利面条代码,而不是简化软件。
代码示例:
生产者
1 public class RPCServer {
2
3 private static final String RPC_QUEUE_NAME = "rpc_queue";
4
5 private static int fib(int n){
6 if(n == 0){
7 return 0;
8 }
9 if(n == 1){
10 return 1;
11 }
12 return fib(n - 1) + fib(n - 2);
13 }
14
15 public static void main(String[] args) throws Exception{
16
17 // 创建服务器的连接
18 ConnectionFactory factory = new ConnectionFactory();
19
20 // 设置IP
21 factory.setHost("127.0.0.1");
22
23 // 设置端口号
24 factory.setPort(5672);
25
26 try (Connection connection = factory.newConnection();
27 Channel channel = connection.createChannel()) {
28 channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
29 channel.queuePurge(RPC_QUEUE_NAME);
30
31 channel.basicQos(1);
32
33 System.out.println(" [x] Awaiting RPC requests");
34
35 Object monitor = new Object();
36 DeliverCallback deliverCallback = (consumerTag, delivery) ->{
37 AMQP.BasicProperties replyProps = new AMQP.BasicProperties
38 .Builder()
39 .correlationId(delivery.getProperties().getCorrelationId())
40 .build();
41
42 String response = "";
43
44 try{
45 String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
46 int n = Integer.parseInt(message);
47
48 System.out.println(" [.] fib(" + message + ")");
49 response += fib(n);
50 }catch (RuntimeException e){
51 System.out.println(" [.] " + e.toString());
52 }finally {
53 channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
54 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
55
56 // RabbitMq consumer worker thread notifies the RPC server owner thread
57 // RabbitMq使用者工作线程通知RPC服务器所有者线程
58 synchronized (monitor){
59 monitor.notify();
60 }
61 }
62 };
63 channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
64 // Wait and be prepared to consume the message from RPC client.
65 // 等待并准备使用来自RPC客户端的消息。
66 while(true){
67 synchronized (monitor){
68 try {
69 monitor.wait();
70 }catch (InterruptedException e){
71 e.printStackTrace();
72 }
73 }
74 }
75 }
76 }
77 }
消费者:
1 public class RPCClient {
2
3 private Connection connection;
4 private Channel channel;
5 private String requestQueueName = "rpc_queue";
6
7 public RPCClient() throws IOException, TimeoutException {
8 // 创建服务器的连接
9 ConnectionFactory factory = new ConnectionFactory();
10
11 // 设置IP
12 factory.setHost("127.0.0.1");
13
14 // 设置端口号
15 factory.setPort(5672);
16
17 connection = factory.newConnection();
18 channel = connection.createChannel();
19 }
20
21 public static void main(String[] args) throws Exception{
22 RPCClient fibonacciRpc = new RPCClient();
23 for (int i = 0; i < 32; i++) {
24 String i_str = Integer.toString(i);
25 System.out.println(" [x] Requesting fib(" + i_str + ")");
26 String response = fibonacciRpc.call(i_str);
27 System.out.println(" [.] Got '" + response + "'");
28 }
29
30 }
31
32 public String call(String message) throws IOException, InterruptedException {
33 final String corrId = UUID.randomUUID().toString();
34
35 String replyQueueName = channel.queueDeclare().getQueue();
36 AMQP.BasicProperties props = new AMQP.BasicProperties
37 .Builder()
38 .correlationId(corrId)
39 .replyTo(replyQueueName)
40 .build();
41
42 channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
43
44 final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
45
46 String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
47 if (delivery.getProperties().getCorrelationId().equals(corrId)) {
48 response.offer(new String(delivery.getBody(), "UTF-8"));
49 }
50 }, consumerTag -> {
51 });
52
53 String result = response.take();
54 channel.basicCancel(ctag);
55 return result;
56 }
57
58 public void close() throws IOException {
59 connection.close();
60 }
61 }
官方描述:
在某些应用程序中,确保将发布的消息发送到代理非常重要。发布者确认是RabbitMQ功能,可以帮助满足此要求。发布者确认本质上是异步的,但也可以同步处理它们。没有确定的方法可以实现发布者确认,这通常归结为应用程序和整个系统中的约束。典型的技术有:
代码示例:
1 public class PublisherConfirms {
2
3 static final int MESSAGE_COUNT = 50_000;
4
5 static Connection createConnection() throws Exception{
6
7 ConnectionFactory cf = new ConnectionFactory();
8
9 // 设置IP
10 cf.setHost("127.0.0.1");
11
12 // 设置端口号
13 cf.setPort(5672);
14
15 // 设置用户名
16 cf.setUsername("guest");
17
18 // 设置密码
19 cf.setPassword("guest");
20
21 return cf.newConnection();
22 }
23
24 public static void main(String[] args) throws Exception{
25 publishMessagesIndividually();
26 publishMessagesInBatch();
27 handlePublishConfirmsAsynchronously();
28 }
29
30 static void publishMessagesIndividually() throws Exception{
31 try(Connection connection = createConnection()){
32 Channel ch = connection.createChannel();
33
34 String queue = UUID.randomUUID().toString();
35 ch.queueDeclare(queue, false, false, true, null);
36
37 ch.confirmSelect();
38
39 long start = System.nanoTime();
40 for (int i = 0; i < MESSAGE_COUNT; i++){
41 String body = String.valueOf(i);
42 ch.basicPublish("", queue, null, body.getBytes(StandardCharsets.UTF_8));
43 ch.waitForConfirmsOrDie(5_000);
44 }
45 long end = System.nanoTime();
46 System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
47 }
48 }
49
50 static void publishMessagesInBatch() throws Exception {
51 try (Connection connection = createConnection()) {
52 Channel ch = connection.createChannel();
53
54 String queue = UUID.randomUUID().toString();
55 ch.queueDeclare(queue, false, false, true, null);
56
57 ch.confirmSelect();
58
59 int batchSize = 100;
60 int outstandingMessageCount = 0;
61 long start = System.nanoTime();
62 for (int i = 0; i < MESSAGE_COUNT; i++) {
63 String body = String.valueOf(i);
64 ch.basicPublish("", queue, null, body.getBytes());
65 outstandingMessageCount++;
66
67 if (outstandingMessageCount == batchSize) {
68 ch.waitForConfirmsOrDie(5_000);
69 outstandingMessageCount = 0;
70 }
71 }
72
73 if (outstandingMessageCount > 0) {
74 ch.waitForConfirmsOrDie(5_000);
75 }
76 long end = System.nanoTime();
77 System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
78
79 }
80
81 }
82
83 static void handlePublishConfirmsAsynchronously() throws Exception {
84 try (Connection connection = createConnection()) {
85 Channel ch = connection.createChannel();
86
87 String queue = UUID.randomUUID().toString();
88 ch.queueDeclare(queue, false, false, true, null);
89
90 ch.confirmSelect();
91
92 ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
93
94 ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
95 if (multiple) {
96 ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
97 sequenceNumber, true
98 );
99 confirmed.clear();
100 } else {
101 outstandingConfirms.remove(sequenceNumber);
102 }
103 };
104
105 ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
106 String body = outstandingConfirms.get(sequenceNumber);
107 System.err.format(
108 "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
109 body, sequenceNumber, multiple
110 );
111 cleanOutstandingConfirms.handle(sequenceNumber, multiple);
112 });
113
114 long start = System.nanoTime();
115 for (int i = 0; i < MESSAGE_COUNT; i++) {
116 String body = String.valueOf(i);
117 outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
118 ch.basicPublish("", queue, null, body.getBytes());
119 }
120
121 if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
122 throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
123 }
124
125 long end = System.nanoTime();
126 System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
127 }
128 }
129
130 static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
131 int waited = 0;
132 while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
133 Thread.sleep(100L);
134 waited = +100;
135 }
136 return condition.getAsBoolean();
137 }
138 }
总的来说,RabbitMQ还是比较简单的。目前文章只是简单记录一下,后期会更深入学习。
作者: 学海无涯519
原文链接:https://www.cnblogs.com/wgx519/p/14371511.html