您当前的位置:首页 > 电脑百科 > 数据库 > Redis

Redis-消息队列的两种实现方式

时间:2020-01-02 15:52:28  来源:  作者:

索引:

  1. 基于list的实现方式
  2. 基于publish/subscribe
  3. 实战

消息队列简介

消息队列:是消息的顺序集合。
比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后MySQL给PV+1。用户量非常大的时候,没有办法实时插入PV。
结合redis消息队列的实现,也是每个用户访问的时候发送ajax到控制器,这个时候redis每次rpush pvlog,相当于直接往数组后面插入一万个行为,接下来用一个脚本运输处理pvlog,set pv查看时候get pv,如果想处理用户请求时间等等,同样可以这样异步处理。

常见场景和解决的问题:

应对流量峰值
异步消费(不定速的插入,生产和匀速的处理,消费)
解耦应用(不同来源的生产和同步去向的消费,基于publish/subscribe实现),即消息队列作为消息池,同时往里面写入的可能有多种数据,根据不同的场景来进行消费。
redis实现消息队列原理
使用redis实现的最主要优势是简单快捷,性能没有kafka高,但是安装简便,kafka性能高但是比较重,如果消息队列不是很多,比如说一个博客计算pv,那么kafka可能比整个项目还要大。

实现方式:

方法一: 基于list的实现方式

 

Redis-消息队列的两种实现方式

 

Screen-Shot-2019-04-06-at-7.01.54-PM.png

 

核心代码
没有用消息队列的方式,使用incrBy大概上限在1000万

<?php$redis = new Redis();$redis->connect('127.0.0.1', 6380);$res->select(0);$key = 'pv:index';//  看一下是不是没有,如果没有的话就设置成0if(false === $redis->get($key)){    $redis->set($key, 0);}// 如果有了就增加1 注意incrBy的上线大概在1000万$redis->incrBy($key,1);`

用list来实现

// 用list消息队列实现$key = 'listpv:index';$redis->rPush($key, 1);// 后台的cron来实现// 先连接redis$redis = new Redis();$redis->connect('127.0.0.1', 6380);$res->select(0);$key = 'pv:index';// 用一个死循环while(true){    if(false !== $redis->lPop($key))    {        $redis->incrBy('pv:index');    }}// 然后在terminal中无线循环这个脚本

基于list来实现消息队列的特点:

与水库相似的地方:

水库的容量决定承载能力 — redis的容量决定业务的承载能力
每一滴随只可能经过一个闸门 — 每条消息只能被一个消费者消费
与水库不同的地方:

水库用于蓄水 — 一般要把消息全部消费掉
不要的随扔掉 — 处理失败的消息要做容错

方法二:基于publish/subscribe

 

Redis-消息队列的两种实现方式

 

Screen-Shot-2019-04-06-at-7.53.22-PM.png


频道固定,生产者和消费者不固定,可能一对多,也可能多对一,也可能多对多。
命令行的实现方式:

 

首先起两个redis cli,一个作为订阅,一个作为发布

订阅者:SUBSCRIBE channel1 channel2

发布者:
PUBLISH channel1 helloChannel
PUBLISH channel2 helloChannel2

  • 如果这个时候发布到一个没有被订阅的channel,那么这条消息就会丢失。
  • 如果有多个订阅了同一个channel,但有信息发布到同一个channel的时候,他们都会受到
    代码实现:

发布者:

<?php// 发布者$redis = new Redis();$redis->connect('127.0.0.1', 6380);$res = $redis->publish('c1','hello c1');echo "clients reading c1:{$res}n";$res = $redis->publish('c2','hello c2');echo "clients reading c2:{$res}n";$res = $redis->publish('c3','hello c3');echo "clients reading c3:{$res}n";

监听者:

