From 9f41b9c413824dbb5a2e8a1e6dbc9081a2bc0400 Mon Sep 17 00:00:00 2001 From: wangtao Date: Thu, 7 Nov 2019 10:14:36 +0800 Subject: [PATCH] docs(from-readers): add an article about current limiting --- README.md | 2 +- .../huifer-how-to-limit-current.md | 362 ++++++++++++++++++ 2 files changed, 363 insertions(+), 1 deletion(-) create mode 100644 docs/high-concurrency/huifer-how-to-limit-current.md diff --git a/README.md b/README.md index f5a0f5c..a7a6509 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ - 如何设计一个高可用系统? ### 限流 -- 如何限流?在工作中是怎么做的?说一下具体的实现? +- [如何限流?在工作中是怎么做的?说一下具体的实现?](/docs/high-concurrency/huifer-how-to-limit-current.md) ### 熔断 - 如何进行熔断? diff --git a/docs/high-concurrency/huifer-how-to-limit-current.md b/docs/high-concurrency/huifer-how-to-limit-current.md new file mode 100644 index 0000000..88c9b90 --- /dev/null +++ b/docs/high-concurrency/huifer-how-to-limit-current.md @@ -0,0 +1,362 @@ +# 如何限流?在工作中是怎么做的?说一下具体的实现? +- Author: [HuiFer](https://github.com/huifer) +- Description: 该文简单介绍限流相关技术以及实现 + +## 什么是限流 +> 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。 + +## 限流方法 + +### 计数器 + +#### 实现方式 +- 控制单位时间内的请求数量 +```java + +import java.util.concurrent.atomic.AtomicInteger; + +public class Counter { + /** + * 最大访问数量 + */ + private final int limit = 10; + /** + * 访问时间差 + */ + private final long timeout = 1000; + /** + * 请求时间 + */ + private long time; + /** + * 当前计数器 + */ + private AtomicInteger reqCount = new AtomicInteger(0); + + public boolean limit() { + long now = System.currentTimeMillis(); + if (now < time + timeout) { + // 单位时间内 + reqCount.addAndGet(1); + return reqCount.get() <= limit; + } else { + // 超出单位时间 + time = now; + reqCount = new AtomicInteger(0); + return true; + } + } +} + +``` + +- 劣势 + - 假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 `n-1` (n为限流请求数量),在下一分钟的 00:01 发送n个请求,这样在2秒钟内请求到达了 `2n - 1` 个. + - 设每分钟请求数量为60个,每秒可以处理1个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时2秒钟有120个请求(每秒60个请求),远远大于了每秒钟处理数量的阈值 + + +### 滑动窗口 +#### 实现方式 +- 滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位 + - 把一分钟分成若干等分(6份,每份10秒), 在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加1.当等分数量越大限流统计就越详细 + +```java +package com.example.demo1.service; + +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.IntStream; + +public class TimeWindow { + private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + + /** + * 间隔秒数 + */ + private int seconds; + + /** + * 最大限流 + */ + private int max; + + public TimeWindow(int max, int seconds) { + this.seconds = seconds; + this.max = max; + + /** + * 永续线程执行清理queue 任务 + */ + new Thread(() -> { + while (true) { + try { + // 等待 间隔秒数-1 执行清理操作 + Thread.sleep((seconds - 1) * 1000L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + clean(); + } + }).start(); + + } + + public static void main(String[] args) throws Exception { + + final TimeWindow timeWindow = new TimeWindow(10, 1); + + // 测试3个线程 + IntStream.range(0, 3).forEach((i) -> { + new Thread(() -> { + + while (true) { + + try { + Thread.sleep(new Random().nextInt(20) * 100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + timeWindow.take(); + } + + }).start(); + + }); + + } + + /** + * 获取令牌,并且添加时间 + */ + public void take() { + + + long start = System.currentTimeMillis(); + try { + + + int size = sizeOfValid(); + if (size > max) { + System.err.println("超限"); + + } + synchronized (queue) { + if (sizeOfValid() > max) { + System.err.println("超限"); + System.err.println("queue中有 " + queue.size() + " 最大数量 " + max); + } + this.queue.offer(System.currentTimeMillis()); + } + System.out.println("queue中有 " + queue.size() + " 最大数量 " + max); + + } + + } + + + public int sizeOfValid() { + Iterator it = queue.iterator(); + Long ms = System.currentTimeMillis() - seconds * 1000; + int count = 0; + while (it.hasNext()) { + long t = it.next(); + if (t > ms) { + // 在当前的统计时间范围内 + count++; + } + } + + return count; + } + + + /** + * 清理过期的时间 + */ + public void clean() { + Long c = System.currentTimeMillis() - seconds * 1000; + + Long tl = null; + while ((tl = queue.peek()) != null && tl < c) { + System.out.println("清理数据"); + queue.poll(); + } + } + +} + +``` + +### Leaky Bucket 漏桶 +#### 实现方式 +- 规定固定容量的桶,有水进入,有水流出. 对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度. +```java +public class LeakBucket { + /** + * 时间 + */ + private long time; + /** + * 总量 + */ + private Double total; + /** + * 水流出去的速度 + */ + private Double rate; + /** + * 当前总量 + */ + private Double nowSize; + + + public boolean limit() { + long now = System.currentTimeMillis(); + nowSize = Math.max(0, (nowSize - (now - time) * rate)); + time = now; + if ((nowSize + 1) < total) { + nowSize++; + return true; + } else { + return false; + } + + } +} +``` + +### Token Bucket 令牌桶 +#### 实现方式 +- 规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求 + +```java +public class TokenBucket { + /** + * 时间 + */ + private long time; + /** + * 总量 + */ + private Double total; + /** + * token 放入速度 + */ + private Double rate; + /** + * 当前总量 + */ + private Double nowSize; + + + public boolean limit() { + long now = System.currentTimeMillis(); + nowSize = Math.min(total, nowSize + (now - time) * rate); + time = now; + if (nowSize < 1) { + // 桶里没有token + return false; + } else { + // 存在token + nowSize -= 1; + return true; + } + } + + + +} +``` + +## 工作中的使用 +### spring cloud gateway +- spring cloud gateway 默认使用redis进行限流,笔者一般只是修改修改参数属于拿来即用.并没有去从头实现上述那些算法. +```xml + + org.springframework.cloud + spring-cloud-starter-gateway + + + org.springframework.boot + spring-boot-starter-data-redis-reactive + +``` + +```yaml +spring: + cloud: + gateway: + routes: + - id: requestratelimiter_route + uri: lb://pigx-upms + order: 10000 + predicates: + - Path=/admin/** + filters: + - name: RequestRateLimiter + args: + redis-rate-limiter.replenishRate: 1 # 令牌桶的容积 + redis-rate-limiter.burstCapacity: 3 # 流速 每秒 + key-resolver: "#{@remoteAddrKeyResolver}" #SPEL表达式去的对应的bean + - StripPrefix=1 +``` + +```java +@Bean +KeyResolver remoteAddrKeyResolver() { + return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); +} +``` + + +### sentinel +- 通过配置来控制每个url的流量 +```xml + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + +``` +```yaml +spring: + cloud: + nacos: + discovery: + server-addr: localhost:8848 + sentinel: + transport: + dashboard: localhost:8080 + port: 8720 + datasource: + ds: + nacos: + server-addr: localhost:8848 + dataId: spring-cloud-sentinel-nacos + groupId: DEFAULT_GROUP + rule-type: flow + namespace: xxxxxxxx +``` +- 配置内容在nacos上进行编辑 +```json +[ + { + "resource": "/hello", + "limitApp": "default", + "grade": 1, + "count": 1, + "strategy": 0, + "controlBehavior": 0, + "clusterMode": false + } +] +``` +- resource:资源名,即限流规则的作用对象。 +- limitApp:流控针对的调用来源,若为 default 则不区分调用来源。 +- grade:限流阈值类型,QPS 或线程数模式,0代表根据并发数量来限流,1代表根据QPS来进行流量控制。 +- count:限流阈值 +- strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口 +- controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式) +- clusterMode:是否为集群模式 +### 总结 +> sentinel和spring cloud gateway两个框架都是很好的限流框架,但是在我使用中还没有将[spring-cloud-alibaba](https://github.com/alibaba/spring-cloud-alibaba)接入到项目中进行使用,所以我会选择**spring cloud gateway**,当接入完整的或者接入Nacos项目使用setinel会有更加好的体验. \ No newline at end of file