轰轰烈烈的双十二已经过去小半个月了,程序猿的我坐在办公桌上思考,双十二这么大的访问量,这群电商是怎么扛住的,接口分分钟会变得不可用,并引发连锁反应导致整个系统崩溃。好吃懒做的小编,被可怕的好奇心驱使着去调研流量控制算法。好奇心害死猫,才有了这篇文章。
流量控制在计算机领域称为过载保护。何为过载保护?所谓“过载”,即需求超过了负载能力;而“保护”则是指当“过载”发生了,采取必要的措施保护自己不受“伤害”。在计算机领域,尤其是分布式系统领域,“过载保护”是一个重要的概念。一个不具备“过载保护”功能的系统,是非常危险和脆弱的,很可能由于瞬间的压力激增,引起“雪崩效应”,导致系统的各个部分都同时崩溃,停止服务。这就好像在没有保险丝的保护下,电压突然变高,导致所有的电器都会被损坏一样,“过载保护”功能是系统的“保险丝”。
如今互联网领域,也借鉴了这一思路扛住双十二, 控制网络数据传输的速率,使流量以比较均匀的速度向外发送。最终实现优化性能,减少延迟和提高带宽等。
常用的限流算法有两种:漏桶算法和令牌桶算法。本篇文章将介绍自己造轮子限流算法、漏桶算法和令牌桶算法。
作为一名小白,我是不愿意自己造轮子的,但是真要早轮子,有一个简单粗暴的思路:1)设置单位时间T(如10s)内的最大访问量ReqMax,在单位时间T内维护计数器Count;2)当请求到达时,判断时间是否进入下一个单位时间;3)如果是,则重置计数器为0;4)如果不是,计数器Count++,并判断计数器Count是否超过最大访问量ReqMax,如超过,则拒绝访问。
long timeStamp = getNowTime();
int reqCount = 0;
const int maxReqCount = 10000;//时间周期内最大请求数
const long effectiveDuration = 10;//时间控制周期
public static bool control(){
long now = getNowTime();
if (now < timeStamp + effectiveDuration){//在时间控制范围内
reqCount++;
return reqCount > maxReqCount;//当前时间范围内超过最大请求控制数
}else{
timeStamp = now;//超时后重置
reqCount = 0;
return true;
}
}
public static int getNowTime(){
long time = System.currentTimeMillis();
return (int) (time/1000);
}
该算法实现看似确实完美的实现了“单位时间内最大访问量控制”,但它在两个单位时间的临界值上的处理是有缺陷的。如:设需要控制的最大请求数为1w, 在第一个单位时间(0-10s)的最后一秒(即第9s)里达到的请求数为1w,接下来第二个单位时间(10-20s)的第一秒(即第10s)里达到请求数也是1w,由于超时重置发生在两个单位时间之间,所以这2w个请求都将通过控制,也就是说在2s里处理2w个请求,与我们设置的10s里1w个请求的设想是相违背。
学术一点的说法是该算法处理请求不够平滑,不能很好的满足限流需求。
漏桶算法思路很简单,请求先进入到漏桶里,漏桶以固定的速度出水,也就是处理请求,当水加的过快,则会直接溢出,也就是拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。
漏桶算法
long timeStamp = getNowTime();
int capacity = 10000;// 桶的容量
int rate = 1;//水漏出的速度
int water = 100;//当前水量
public static bool control() {
//先执行漏水,因为rate是固定的,所以可以认为“时间间隔*rate”即为漏出的水量
long now = getNowTime();
water = Math.max(0, water - (now - timeStamp) * rate);
timeStamp = now;
if (water < capacity) { // 水还未满,加水
water ++;
return true;
} else {
return false;//水满,拒绝加水
}
}
该算法很好的解决了时间边界处理不够平滑的问题,因为在每次请求进桶前都将执行“漏水”的操作,再无边界问题。
但是对于很多场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。
令牌桶算法
令牌桶是网络设备的内部存储池,而令牌则是以给定速率填充令牌桶的虚拟信息包。每个到达的令牌都会从数据队列领出相应的数据包进行发送,发送完数据后令牌被删除。
请求注解(RFC)中定义了两种令牌桶算法——单速率三色标记算法和双速率三色标记算法,其评估结果都是为报文打上红、黄、绿三色标记。QoS会根据报文的颜色,设置报文的丢弃优先级,其中单速率三色标记比较关心报文尺寸的突发,而双速率三色标记则关注速率上的突发,两种算法都可工作于色盲模式和非色盲模式。以下结合这两种工作模式介绍一下RFC中所描述的这两种算法。
1)单速率三色标记算法网络工程师任务小组(IETF)的RFC文件定义了单速率三色标记算法,评估依据以下3个参数:承诺访问速率(CIR),即向令牌桶中填充令牌的速率;承诺突发尺寸(CBS),即令牌桶的容量,每次突发所允许的最大流量尺寸(注:设置的突发尺寸必须大于最大报文长度);超额突发尺寸(EBS)。
一般采用双桶结构:C桶和E桶。Tc表示C桶中的令牌数,Te表示E桶中令牌数,两桶的总容量分别为CBS和EBS。初始状态时两桶是满的,即Tc和Te初始值分别等于CBS和EBS。令牌的产生速率是CIR,通常是先往C桶中添加令牌,等C桶满了,再往E桶中添加令牌,当两桶都被填满时,新产生的令牌将会被丢弃。
色盲模式下,假设到达的报文长度为B。若报文长度B小于C桶中的令牌数Tc,则报文被标记为绿色,且C桶中的令牌数减少B;若Tc<B <Te,则标记为黄色,E和C桶中的令牌数均减少B;若B >Te,标记为红色,两桶总令牌数都不减少。
在非色盲模式下,若报文已被标记为绿色或B <Tc,则报文被标记为绿色,Tc减少B;若报文已被标记为黄色或Tc<B <Te,则标记为黄色,且Te减少B;若报文已被标记为红色或B >Te,则标记为红色,Tc和Te都不减少。
2)双速率三色标记算法IETF的RFC文件定义了双速率三色算法,主要是根据4种流量参数来评估:CIR、CBS、峰值信息速率(PIR),峰值突发尺寸(PBS)。前两种参数与单速率三色算法中的含义相同,PIR这个参数只在交换机上才有,路由器没有这个参数。该值必须不小于CIR的设置值,如果大于CIR,则速率限制在CIR于PRI之间的一个值。
与单速率三色标记算法不同,双速率三色标记算法的两个令牌桶C桶和P桶填充令牌的速率不同,C桶填充速率为CIR,P桶为PIR;两桶的容量分别为CBS和PBS。用Tc和Tp表示两桶中的令牌数目,初始状态时两桶是满的,即Tc和Tp初始值分别等于CBS和PBS。
色盲模式下,如果到达的报文速率大于PIR,超过Tp+Tc部分无法得到令牌,报文被标记为红色,未超过Tp+Tc而从P桶中获取令牌的报文标记为黄色,从C桶中获取令牌的报文被标记为绿色;当报文速率小于PIR,大于CIR时,报文不会得不到令牌,但超过Tp部分报文将从P桶中获取令牌,被标记为黄色报文,从C桶中获取令牌的报文被标记为绿色;当报文速率小于CIR时,报文所需令牌数不会超过Tc,只从C桶中获取令牌,所以只会被标记为绿色报文。
在非色盲模式下,如果报文已被标记为红色或者超过Tp+Tc部分无法得到令牌的报文,被标记为红色;如果标记为黄色或者超过Tc未超过Tp部分报文记为黄色;如果报文被标记为绿或未超过Tc部分报文,被标记为绿色。
long timeStamp=getNowTime();
int capacity; // 桶的容量
int rate ;//令牌放入速度
int tokens;//当前水量
bool control() {
//先执行添加令牌的操作
long now = getNowTime();
tokens = max(capacity, tokens+ (now - timeStamp)*rate);
timeStamp = now; //令牌已用完,拒绝访问
if(tokens<1){
return false;
}else{//还有令牌,领取令牌
tokens--;
retun true;
}
}
令牌桶算法是网络流量整形和速率限制中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。
大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。
传送到令牌桶的数据包需要消耗令牌。不同大小的数据包,消耗的令牌数量不一样。令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。
google开源工具包guava提供了限流工具类RateLimiter,该类基于“令牌桶算法”,非常方便使用。RateLimiter经常用于限制对一些物理资源或者逻辑资源的访问速率。它支持两种获取permits接口,一种是如果拿不到立刻返回false,一种会阻塞等待一段时间看能不能拿到。
//多任务执行,但每秒执行不超过2个任务
final RateLimiter rateLimiter = RateLimiter.create(2.0);
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}
//以每秒5kb内的速度发送
final RateLimiter rateLimiter = RateLimiter.create(5000.0);
void submitPacket(byte[] packet) {
rateLimiter.acquire(packet.length);
networkService.send(packet);
}
//以非阻塞的形式达到降级
if(limiter.tryAcquire()) { //未请求到limiter则立即返回false
doSomething();
}else{
doSomethingElse();
}
RateLimiter其实是一个abstract类,但是它提供了几个static方法用于创建RateLimiter:
/**
* 创建一个稳定输出令牌的RateLimiter,保证了平均每秒不超过permitsPerSecond个请求
* 当请求到来的速度超过了permitsPerSecond,保证每秒只处理permitsPerSecond个请求
* 当这个RateLimiter使用不足(即请求到来速度小于permitsPerSecond),会囤积最多permitsPerSecond个请求
*/
public static RateLimiter create(double permitsPerSecond);
/**
* 创建一个稳定输出令牌的RateLimiter,保证了平均每秒不超过permitsPerSecond个请求
* 还包含一个热身期(warmup period),热身期内,RateLimiter会平滑的将其释放令牌的速率加大,直到起达到最大速率
* 同样,如果RateLimiter在热身期没有足够的请求(unused),则起速率会逐渐降低到冷却状态
*
* 设计这个的意图是为了满足那种资源提供方需要热身时间,而不是每次访问都能提供稳定速率的服务的情况(比如带缓存服务,需要定期刷新缓存的)
* 参数warmupPeriod和unit决定了其从冷却状态到达最大速率的时间
*/
public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);
提供了两个获取令牌的方法,不带参数表示获取一个令牌。如果没有令牌则一直等待,返回等待的时间(单位为秒),没有被限流则直接返回0.0:
public double acquire();
public double acquire(int permits);
尝试获取令牌,分为待超时时间和不带超时时间两种:
public boolean tryAcquire();
//尝试获取一个令牌,立即返回
public boolean tryAcquire(int permits);
public boolean tryAcquire(long timeout, TimeUnit unit);
//尝试获取permits个令牌,带超时时间
public boolean tryAcquire(int permits, long timeout, TimeUnit unit);
RateLimiter的主要功能就是提供一个稳定的速率,实现方式就是通过限制请求流入的速度,比如计算请求等待合适的时间阈值。
实现QPS速率的最简单的方式就是记住上一次请求的最后授权时间,然后保证1/QPS秒内不允许请求进入。比如QPS=5,如果我们保证最后一个被授权请求之后的200ms的时间内没有请求被授权,那么我们就达到了预期的速率。如果一个请求现在过来但是最后一个被授权请求是在100ms之前,那么我们就要求当前这个请求等待100ms。按照这个思路,请求15个新令牌(许可证)就需要3秒。
有一点很重要:上面这个设计思路的RateLimiter记忆非常的浅,它的脑容量非常的小,只记得上一次被授权的请求的时间。如果RateLimiter的一个被授权请求q之前很长一段时间没有被使用会怎么样?这个RateLimiter会立马忘记过去这一段时间的利用不足,而只记得刚刚的请求q。
过去一段时间的利用不足意味着有过剩的资源是可以利用的。这种情况下,RateLimiter应该加把劲(speed up for a while)将这些过剩的资源利用起来。比如在向网络中发生数据的场景(限流),过去一段时间的利用不足可能意味着网卡缓冲区是空的,这种场景下,我们是可以加速发送来将这些过程的资源利用起来。
另一方面,过去一段时间的利用不足可能意味着处理请求的服务器对即将到来的请求是准备不足的(less ready for future requests),比如因为很长一段时间没有请求当前服务器的cache是陈旧的,进而导致即将到来的请求会触发一个昂贵的操作(比如重新刷新全量的缓存)。
为了处理这种情况,RateLimiter中增加了一个维度的信息,就是过去一段时间的利用不足(past underutilization),代码中使用storedPermits变量表示。当没有利用不足这个变量为0,最大能达到maxStoredPermits(maxStoredPermits表示完全没有利用)。因此,请求的令牌可能从两个地方来:
过去剩余的令牌(stored permits, 可能没有)
现有的令牌(fresh permits,当前这段时间还没用完的令牌)
我们将通过一个例子来解释它是如何工作的:对一个每秒产生一个令牌的RateLimiter,每有一个没有使用令牌的一秒,我们就将storedPermits加1,如果RateLimiter在10秒都没有使用,则storedPermits变成10.0。这个时候,一个请求到来并请求三个令牌(acquire(3)),我们将从storedPermits中的令牌为其服务,storedPermits变为7.0。这个请求之后立马又有一个请求到来并请求10个令牌,我们将从storedPermits剩余的7个令牌给这个请求,剩下还需要三个令牌,我们将从RateLimiter新产生的令牌中获取。我们已经知道,RateLimiter每秒新产生1个令牌,就是说上面这个请求还需要的3个请求就要求其等待3秒。
想象一个RateLimiter每秒产生一个令牌,现在完全没有使用(处于初始状态),限制一个昂贵的请求acquire(100)过来。如果我们选择让这个请求等待100秒再允许其执行,这显然很荒谬。我们为什么什么也不做而只是傻傻的等待100秒,一个更好的做法是允许这个请求立即执行(和acquire(1)没有区别),然后将随后到来的请求推迟到正确的时间点。这种策略,我们允许这个昂贵的任务立即执行,并将随后到来的请求推迟100秒。这种策略就是让任务的执行和等待同时进行。
一个重要的结论:RateLimiter不会记最后一个请求,而是即下一个请求允许执行的时间。这也可以很直白的告诉我们到达下一个调度时间点的时间间隔。然后定一个一段时间未使用的Ratelimiter也很简单:下一个调度时间点已经过去,这个时间点和现在时间的差就是Ratelimiter多久没有被使用,我们会将这一段时间翻译成storedPermits。所有,如果每秒钟产生一个令牌(rate==1),并且正好每秒来一个请求,那么storedPermits就不会增长。
分析一下RateLimiter如何实现限流:
public double acquire() {
return acquire(1);
}
public double acquire(int permits) {
long microsToWait = reserve(permits);
stopwatch.sleepMicrosUninterruptibly(microsToWait);
return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
checkPermits(permits);
synchronized (mutex()) { //应对并发情况需要同步
return reserveAndGetWaitLength(permits, stopwatch.readMicros());
}
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
return max(momentAvailable - nowMicros, 0);
}
下面方法来自RateLimiter的具体实现类SmoothRateLimiter:
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
resync(nowMicros); //补充令牌
long returnValue = nextFreeTicketMicros;
//这次请求消耗的令牌数目
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
double freshPermits = requiredPermits - storedPermitsToSpend;
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
this.nextFreeTicketMicros = nextFreeTicketMicros + waitMicros;
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
private void resync(long nowMicros) {
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
storedPermits = min(maxPermits,
storedPermits + (nowMicros - nextFreeTicketMicros) / stableIntervalMicros);
nextFreeTicketMicros = nowMicros;
}
}
另外,对于storedPermits的使用,RateLimiter存在两种策略,二者区别主要体现在使用storedPermits时候需要等待的时间。这个逻辑由storedPermitsToWaitTime函数实现:
/**
* Translates a specified portion of our currently stored permits which we want to
* spend/acquire, into a throttling time. Conceptually, this evaluates the integral
* of the underlying function we use, for the range of
* [(storedPermits - permitsToTake), storedPermits].
*
* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
*/
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
存在两种策略就是为了应对我们上面讲到的,存在资源使用不足大致分为两种情况:
(1)资源确实使用不足,这些剩余的资源我们私海可以使用的;
(2)提供资源的服务过去还没准备好,比如服务刚启动等。
为此,RateLimiter实际上由两种实现策略,其实现分别见SmoothBursty和SmoothWarmingUp。二者主要的区别就是storedPermitsToWaitTime实现以及maxPermits数量的计算。
SmoothBursty使用storedPermits不需要额外等待时间。并且默认maxBurstSeconds未1,因此maxPermits为permitsPerSecond,即最多可以存储1秒的剩余令牌,比如QPS=5,则maxPermits=5。
下面这个RateLimiter的入口就是用来创建SmoothBursty类型的RateLimiter:
public static RateLimiter create(double permitsPerSecond)
/**
* This implements a "bursty" RateLimiter, where storedPermits are translated to
* zero throttling. The maximum number of permits that can be saved (when the RateLimiter is
* unused) is defined in terms of time, in this sense: if a RateLimiter is 2qps, and this
* time is specified as 10 seconds, we can save up to 2 * 10 = 20 permits.
*/
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final double maxBurstSeconds;
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
System.out.println("maxPermits=" + maxPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits = (oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
}
一个简单的使用示意图及解释,下面私海一个QPS=4的SmoothBursty:
(1)t=0,这时候storedPermits=0,请求1个令牌,等待时间=0;
(2)t=1,这时候storedPermits=3,请求3个令牌,等待时间=0;
(3)t=2,这时候storedPermits=4,请求10个令牌,等待时间=0,超前使用了2个令牌;
(4)t=3,这时候storedPermits=0,请求1个令牌,等待时间=0.5。
代码的输出:
maxPermits=4.0, storedPermits=7.2E-4, stableIntervalMicros=250000.0, nextFreeTicketMicros=1472
acquire(1), sleepSecond=0.0
maxPermits=4.0, storedPermits=3.012212, stableIntervalMicros=250000.0, nextFreeTicketMicros=1004345
acquire(3), sleepSecond=0.0
maxPermits=4.0, storedPermits=4.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2004668
acquire(10), sleepSecond=0.0
maxPermits=4.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=3504668
acquire(1), sleepSecond=0.499591
static final class SmoothWarmingUp extends SmoothRateLimiter {
private final long warmupPeriodMicros;
/**
* The slope of the line from the stable interval (when permits == 0), to the cold interval
* (when permits == maxPermits)
*/
private double slope;
private double halfPermits;
SmoothWarmingUp(SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit) {
super(stopwatch);
this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
maxPermits = warmupPeriodMicros / stableIntervalMicros;
halfPermits = maxPermits / 2.0;
// Stable interval is x, cold is 3x, so on average it's 2x. Double the time -> halve the rate
double coldIntervalMicros = stableIntervalMicros * 3.0;
slope = (coldIntervalMicros - stableIntervalMicros) / halfPermits;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits = (oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
double availablePermitsAboveHalf = storedPermits - halfPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveHalf > 0.0) {
double permitsAboveHalfToTake = min(availablePermitsAboveHalf, permitsToTake);
micros = (long) (permitsAboveHalfToTake * (permitsToTime(availablePermitsAboveHalf)
+ permitsToTime(availablePermitsAboveHalf - permitsAboveHalfToTake)) / 2.0);
permitsToTake -= permitsAboveHalfToTake;
}
// measuring the integral on the left part of the function (the horizontal line)
micros += (stableIntervalMicros * permitsToTake);
return micros;
}
private double permitsToTime(double permits) {
return stableIntervalMicros + permits * slope;
}
}
maxPermits等于热身(warmup)期间能产生的令牌数,比如QPS=4,warmup为2秒,则maxPermits=8。halfPermits为maxPermits的一半。
参考注释中的神图:
* ^ throttling
* |
* 3*stable + /
* interval | /.
* (cold) | / .
* | / . <-- "warmup period" is the area of the trapezoid between
* 2*stable + / . halfPermits and maxPermits
* interval | / .
* | / .
* | / .
* stable +----------/ WARM . }
* interval | . UP . } <-- this rectangle (from 0 to maxPermits, and
* | . PERIOD. } height == stableInterval) defines the cooldown period,
* | . . } and we want cooldownPeriod == warmupPeriod
* |---------------------------------> storedPermits
* (halfPermits) (maxPermits)
*
下面是我们QPS=4,warmup为2秒时候对应的图。
maxPermits=8,halfPermits=4,和SmoothBursty相同的请求序列:
(1)t=0,这时候storedPermits=8,请求1个令牌,使用1个storedPermits消耗时间=1×(0.75+0.625)/2=0.6875秒;
(2)t=1,这时候storedPermits=8,请求3个令牌,使用3个storedPermits消耗时间=3×(0.75+0.375)/2=1.6875秒(注意已经超过1秒了,意味着下次产生新Permit时间为2.6875);
(3)t=2,这时候storedPermits=5,请求10个令牌,使用5个storedPermits消耗时间=1×(0.375+0.25)/2+4*0.25=1.3125秒,再加上额外请求的5个新产生的Permit需要消耗=5*0.25=1.25秒,即总共需要耗时2.5625秒,则下一次产生新的Permit时间为2.6875+2.5625=5.25,注意当前请求私海2.6875才返回的,之前一直阻塞;
(4)t=3,因为前一个请求阻塞到2.6875,实际这个请求3.6875才到达RateLimiter,请求1个令牌,storedPermits=0,下一次产生新Permit时间为5.25,因此总共需要等待5.25-3.6875=1.5625秒。
实际执行结果:
warmupPeriodMicros=2000000
stableIntervalMicros=250000.0, maxPermits=8.0, halfPermits=4.0, coldIntervalMicros=750000.0, slope=125000.0, storedPermits=8.0
maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1524
acquire(1), sleepSecond=0.0
maxPermits=8.0, storedPermits=8.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=1001946
acquire(3), sleepSecond=0.0
maxPermits=8.0, storedPermits=5.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=2689446
acquire(10), sleepSecond=0.687186
maxPermits=8.0, storedPermits=0.0, stableIntervalMicros=250000.0, nextFreeTicketMicros=5251946
acquire(1), sleepSecond=1.559174
ListenableFuture顾名思义就是可以监听的Future,它是对JAVA原生Future的扩展增强。我们知道Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果我们希望一旦计算完成就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做,代码复杂,而且效率低下。使用ListenableFuture Guava帮我们检测Future是否完成了,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。
推荐使用第二种方法,因为第二种方法可以直接得到Future的返回值,或者处理错误情况。本质上第二种方法是通过调动第一种方法实现的,做了进一步的封装。
另外ListenableFuture还有其他几种内置实现:
1)SettableFuture:不需要实现一个方法来计算返回值,而只需要返回一个固定值来做为返回值,可以通过程序设置此Future的返回值或者异常信息;2)CheckedFuture:这是一个继承自ListenableFuture接口,他提供了checkedGet()方法,此方法在Future执行发生异常时,可以抛出指定类型的异常。
RateLimiter类似于JDK的信号量Semphore,他用来限制对资源并发访问的线程数
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
public class ListenableFutureDemo {
public static void main(String[] args) {
testRateLimiter();
testListenableFuture();
}
/**
* RateLimiter类似于JDK的信号量Semphore,他用来限制对资源并发访问的线程数
*/
public static void testRateLimiter() {
ListeningExecutorService executorService = MoreExecutors
.listeningDecorator(Executors.newCachedThreadPool());
RateLimiter limiter = RateLimiter.create(5.0); // 每秒不超过4个任务被提交
for (int i = 0; i < 10; i++) {
limiter.acquire(); // 请求RateLimiter, 超过permits会被阻塞
final ListenableFuture<Integer> listenableFuture = executorService
.submit(new Task("is "+ i));
}
}
public static void testListenableFuture() {
ListeningExecutorService executorService = MoreExecutors
.listeningDecorator(Executors.newCachedThreadPool());
final ListenableFuture<Integer> listenableFuture = executorService
.submit(new Task("testListenableFuture"));
//同步获取调用结果
try {
System.out.println(listenableFuture.get());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
}
//第一种方式
listenableFuture.addListener(new Runnable() {
@Override
public void run() {
try {
System.out.println("get listenable future's result "
+ listenableFuture.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}, executorService);
//第二种方式
Futures.addCallback(listenableFuture, new FutureCallback<Integer>() {
@Override
public void onSuccess(Integer result) {
System.out
.println("get listenable future's result with callback "
+ result);
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
});
}
}
class Task implements Callable<Integer> {
String str;
public Task(String str){
this.str = str;
}
@Override
public Integer call() throws Exception {
System.out.println("call execute.." + str);
TimeUnit.SECONDS.sleep(1);
return 7;
}
}
pom.xml依赖
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>