<?php$redis = new Redis();$redis->connect('127.0.0.1', 6380);// 超时控制$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);// 订阅$redis->subscribe(['c1','c2'],function(Redis $instance, $channel, $message){    echo "received message form {$channel} : {$message}n";})​```php### 实战部分-生成内容页的质量分:实现三个功能:- 统计首页、列表页、内容页的PV- 统计浏览时间超过5s的内容页- 内容页的PV+1分,浏览时间超过5s+5分,不超过5秒-1分,生成内容页的质量分前端部分:​```JAVAscript<script>// ajax 访问ajax.php,给内容页增加PV$get('ajax.php?action=pv&from=article&aid=<?=$aid?>');// 如果页面打开时间超过5秒,则发出统计setTimeout(function(){    $.get('ajax.php?action=get5&aid=<?=$aid?>');},5000);</script>

后端部分:
发布的实现:

<?php$action = $_GET['action'];$redis = new Redis();$redis->connect('127.0.0.1', 6380);// 首页的PV$channelPvIndex = 'pv:index'; //内容页的pv$channelPvList ='pv:list'; // 内容页的PV$channelPvArticle = 'pv:article'; // 内容页浏览超过5秒$channelGT5 = 'gt5:article';if('pv' === $action){    $from = $_GET['from'];    if('index' === $from)    {        $redis->publish($channelPVIndex, 1);    }    else if('list' === $from)    {        $tid = intval($_GET['tid']);        $redis->publish($channelPvList, $tid);    }    else if('articel' === $_GET['aid'])    {        $aid = intval($_GET['aid']);        $redis->publish($channelPvArticle, $aid);    }}else if('gt5' == $action){    $aid = intval($GET['aid'])    {        $redis->publish($channelGT5, $aid);    }}else{    // unknown action}

订阅的实现:

<?php// 订阅的实现$redis = new Redis();$redis->connect('127.0.0.1', 6380);// 首页的PV$channelPvIndex = 'pv:index'; //内容页的pv$channelPvList ='pv:list'; // 内容页的PV$channelPvArticle = 'pv:article'; // 内容页浏览超过5秒$channelGT5 = 'gt5:article';// 频道和PV的key的映射$keyMap = [    $channelPVIndex => 'realtimepv:index',    $channelPvList => 'realtimepv:list',    $channelPvArticle => 'realtimepv:article'];$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);$redis->subscribe(    [$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5],    function(Redis $instance, $channel, $message){    // 注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy    // 尝试取消订阅命令(证明上面一句话)    // $instance->unsubscribe([$channelName]);    // 因此要想incryBy只能重新实例化一个redis    $redis2 = new Redis();    $redis2->connect('127.0.0.1',6380);    global $keyMap; //这里可以使用闭包实现    if(!isset($$keyMap[$channelName]))    {        $realTimePvKey = $keyMap[$channelName]; // 映射过来        $redis2->incrBy($realTimePvKey, 1);    }}    )

注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy,因此要想incryBy只能重新实例化一个redis

计算质量分:

<?php// 订阅的实现$redis = new Redis();$redis->connect('127.0.0.1', 6380);// 内容页的PV$channelPvArticle = 'pv:article'; // 内容页浏览超过5秒$channelGT5 = 'gt5:article';$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);$redis->subscribe([$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5],    function(Redis $instance, $channel, $message){        /**         * 使用pv和gt5的数量计算文章的质量分         * 1. 如何计算? = gt5*6         * 2. 以什么形式保存? HASH         * gt5: int         * score: int         */        $redis2 = new Redis();        $redis2->connect('127.0.0.1',6380);        global $keyMap; //这里可以使用闭包实现        if('gt5:article' === $channelName)        {            echo "${channelName}n";            $key = 'realtimescore:'.intval($message);            $res = $redis2->hIncrBy($keym 'gt5', 1);            echo "${channelName}n";            if($res)            {                $score = $res * 6;                $redis2->hSet($key, 'score', $score);                echo "{$score}n";            }            else            {                // 报警            }        }    })

本人文章均为原创,转载请注明出处。



