2019-11-07 10:14:36 +08:00
|
|
|
|
# 如何限流?在工作中是怎么做的?说一下具体的实现?
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* Author: [HuiFer](https://github.com/huifer)
|
|
|
|
|
* Description: 该文简单介绍限流相关技术以及实现
|
2019-11-07 10:14:36 +08:00
|
|
|
|
|
|
|
|
|
## 什么是限流
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
> 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
|
|
|
|
|
|
|
|
|
|
## 限流方法
|
|
|
|
|
|
|
|
|
|
### 计数器
|
|
|
|
|
|
|
|
|
|
#### 实现方式
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* 控制单位时间内的请求数量
|
|
|
|
|
|
|
|
|
|
``` java
|
2019-11-07 10:14:36 +08:00
|
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
|
|
public class Counter {
|
|
|
|
|
/**
|
|
|
|
|
* 最大访问数量
|
|
|
|
|
*/
|
|
|
|
|
private final int limit = 10;
|
|
|
|
|
/**
|
|
|
|
|
* 访问时间差
|
|
|
|
|
*/
|
|
|
|
|
private final long timeout = 1000;
|
|
|
|
|
/**
|
|
|
|
|
* 请求时间
|
|
|
|
|
*/
|
|
|
|
|
private long time;
|
|
|
|
|
/**
|
|
|
|
|
* 当前计数器
|
|
|
|
|
*/
|
|
|
|
|
private AtomicInteger reqCount = new AtomicInteger(0);
|
|
|
|
|
|
|
|
|
|
public boolean limit() {
|
|
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
if (now < time + timeout) {
|
|
|
|
|
// 单位时间内
|
|
|
|
|
reqCount.addAndGet(1);
|
|
|
|
|
return reqCount.get() <= limit;
|
|
|
|
|
} else {
|
|
|
|
|
// 超出单位时间
|
|
|
|
|
time = now;
|
|
|
|
|
reqCount = new AtomicInteger(0);
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
2020-05-06 20:23:11 +08:00
|
|
|
|
* 劣势
|
2019-11-07 10:14:36 +08:00
|
|
|
|
- 假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 `n-1` (n为限流请求数量),在下一分钟的 00:01 发送n个请求,这样在2秒钟内请求到达了 `2n - 1` 个.
|
|
|
|
|
- 设每分钟请求数量为60个,每秒可以处理1个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时2秒钟有120个请求(每秒60个请求),远远大于了每秒钟处理数量的阈值
|
|
|
|
|
|
|
|
|
|
### 滑动窗口
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
#### 实现方式
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* 滑动窗口是对计数器方式的改进, 增加一个时间粒度的度量单位
|
2019-11-07 10:14:36 +08:00
|
|
|
|
- 把一分钟分成若干等分(6份,每份10秒), 在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加1.当等分数量越大限流统计就越详细
|
|
|
|
|
|
2020-05-06 20:23:11 +08:00
|
|
|
|
``` java
|
2019-11-07 10:14:36 +08:00
|
|
|
|
package com.example.demo1.service;
|
|
|
|
|
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
|
import java.util.Random;
|
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
|
import java.util.stream.IntStream;
|
|
|
|
|
|
|
|
|
|
public class TimeWindow {
|
|
|
|
|
private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 间隔秒数
|
|
|
|
|
*/
|
|
|
|
|
private int seconds;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 最大限流
|
|
|
|
|
*/
|
|
|
|
|
private int max;
|
|
|
|
|
|
|
|
|
|
public TimeWindow(int max, int seconds) {
|
|
|
|
|
this.seconds = seconds;
|
|
|
|
|
this.max = max;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 永续线程执行清理queue 任务
|
|
|
|
|
*/
|
|
|
|
|
new Thread(() -> {
|
|
|
|
|
while (true) {
|
|
|
|
|
try {
|
|
|
|
|
// 等待 间隔秒数-1 执行清理操作
|
|
|
|
|
Thread.sleep((seconds - 1) * 1000L);
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
clean();
|
|
|
|
|
}
|
|
|
|
|
}).start();
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
|
|
|
|
|
|
|
final TimeWindow timeWindow = new TimeWindow(10, 1);
|
|
|
|
|
|
|
|
|
|
// 测试3个线程
|
|
|
|
|
IntStream.range(0, 3).forEach((i) -> {
|
|
|
|
|
new Thread(() -> {
|
|
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
Thread.sleep(new Random().nextInt(20) * 100);
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
timeWindow.take();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}).start();
|
|
|
|
|
|
|
|
|
|
});
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 获取令牌,并且添加时间
|
|
|
|
|
*/
|
|
|
|
|
public void take() {
|
|
|
|
|
|
|
|
|
|
long start = System.currentTimeMillis();
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
|
|
int size = sizeOfValid();
|
|
|
|
|
if (size > max) {
|
|
|
|
|
System.err.println("超限");
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
synchronized (queue) {
|
|
|
|
|
if (sizeOfValid() > max) {
|
|
|
|
|
System.err.println("超限");
|
|
|
|
|
System.err.println("queue中有 " + queue.size() + " 最大数量 " + max);
|
|
|
|
|
}
|
|
|
|
|
this.queue.offer(System.currentTimeMillis());
|
|
|
|
|
}
|
|
|
|
|
System.out.println("queue中有 " + queue.size() + " 最大数量 " + max);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public int sizeOfValid() {
|
|
|
|
|
Iterator<Long> it = queue.iterator();
|
|
|
|
|
Long ms = System.currentTimeMillis() - seconds * 1000;
|
|
|
|
|
int count = 0;
|
|
|
|
|
while (it.hasNext()) {
|
|
|
|
|
long t = it.next();
|
|
|
|
|
if (t > ms) {
|
|
|
|
|
// 在当前的统计时间范围内
|
|
|
|
|
count++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return count;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 清理过期的时间
|
|
|
|
|
*/
|
|
|
|
|
public void clean() {
|
|
|
|
|
Long c = System.currentTimeMillis() - seconds * 1000;
|
|
|
|
|
|
|
|
|
|
Long tl = null;
|
|
|
|
|
while ((tl = queue.peek()) != null && tl < c) {
|
|
|
|
|
System.out.println("清理数据");
|
|
|
|
|
queue.poll();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### Leaky Bucket 漏桶
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
#### 实现方式
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* 规定固定容量的桶, 有水进入, 有水流出. 对于流进的水我们无法估计进来的数量、速度, 对于流出的水我们可以控制速度.
|
|
|
|
|
|
|
|
|
|
``` java
|
2019-11-07 10:14:36 +08:00
|
|
|
|
public class LeakBucket {
|
|
|
|
|
/**
|
|
|
|
|
* 时间
|
|
|
|
|
*/
|
|
|
|
|
private long time;
|
|
|
|
|
/**
|
|
|
|
|
* 总量
|
|
|
|
|
*/
|
|
|
|
|
private Double total;
|
|
|
|
|
/**
|
|
|
|
|
* 水流出去的速度
|
|
|
|
|
*/
|
|
|
|
|
private Double rate;
|
|
|
|
|
/**
|
|
|
|
|
* 当前总量
|
|
|
|
|
*/
|
|
|
|
|
private Double nowSize;
|
|
|
|
|
|
|
|
|
|
public boolean limit() {
|
|
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
nowSize = Math.max(0, (nowSize - (now - time) * rate));
|
|
|
|
|
time = now;
|
|
|
|
|
if ((nowSize + 1) < total) {
|
|
|
|
|
nowSize++;
|
|
|
|
|
return true;
|
|
|
|
|
} else {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### Token Bucket 令牌桶
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
#### 实现方式
|
|
|
|
|
|
2020-05-06 20:23:11 +08:00
|
|
|
|
* 规定固定容量的桶, token 以固定速度往桶内填充, 当桶满时 token 不会被继续放入, 每过来一个请求把 token 从桶中移除, 如果桶中没有 token 不能请求
|
|
|
|
|
|
|
|
|
|
``` java
|
2019-11-07 10:14:36 +08:00
|
|
|
|
public class TokenBucket {
|
|
|
|
|
/**
|
|
|
|
|
* 时间
|
|
|
|
|
*/
|
|
|
|
|
private long time;
|
|
|
|
|
/**
|
|
|
|
|
* 总量
|
|
|
|
|
*/
|
|
|
|
|
private Double total;
|
|
|
|
|
/**
|
|
|
|
|
* token 放入速度
|
|
|
|
|
*/
|
|
|
|
|
private Double rate;
|
|
|
|
|
/**
|
|
|
|
|
* 当前总量
|
|
|
|
|
*/
|
|
|
|
|
private Double nowSize;
|
|
|
|
|
|
|
|
|
|
public boolean limit() {
|
|
|
|
|
long now = System.currentTimeMillis();
|
|
|
|
|
nowSize = Math.min(total, nowSize + (now - time) * rate);
|
|
|
|
|
time = now;
|
|
|
|
|
if (nowSize < 1) {
|
|
|
|
|
// 桶里没有token
|
|
|
|
|
return false;
|
|
|
|
|
} else {
|
|
|
|
|
// 存在token
|
|
|
|
|
nowSize -= 1;
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## 工作中的使用
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
### spring cloud gateway
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* spring cloud gateway 默认使用redis进行限流, 笔者一般只是修改修改参数属于拿来即用. 并没有去从头实现上述那些算法.
|
|
|
|
|
|
|
|
|
|
``` xml
|
2019-11-07 10:14:36 +08:00
|
|
|
|
<dependency>
|
|
|
|
|
<groupId>org.springframework.cloud</groupId>
|
|
|
|
|
<artifactId>spring-cloud-starter-gateway</artifactId>
|
|
|
|
|
</dependency>
|
|
|
|
|
<dependency>
|
|
|
|
|
<groupId>org.springframework.boot</groupId>
|
|
|
|
|
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
|
|
|
|
|
</dependency>
|
|
|
|
|
```
|
|
|
|
|
|
2020-05-06 20:23:11 +08:00
|
|
|
|
``` yaml
|
2019-11-07 10:14:36 +08:00
|
|
|
|
spring:
|
|
|
|
|
cloud:
|
|
|
|
|
gateway:
|
|
|
|
|
routes:
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
- id: requestratelimiter_route
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
uri: lb://pigx-upms
|
|
|
|
|
order: 10000
|
|
|
|
|
predicates:
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
- Path=/admin/**
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
filters:
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
- name: RequestRateLimiter
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
args:
|
|
|
|
|
redis-rate-limiter.replenishRate: 1 # 令牌桶的容积
|
|
|
|
|
redis-rate-limiter.burstCapacity: 3 # 流速 每秒
|
|
|
|
|
key-resolver: "#{@remoteAddrKeyResolver}" #SPEL表达式去的对应的bean
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
- StripPrefix=1
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
```
|
|
|
|
|
|
2020-05-06 20:23:11 +08:00
|
|
|
|
``` java
|
2019-11-07 10:14:36 +08:00
|
|
|
|
@Bean
|
|
|
|
|
KeyResolver remoteAddrKeyResolver() {
|
|
|
|
|
return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
### sentinel
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* 通过配置来控制每个url的流量
|
|
|
|
|
|
|
|
|
|
``` xml
|
2019-11-07 10:14:36 +08:00
|
|
|
|
<dependency>
|
|
|
|
|
<groupId>com.alibaba.cloud</groupId>
|
|
|
|
|
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
|
|
|
|
|
</dependency>
|
|
|
|
|
```
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
``` yaml
|
2019-11-07 10:14:36 +08:00
|
|
|
|
spring:
|
|
|
|
|
cloud:
|
|
|
|
|
nacos:
|
|
|
|
|
discovery:
|
|
|
|
|
server-addr: localhost:8848
|
|
|
|
|
sentinel:
|
|
|
|
|
transport:
|
|
|
|
|
dashboard: localhost:8080
|
|
|
|
|
port: 8720
|
|
|
|
|
datasource:
|
|
|
|
|
ds:
|
|
|
|
|
nacos:
|
|
|
|
|
server-addr: localhost:8848
|
|
|
|
|
dataId: spring-cloud-sentinel-nacos
|
|
|
|
|
groupId: DEFAULT_GROUP
|
|
|
|
|
rule-type: flow
|
|
|
|
|
namespace: xxxxxxxx
|
|
|
|
|
```
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* 配置内容在nacos上进行编辑
|
|
|
|
|
|
|
|
|
|
``` json
|
2019-11-07 10:14:36 +08:00
|
|
|
|
[
|
|
|
|
|
{
|
|
|
|
|
"resource": "/hello",
|
|
|
|
|
"limitApp": "default",
|
|
|
|
|
"grade": 1,
|
|
|
|
|
"count": 1,
|
|
|
|
|
"strategy": 0,
|
|
|
|
|
"controlBehavior": 0,
|
|
|
|
|
"clusterMode": false
|
|
|
|
|
}
|
|
|
|
|
]
|
|
|
|
|
```
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
* resource:资源名,即限流规则的作用对象。
|
|
|
|
|
* limitApp:流控针对的调用来源,若为 default 则不区分调用来源。
|
|
|
|
|
* grade:限流阈值类型,QPS 或线程数模式,0代表根据并发数量来限流,1代表根据QPS来进行流量控制。
|
|
|
|
|
* count:限流阈值
|
|
|
|
|
* strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口
|
|
|
|
|
* controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式)
|
|
|
|
|
* clusterMode:是否为集群模式
|
|
|
|
|
|
2019-11-07 10:14:36 +08:00
|
|
|
|
### 总结
|
2020-05-06 20:23:11 +08:00
|
|
|
|
|
|
|
|
|
> sentinel和spring cloud gateway两个框架都是很好的限流框架, 但是在我使用中还没有将[spring-cloud-alibaba](https://github.com/alibaba/spring-cloud-alibaba)接入到项目中进行使用, 所以我会选择**spring cloud gateway**, 当接入完整的或者接入Nacos项目使用setinel会有更加好的体验.
|