背景
最近项目有个需求需要动态更新规则,当时脑中想到的第一个方案是利用zk的监听机制,管理人员更新完规则将状态写入zk,集群中的机器监听zk的状态,当有状态变更后,集群中的机器开始拉取最新的配置。但由于公司技术选型,没有专门搭建zk集群,因此也不可能为这一个小需求去搭建zk集群。图为使用zk监听状态变化的流程。
最后只好退而求其次,想到了使用redis的队列来做规则的更新
消息队列
首先做简单的引入。
从队列和消息队列的定义看来,看不出什么相似之处。但我理解它们的作用是相似的,只是使用环境不同。队列和消息队列 本质上都可以用于解决“生产者”和“消费者”问题,在二者这间建立桥梁,it中专业术语是对“生产者”和“消费者”进行解耦。可以动态的通过调整“生产者”和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理。
队列 一般指的是单个服务实例内部使用,比如,在JAVA中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(无界)、PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)、LinkedBlockingQueue(无界)、DelayQueue(延迟队列 无界)、PriorityBlockingQueue(优先 无界)、SynchronousQueue(没有容量的队列)。可以看到java的api已经很强大了,可以根据自己的业务需求选择使用。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义)。
MQ主要是用来:
目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
另外上面提到的“有界”和“无界”,指的是队列的容量大小。有界 指的是创建队列时必须指定队列的容量;无界 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用“无界”队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了“消息队列”。
消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中。比如常见的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发。但他们本质作用跟上面讲的单实例环境中java“队列”没什么两样:在消息的传输过程中保存消息的容器。只是这里转换到“分布式”环境中而已。
可以看到这里这里提到的“传统”消息队列,都是一个很重型的集群。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架。比如:本次分享的主题 如何使用redis实现“消息队列”。
redis 实现消息队列
redis有一个数据类型叫list(列表),它的每个子元素都是 string 类型的双向链表。我们可以通过 push,pop 操作从链表的头部或者尾部添加删除元素。这使得 list 既可以用作栈,也可以用作队列。
假如,我们有一个队列系统,把一个个任务放到队列中,另一个进程就把队列中的任务取出来执行。
放到队列我们使用LPUSH,也就是往双向链表的尾部填充一个元素,这一端也叫生产者,是产生内容的一端。
另一个进程使用RPOP往头部取出元素来执行,这一端也叫消费者。
如果仅仅是这种方式来实现队列,它就是需要进程不断地循环队列,判断队列是不是有新元素,有的话就取出来执行,没有的话,就继续循环,但是这个总有一个时间间隔,你总得规定每隔一段时间去循环,虽然这个时间很小,但总有延迟,这种方式叫作轮循。有没有一种方式就是让不断执行一个redis命令,而redis中的列队有值就会通过命令通知程序呢?有的,那就是阻塞操作的RPOP,它叫作BRPOP。
我们来演示一下它是如何实现的。
$ redis-cli 127.0.0.1:6379> BRPOP list1 0
先执行BRPOP,假如队列list1没有值,它会返回nil,并且阻塞在那,在等另一个程序或进程往list1中填值。
我们开启另一个redis端终。
$ redis-cli 127.0.0.1:6379> LPUSH list1 a (integer) 1
我们再来看之前的结果。
127.0.0.1:6379> BRPOP list1 0 1) "list1" 2) "a" (16.99s)
这样就能把列表的值给取到了。
优点
不足
通过pub/sub来实现
实现机制
订阅,取消订阅和发布实现了发布/订阅消息范式,发布者不是计划发送消息给特定的订阅者。而是发布的消息分到不同的频道,不需要知道什么样的订阅者订阅。订阅者对一个或多个频道感兴趣,只需接收感兴趣的消息,不需要知道什么样的发布者发布的。
这是一种基于非持久化的消息机制,消息发布者和订阅者必须同时在线,否则一旦消息订阅者由于各种异常情况而被迫断开连接,在其重新连接后,其离线期间的消息是无法被重新通知的(即发即弃)。
Redis中的消息可以提供两种不同的功能。一类是基于Channel的消息,这一类消息和Redis中存储的Keys没有太多关联,也就是说即使不在Redis中存储任何Keys信息,这类消息也可以独立使用。另一类消息可以对(也可以不对)Redis中存储的Keys信息的变化事件进行通知,可以用来向订阅者通知Redis中符合订阅条件的Keys的各种事件。
通过springboot 构建redis消息队列
首先springboot配置文件配置如下:
spring.redis.host=localhost spring.redis.port=6379
消息生产者,注入redisTemplate,用convertAndSend发送消息
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; @Service public class PublishService { @Autowired private StringRedisTemplate stringRedisTemplate; public void sendMsg(String channel, String msg) { stringRedisTemplate.convertAndSend(channel, msg); } }
消费者:创建一个接收消息的类,继承MessageListener,也可以不继承
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @Slf4j @Service public class RedisReceiver { public void receiveMessage(String message) { log.info("receive message is {}",message); } }
消息订阅者配置类:
import com.wuzy.queue.RedisReceiver; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.stereotype.Component; /** * redis 监听配置 */ @Configuration public class RedisSubListenerConfig { /** * 初始化监听器 * * @param connectionFactory * @param listenerAdapter * @return */ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, new PatternTopic("channel_1")); // new PatternTopic("这里是监听的通道的名字") 通道要和发布者发布消息的通道一致 return container; } /** * 绑定消息监听者和接收监听的方法 * * @param redisReceiver * @return */ @Bean MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) { // redisReceiver 消息接收者 // receiveMessage 消息接收后的方法 MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(); messageListenerAdapter.setDefaultListenerMethod("receiveMessage"); messageListenerAdapter.setDelegate(redisReceiver); return messageListenerAdapter; } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
优点
不足