advanced-java/docs/high-concurrency/how-to-limit-current.md

377 lines
9.9 KiB
Markdown
Raw Normal View History

# 如何限流?在工作中是怎么做的?说一下具体的实现?
2020-05-06 20:23:11 +08:00
## 什么是限流
2020-05-06 20:23:11 +08:00
> 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
## 限流方法
### 计数器
#### 实现方式
2020-05-06 20:23:11 +08:00
2021-09-02 22:01:32 +08:00
控制单位时间内的请求数量。
2020-05-06 20:23:11 +08:00
```java
import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
/**
* 最大访问数量
*/
private final int limit = 10;
/**
* 访问时间差
*/
private final long timeout = 1000;
/**
* 请求时间
*/
private long time;
/**
* 当前计数器
*/
private AtomicInteger reqCount = new AtomicInteger(0);
public boolean limit() {
long now = System.currentTimeMillis();
if (now < time + timeout) {
// 单位时间内
reqCount.addAndGet(1);
return reqCount.get() <= limit;
} else {
// 超出单位时间
time = now;
reqCount = new AtomicInteger(0);
return true;
}
}
}
```
2021-09-02 22:01:32 +08:00
劣势:
假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 `n-1` (n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 `2n - 1` 个。
设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值。
### 滑动窗口
2020-05-06 20:23:11 +08:00
#### 实现方式
2020-05-06 20:23:11 +08:00
2021-09-02 22:01:32 +08:00
滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位,把一分钟分成若干等分(6 份,每份 10 秒),在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1。当等分数量越大限流统计就越详细。
```java
package com.example.demo1.service;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;
public class TimeWindow {
private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
/**
* 间隔秒数
*/
private int seconds;
/**
* 最大限流
*/
private int max;
2021-09-02 22:01:32 +08:00
public TimeWindow(int max int seconds) {
this.seconds = seconds;
this.max = max;
/**
* 永续线程执行清理queue 任务
*/
new Thread(() -> {
while (true) {
try {
// 等待 间隔秒数-1 执行清理操作
Thread.sleep((seconds - 1) * 1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
clean();
}
}).start();
}
public static void main(String[] args) throws Exception {
2021-09-02 22:01:32 +08:00
final TimeWindow timeWindow = new TimeWindow(10 1);
// 测试3个线程
2021-09-02 22:01:32 +08:00
IntStream.range(0 3).forEach((i) -> {
new Thread(() -> {
while (true) {
try {
Thread.sleep(new Random().nextInt(20) * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
timeWindow.take();
}
}).start();
});
}
/**
* 获取令牌,并且添加时间
*/
public void take() {
long start = System.currentTimeMillis();
try {
int size = sizeOfValid();
if (size > max) {
System.err.println("超限");
}
synchronized (queue) {
if (sizeOfValid() > max) {
System.err.println("超限");
System.err.println("queue中有 " + queue.size() + " 最大数量 " + max);
}
this.queue.offer(System.currentTimeMillis());
}
System.out.println("queue中有 " + queue.size() + " 最大数量 " + max);
}
}
public int sizeOfValid() {
Iterator<Long> it = queue.iterator();
Long ms = System.currentTimeMillis() - seconds * 1000;
int count = 0;
while (it.hasNext()) {
long t = it.next();
if (t > ms) {
// 在当前的统计时间范围内
count++;
}
}
return count;
}
/**
* 清理过期的时间
*/
public void clean() {
Long c = System.currentTimeMillis() - seconds * 1000;
Long tl = null;
while ((tl = queue.peek()) != null && tl < c) {
System.out.println("清理数据");
queue.poll();
}
}
}
```
### Leaky Bucket 漏桶
2020-05-06 20:23:11 +08:00
#### 实现方式
2020-05-06 20:23:11 +08:00
2021-09-02 22:01:32 +08:00
规定固定容量的桶,有水进入,有水流出。对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度。
2020-05-06 20:23:11 +08:00
```java
public class LeakBucket {
/**
* 时间
*/
private long time;
/**
* 总量
*/
private Double total;
/**
* 水流出去的速度
*/
private Double rate;
/**
* 当前总量
*/
private Double nowSize;
public boolean limit() {
long now = System.currentTimeMillis();
2021-09-02 22:01:32 +08:00
nowSize = Math.max(0 (nowSize - (now - time) * rate));
time = now;
if ((nowSize + 1) < total) {
nowSize++;
return true;
} else {
return false;
}
}
}
```
### Token Bucket 令牌桶
2020-05-06 20:23:11 +08:00
#### 实现方式
2021-09-02 22:01:32 +08:00
规定固定容量的桶, token 以固定速度往桶内填充, 当桶满时 token 不会被继续放入, 每过来一个请求把 token 从桶中移除, 如果桶中没有 token 不能请求。
2020-05-06 20:23:11 +08:00
```java
public class TokenBucket {
/**
* 时间
*/
private long time;
/**
* 总量
*/
private Double total;
/**
* token 放入速度
*/
private Double rate;
/**
* 当前总量
*/
private Double nowSize;
public boolean limit() {
long now = System.currentTimeMillis();
2021-09-02 22:01:32 +08:00
nowSize = Math.min(total nowSize + (now - time) * rate);
time = now;
if (nowSize < 1) {
// 桶里没有token
return false;
} else {
// 存在token
nowSize -= 1;
return true;
}
}
}
```
## 工作中的使用
2020-05-06 20:23:11 +08:00
### spring cloud gateway
2020-05-06 20:23:11 +08:00
2022-09-26 10:34:20 +08:00
- spring cloud gateway 默认使用 redis 进行限流,笔者一般只是修改修改参数属于拿来即用,并没有去从头实现上述那些算法。
2020-05-06 20:23:11 +08:00
```xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
```
```yaml
spring:
2023-05-25 09:35:57 +08:00
cloud:
gateway:
routes:
- id: requestratelimiter_route
uri: lb://pigx-upms
order: 10000
predicates:
- Path=/admin/**
filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 1 # 令牌桶的容积
redis-rate-limiter.burstCapacity: 3 # 流速 每秒
key-resolver: '#{@remoteAddrKeyResolver}' #SPEL表达式去的对应的bean
- StripPrefix=1
```
```java
@Bean
KeyResolver remoteAddrKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
}
```
### sentinel
2020-05-06 20:23:11 +08:00
2022-09-26 10:34:20 +08:00
- 通过配置来控制每个 url 的流量
2020-05-06 20:23:11 +08:00
```xml
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
```
2020-05-06 20:23:11 +08:00
```yaml
spring:
2023-05-25 09:35:57 +08:00
cloud:
nacos:
discovery:
server-addr: localhost:8848
sentinel:
transport:
dashboard: localhost:8080
port: 8720
datasource:
ds:
nacos:
server-addr: localhost:8848
dataId: spring-cloud-sentinel-nacos
groupId: DEFAULT_GROUP
rule-type: flow
namespace: xxxxxxxx
```
2020-05-06 20:23:11 +08:00
2022-09-26 10:34:20 +08:00
- 配置内容在 nacos 上进行编辑
2020-05-06 20:23:11 +08:00
```json
[
2023-05-25 09:35:57 +08:00
{
"resource": "/hello",
"limitApp": "default",
"grade": 1,
"count": 1,
"strategy": 0,
"controlBehavior": 0,
"clusterMode": false
}
]
```
2020-05-06 20:23:11 +08:00
2022-09-26 10:34:20 +08:00
- resource资源名即限流规则的作用对象。
- limitApp流控针对的调用来源若为 default 则不区分调用来源。
- grade限流阈值类型QPS 或线程数模式0 代表根据并发数量来限流1 代表根据 QPS 来进行流量控制。
- count限流阈值
- strategy判断的根据是资源自身还是根据其它关联资源 (refResource),还是根据链路入口
- controlBehavior流控效果直接拒绝 / 排队等待 / 慢启动模式)
- clusterMode是否为集群模式
2020-05-06 20:23:11 +08:00
### 总结
2020-05-06 20:23:11 +08:00
2021-09-02 22:01:32 +08:00
> sentinel 和 spring cloud gateway 两个框架都是很好的限流框架, 但是在我使用中还没有将[spring-cloud-alibaba](https://github.com/alibaba/spring-cloud-alibaba)接入到项目中进行使用, 所以我会选择**spring cloud gateway** 当接入完整的或者接入 Nacos 项目使用 setinel 会有更加好的体验.