后端服务的接口都是有访问上限的,如果外部 QPS 或并发量超过了访问上限会导致应用瘫痪。所以一般都会对接口调用加上限流保护,防止超出预期的请求导致系统故障。

从限流类型来说一般来说分为两种: 并发数限流和 qps 限流,并发数限流就是限制同一时刻的最大并发请求数量,qps 限流指的是限制一段时间内发生的请求个数。

从作用范围的层次上来看分单机限流和分布式限流,前者是针对单机的,后者是针对集群的,他们的思想都是一样的,只不过是范围不一样,本文分析的都是单机限流

接下来我们看看并发数限流和 QPS 限流。

并发数限流

并发数限流限制的是同一时刻的并发数,所以不考虑线程安全的话,我们只要用一个 int 变量就能实现,伪代码如下:

int maxRequest=100;
int nowRequest=0;

public void request(){
if(nowRequest>=maxRequest){
return ;
}
nowRequest++;
// 调用接口
try{
invokeXXX();
}finally{
nowRequest–;
}
}

复制代码

显然,上述实现会有线程安全的问题,最直接的做法是加锁:

int maxRequest=100;
int nowRequest=0;

public void request(){
if(nowRequest>=maxRequest){
return ;
}
synchronized(this){
if(nowRequest>=maxRequest){
return ;
}
nowRequest++;
}

<span class="hljs-comment">//调用接口</span>
<span class="hljs-keyword">try</span>{
     invokeXXX();    
}<span class="hljs-keyword">finally</span>{
    <span class="hljs-keyword">synchronized</span>(<span class="hljs-keyword">this</span>){
     	nowRequest--;
    }
}

}

复制代码

当然也可以用 AtomicInteger 实现:

int maxRequest=100;
AtomicInteger nowRequest=new AtomicInteger(0);

public void request(){
for(;;){
int currentReq=nowRequest.get();
if(currentReq>=maxRequest){
return;
}
if(nowRequest.compareAndSet(currentReq,currentReq+1)){
break;
}
}

<span class="hljs-comment">//调用接口</span>
<span class="hljs-keyword">try</span>{
     invokeXXX();    
}<span class="hljs-keyword">finally</span>{
    nowRequest.decrementAndGet();
}

}

复制代码

熟悉 JDK 并发包的同学会说干嘛这么麻烦,这不就是信号量(Semaphore)做的事情吗? 对的,其实最简单的方法就是用信号量来实现:

int maxRequest=100;
Semaphore reqSemaphore = new Semaphore(maxRequest);

public void request(){
if(!reqSemaphore.tryAcquire()){
return ;
}

<span class="hljs-comment">//调用接口</span>
<span class="hljs-keyword">try</span>{
     invokeXXX();    
}<span class="hljs-keyword">finally</span>{
   reqSemaphore.release();
}

}

复制代码

条条大路通罗马,并发数限流比较简单,一般来说用信号量就好。

QPS 限流

QPS 限流限制的是一段时间内(一般指 1 秒)的请求个数。

计数器法

最简单的做法用一个 int 型的 count 变量做计数器:请求前计数器 +1,如超过阈值并且与第一个请求的间隔还在 1s 内,则限流。

伪代码如下:

int maxQps=100;
int count;
long timeStamp=System.currentTimeMillis();
long interval=1000;

public synchronized boolean grant(){
long now=System.currentTimeMillis();
if(now<timeStamp+interval){
count++;
return count<maxQps;
}else{
timeStamp=now;
count=1;
return true;
}
}

复制代码

该种方法实现起来很简单,但其实是有临界问题的,假如在第一秒的后 500ms 来了 100 个请求,第 2 秒的前 500ms 来了 100 个请求,那在这 1 秒内其实最大 QPS 为 200。如下图:

image

计数器法会有临界问题,主要还是统计的精度太低,这点可以通过滑动窗口算法解决

滑动窗口

我们用一个长度为 10 的数组表示 1 秒内的 QPS 请求,数组每个元素对应了相应 100ms 内的请求数。用一个sum变量代码当前 1s 的请求数。同时每隔 100ms 将淘汰过期的值。

伪代码如下:

int maxQps=100;
AtomicInteger[] count=new AtomicInteger[10];
long timeStamp=System.currentTimeMillis();
long interval=1000;
AtomicInteger sum;
volatile int index;

public void init(){
for(int i=0;i<count.length;i++){
count[i]=new AtomicInteger(0);
}
sum=new AtomicInteger(0);
}

public synchronized boolean grant(){
count[index].incrementAndGet();
return sum.incrementAndGet()<maxQps;
}

// 每 100ms 执行一次
public void run(){
index=(index+1)%count.length;
int val=count[index].getAndSet(0);
sum.addAndGet(-val);
}