Tags:Redis 消息队列   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
索引: 基于list的实现方式 基于publish/subscribe 实战消息队列简介消息队列:是消息的顺序集合。 比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给PV+1。用...【详细内容】
2020-01-02  Tags: Redis 消息队列  点击:(85)  评论:(0)  加入收藏
Redis以内存数据库而闻名。但是,某些系统将它用作消息队列管理工具。Pub/Sub 和 RPOPLPUSH 是用于实现这样一个系统的两组命令。在这篇文章中,我将分享一些关于这两个命令集的...【详细内容】
2019-11-14  Tags: Redis 消息队列  点击:(111)  评论:(0)  加入收藏
▌简易百科推荐
来源: my.oschina.net/xiaomu0082/blog/2990388首先说下问题现象:内网sandbox环境API持续1周出现应用卡死,所有api无响应现象刚开始当测试抱怨环境响应慢的时候 ,我们重启一下应...【详细内容】
2021-12-08  Java识堂    Tags:Redis   点击:(18)  评论:(0)  加入收藏
我不知道为什么你会选择对特定数量的“错误”(或警告)如此具体。听起来您正在寻找将要发布到 Yahoo! 的某些文章的内容。 Insider (N Foos to Blah for the BlahBlah)。那说:...【详细内容】
2021-12-07  富集云科技有限公司    Tags:Redis   点击:(14)  评论:(0)  加入收藏
目录 一、背景 二、步骤 0.理论支持 1、获取数据 2、结果 3、分析数据并评估大小 三、关于repl-backlog-size 一、背景 repl-backlog-size控制这个环形缓冲区. ​ 主从断...【详细内容】
2021-11-05  弈秋的美好生活    Tags:redis   点击:(41)  评论:(0)  加入收藏
Redis 性能测试是通过同时执行多个命令实现的。1,Redis-benchmarkRedis性能命令:redis性能命令格式: redis-benchmark [option] [option value] redis 性能测试工具可选参数如...【详细内容】
2021-11-02  川石信息    Tags:Redis   点击:(41)  评论:(0)  加入收藏
1 概述数据结构和内部编码 无传统关系型数据库的 Table 模型schema 所对应的db仅以编号区分。同一 db 内,key 作为顶层模型,它的值是扁平化的。即 db 就是key的命名空间。 key...【详细内容】
2021-11-01  JavaEdge    Tags:Redis   点击:(28)  评论:(0)  加入收藏
普通java中使用引用Java redis 驱动,即可连接:import redis.clients.jedis.Jedis; public class RedisTestJava { public static void main(String[] args) { //连...【详细内容】
2021-10-13  faesuite    Tags:Redis   点击:(34)  评论:(0)  加入收藏
Redis常用的数据结构有 string list set zset hashstringstring 是 Redis 的基本的数据类型,一个 key 对应一个 value。string 类型是二进制安全的,Redis的string可以包含任...【详细内容】
2021-10-12  语霖    Tags:Redis   点击:(36)  评论:(0)  加入收藏
列表类型可以存储一组按插入顺序排序的字符串,它非常灵活,支持在两端插入、弹出数据,可以充当栈和队列的角色。> LPUSH fruit apple(integer) 1> RPUSH fruit banana(integer)...【详细内容】
2021-09-17  深夜敲代码    Tags:Redis   点击:(54)  评论:(0)  加入收藏
Redis持久化意义 是做灾难恢复,数据恢复,也可以归类到高可用的一个环节里面去,比如你的redis整个挂了,然后redis就不可用了,你要做的事情是让redis变得可用,尽快变得可用 大量的请...【详细内容】
2021-08-12  小李说IT    Tags:Redis   点击:(77)  评论:(0)  加入收藏
当查询Redis中没有的数据时,该查询会下沉到数据库层,同时数据库层也没有该数据,当这种情况大量出现或被恶意攻击时,接口的访问全部透过Redis访问数据库,而数据库中也没有这些数据...【详细内容】
2021-07-30  随便t    Tags:缓存穿透   点击:(91)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条