复制代码

滑动窗口的窗口越小,则精度越高,相应的资源消耗也更高。

漏桶算法

漏桶算法思路是,有一个固定大小的桶,水(请求)忽快忽慢的进入到漏桶里,漏桶以一定的速度出水。当桶满了之后会发生溢出。

image

维基百科上可以看到,漏桶算法有两种实现,一种是as a meter,另一种是as a queue网上大多数文章都没有提到其有两种实现,且对这两种概念混乱。

As a meter

第一种实现是和令牌桶等价的,只是表述角度不同。

伪代码如下:

long timeStamp=System.currentTimeMillis();// 上一次调用 grant 的时间
int bucketSize=100;// 桶大小
int rate=10;// 每 ms 流出多少请求
int count;// 目前的水量

public synchronized boolean grant(){
long now = System.currentTimeMillis();
if(now>timeStamp){
count = Math.max(0,count-(now-timeStamp)*rate);
timeStamp = now;
}

<span class="hljs-keyword">if</span>(count+<span class="hljs-number">1</span>&lt;=bucketSize){
    count++;
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;
}<span class="hljs-keyword">else</span>{
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;
}

}

复制代码

该种实现允许一段时间内的突发流量,比如初始时桶中没有水,这时 1ms 内来了 100 个请求,这 100 个请求是不会被限流的,但之后每 ms 最多只能接受 10 个请求(比如下 1ms 又来了 100 个请求,那其中 90 个请求是会被限流的)。

其达到的效果和令牌桶一样。

As a queue

第二种实现是用一个队列实现,当请求到来时如果队列没满则加入到队列中,否则拒绝掉新的请求。同时会以恒定的速率从队列中取出请求执行。

image

伪代码如下:

Queue<Request> queue=new LinkedBlockingQueue(100);
int gap;
int rate;

public synchronized boolean grant(Request req){
if(!queue.offer(req)){return false;}
}

// 单独线程执行
void consume(){
while(true){
for(int i=0;i<rate;i++){
// 执行请求
Request req=queue.poll();
if(req==null){break;}
req.doRequest();
}
Thread.sleep(gap);
}
}

复制代码

对于该种算法,固定的限定了请求的速度,不允许流量突发的情况。

比如初始时桶是空的,这时 1ms 内来了 100 个请求,那只有前 10 个会被接受,其他的会被拒绝掉。注意与上文中as a meter实现的区别。

** 不过,当桶的大小等于每个 ticket 流出的水大小时,第二种漏桶算法和第一种漏桶算法是等价的。** 也就是说,as a queueas a meter的一种特殊实现。如果你没有理解这句话,你可以再看看上面as a meter的伪代码,当bucketSize==rate时,请求速度就是恒定的,不允许突发流量。

令牌桶算法

令牌桶算法的思想就是,桶中最多有 N 个令牌,会以一定速率往桶中加令牌,每个请求都需要从令牌桶中取出相应的令牌才能放行,如果桶中没有令牌则被限流。

image

令牌桶算法与上文的漏桶算法as a meter实现是等价的,能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。伪代码:

int token;
int bucketSize;
int rate;
long timeStamp=System.currentTimeMillis();

public synchronized boolean grant(){
long now=System.currentTimeMillis();
if(now>timeStamp){
token=Math.max(bucketSize,token+(timeStamp-now)*rate);
timeStamp=now;
}
if(token>0){
token–;
return true;
}else{
return false;
}

}

复制代码

漏桶算法两种实现和令牌桶算法的对比

as a meter的漏桶算法和令牌桶算法是一样的,只是思想角度有所不同。

as a queue的漏桶算法能强行限制数据的传输速率,而令牌桶和as a meter漏桶则能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。

一般业界用的比较多的是令牌桶算法,像 guava 中的RateLimiter就是基于令牌桶算法实现的。当然不同的业务场景会有不同的需要,具体的选择还是要结合场景。

End

本文介绍了后端系统中常用的限流算法,对于每种算法都有对应的伪代码,结合伪代码理解起来应该不难。但伪代码中只是描述了大致思想,对于一些细节和效率问题并没有关注,所以下篇文章将会分析常用限流 API:guava 的RateLimiter的源码实现,让读者对于限流有个更清晰的认识。

参考文章

blog.zhuxingsheng.com/blog/counte…

blog.51cto.com/leyew/86030…

en.wikipedia.org/wiki/Leaky_…

  • Java

    Java,是由 Sun Microsystems 公司于 1995 年 5 月推出的 Java 程序设计语言和 Java 平台的总称。用 Java 实现的 HotJava 浏览器(支持 Java applet)显示了 Java 的魅力:跨平台、动态的…

    380 引用 • 6 回帖
感谢    赞同    分享    收藏    关注    反对    举报    ...