PRF:20171003 Streams a new general purpose data structure in Redis.md

@qhwdw 校对完成,请你看看有无严重不当之处。辛苦了。@pityonline 也可以看看~
This commit is contained in:
Xingyu.Wang 2018-07-21 16:26:28 +08:00
parent 528649f6b9
commit 100b59486c

View File

@ -30,43 +30,49 @@ streams一个新的 Redis 通用数据结构
结合 radix 树和 listpacks它可以很容易地去构建一个日志它同时也是非常空间高效的并且是索引化的这意味着允许通过 ID 和时间进行随机访问。自从这些就绪后,我开始去写一些代码以实现流数据结构。我最终完成了该实现,不管怎样,现在,在 Github 上的 Redis 的 “streams” 分支里面,它已经可以跑起来了。我并没有声称那个 API 是 100% 的最终版本,但是,这有两个有趣的事实:一是,在那时,只有消费组是缺失的,加上一些不那么重要的命令去操作流,但是,所有的大的方面都已经实现了。二是,一旦各个方面比较稳定了之后 ,决定大概用两个月的时间将所有的流的工作向后移植到 4.0 分支。这意味着 Redis 用户为了使用流,不用等待 Redis 4.2,它们在生产环境马上就可用了。这是可能的,因为作为一个新的数据结构,几乎所有的代码改变都出现在新的代码里面。除了阻塞列表操作之外:该代码被重构了,我们对于流和列表阻塞操作共享了相同的代码,而极大地简化了 Redis 内部。
教程:欢迎使用 Redis 流
==================================
### 教程:欢迎使用 Redis 流
在某种程序上,你应该感谢流作为 Redis 列表的一个增强版本。流元素不再是一个单一的字符串,它们更多是一个fields和值values组成的对象。范围查询更适用而且更快。流中的每个条目都有一个 ID它是一个逻辑偏移量。不同的客户端可以阻塞等待blocking-wait比指定的 IDs 更大的元素。Redis 流的一个基本的命令是 XADD。是的所有的 Redis 命令都是以一个“X”为前缀的。
在某些方面,你可以认为流是 Redis 列表的一个增强版本。流元素不再是一个单一的字符串,它们更多是一个<ruby><rt>field</rt></ruby><ruby><rt>value</rt></ruby>组成的对象。范围查询更适用而且更快。流中的每个条目都有一个 ID它是一个逻辑偏移量。不同的客户端可以<ruby>阻塞等待<rt>blocking-wait</rt></ruby>比指定的 ID 更大的元素。Redis 流的一个基本的命令是 XADD。是的所有的 Redis 命令都是以一个“X”为前缀的。
```
> XADD mystream * sensor-id 1234 temperature 10.5
1506871964177.0
```
这个 XADD 命令将追加指定的条目作为一个新元素到一个指定的流 “mystream” 中。在上面的示例中的这个条目有两个域sensor-id 和 temperature然而每个条目在同一个流中可以有不同的域。使用相同的域名字将导致更多的内存使用。一个有趣的事情是域的排序是保证保存的。XADD 仅返回插入的条目的 ID因为在第三个参数中有星号*),我们请求命令去自动生成 ID。这几乎总是你想要的但是它也可能去强制指定一个 ID实例为了去复制这个命令到被动服务器和 AOF 文件。
这个 `XADD` 命令将追加指定的条目作为一个指定的流 “mystream” 的新元素。在上面的示例中的这个条目有两个域:`sensor-id` 和 `temperature`每个条目在同一个流中可以有不同的域。使用相同的域名字可以更好地利用内存。一个有趣的事情是域的排序是可以保证顺序的。XADD 仅返回插入的条目的 ID因为在第三个参数中是星号`*`),表示我们邀请命令自动生成 ID。一般情况下需求如此但是也可以去强制指定一个 ID这种情况用于复制这个命令到被动服务器和 AOF 文件。
这个 ID 是由两部分组成的:一个毫秒时间和一个序列号。1506871964177 是毫秒时间,它仅是一个使用毫秒解决方案的 UNIX 时间。圆点(.)后面的数字 0是一个序列号它是为了区分相同毫秒数的条目增加上去的。所有的数字都是 64 位的无符号整数。这意味着在流中,我们可以增加所有我们想要的条目,在同毫秒的事件。ID 的毫秒部分使用 Redis 服务器的当前本地时间生成的 ID 和流中的最后一个条目之间的最大值来获取。因此,对实例来说,即使是计算机时间向后跳,这个 IDs 仍然是增加的。在某些情况下,你可能想流条目的 IDs 作为完整的 128 位数字。然而,现实是,它们与被添加的实例的本地时间有关,意味着我们有毫秒级的精确的范围查询。
这个 ID 是由两部分组成的:一个毫秒时间和一个序列号。`1506871964177` 是毫秒时间,它仅是一个毫秒级的 UNIX 时间。圆点(`.`)后面的数字 `0`,是一个序列号,它是为了区分相同毫秒数的条目增加上去的。这两个数字都是 64 位的无符号整数。这意味着在流中,我们可以增加所有我们想要的条目,即使是在同毫秒中。ID 的毫秒部分使用 Redis 服务器的当前本地时间生成的 ID 和流中的最后一个条目 ID 两者间的最大的一个。因此,对实例来说,即使是计算机时间向后跳,这个 ID 仍然是增加的。在某些情况下,你可以认为流条目的 ID 是完整的 128 位数字。然而,现实是,它们与被添加的实例的本地时间有关,这意味着我们可以在毫秒级的精度的范围随意查询。
如你想像的那样,以一个快速的方式去添加两个条目,结果是仅序列号增加。我可以使用一个 MULTI/EXEC 块去简单模拟“快速插入”,如下:
如你想像的那样,以一个非常快速的方式去添加两个条目,结果是仅序列号增加。我可以使用一个 `MULTI`/`EXEC` 块去简单模拟“快速插入”,如下:
> MULTI
```
> MULTI
OK
> XADD mystream * foo 10
> XADD mystream * foo 10
QUEUED
> XADD mystream * bar 20
QUEUED
> EXEC
1) 1506872463535.0
2) 1506872463535.1
```
在上面的示例中也展示了无需在开始时指定任何模式schema的情况下对不同的条目使用不同的域。会发生什么呢每个块它通常包含 50 - 150 个消息范围的内容)的每一个信息被用作参考。并且,有相同域的连续条目被使用一个标志进行压缩,这个标志表明“这个块中的第一个条目的相同域”。因此,对于连续消息使用相同域可以节省许多内存,即使是域的集合随着时间发生缓慢变化。
为了从流中检索数据,这里有两种方法:范围查询,它是通过 XRANGE 命令实现的,并且对于正在变化的流,通过 XREAD 命令去实现。XRANGE 命令仅取得包括从开始到停止范围内的条目。因此,对于实例,如果我知道它的 ID我可以取得单个条目像这样
在上面的示例中,也展示了无需在开始时指定任何<ruby>模式<rt>schema</rt></ruby>的情况下,对不同的条目使用不同的域。会发生什么呢?每个块(它通常包含 50 - 150 个消息内容)的每一个信息被用作参考。并且,有相同域的连续条目被使用一个标志进行压缩,这个标志表明与“这个块中的第一个条目的相同域”。因此,对于连续消息使用相同域可以节省许多内存,即使是域的集合随着时间发生缓慢变化。
为了从流中检索数据,这里有两种方法:范围查询,它是通过 `XRANGE` 命令实现的;以及流式,它是通过 XREAD 命令实现的。`XRANGE` 命令仅取得包括从开始到停止范围内的条目。因此,对于实例,如果我知道它的 ID我可以取得单个条目像这样
```
> XRANGE mystream 1506871964177.0 1506871964177.0
1) 1) 1506871964177.0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "10.5"
```
然而,你可以使用指定的开始符号 “-” 和停止符号 “+” 去表示可能的最小和最大 ID。为了限制返回条目的数量它也可以使用 COUNT 选项。下面是一个更复杂的 XRANGE 示例:
然而,你可以使用指定的开始符号 `-` 和停止符号 `+` 去表示最小和最大的 ID。为了限制返回条目的数量它也可以使用 `COUNT` 选项。下面是一个更复杂的 `XRANGE` 示例:
```
> XRANGE mystream - + COUNT 2
1) 1) 1506871964177.0
2) 1) "sensor-id"
@ -76,9 +82,11 @@ QUEUED
2) 1) 1506872463535.0
2) 1) "foo"
2) "10"
```
这里我们讲的是 IDs 的范围,然后,为了在一个给定时间范围内取得特定元素的范围,你可以使用 XRANGE因为你可以省略 IDs 的“序列” 部分。因此,你可以做的仅是指定“毫秒”时间,下面的命令的意思是: “从 UNIX 时间 1506872463 开始给我 10 个条目”:
这里我们讲的是 ID 的范围,然后,为了取得在一个给定时间范围内的特定范围的元素,你可以使用 `XRANGE`,因为你可以省略 ID 的“序列” 部分。因此,你可以做的仅是指定“毫秒”时间,下面的命令的意思是: “从 UNIX 时间 1506872463 开始给我 10 个条目”:
```
127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 10
1) 1) 1506872463535.0
2) 1) "foo"
@ -86,24 +94,27 @@ QUEUED
2) 1) 1506872463535.1
2) 1) "bar"
2) "20"
```
关于 XRANGE 注意的最重要的事情是,由于我们在回复中收到了 IDs并且连续 ID 是无法直接获得的,因为 ID 的序列部分是增加的,它可以使用 XRANGE 去遍历整个流,接收每个调用的元素的特定个数。在 Redis 中的*SCAN*系列命令之后,那是允许 Redis 数据结构迭代的,尽管事实上它们不是为迭代设计的,我以免再次产生相同的错误。
关于 `XRANGE` 最后一个重要的事情是,考虑到我们在回复中收到 ID并且随后连续的 ID 只是增加了 ID 的序列部分,所以可以使用 `XRANGE` 去遍历整个流,接收每个调用的指定个数的元素。在 Redis 中的`*SCAN` 系列命令之后,它允许 Redis 数据结构迭代,尽管事实上它们不是为迭代设计的,但我避免再犯相同的错误。
使用 XREAD 处理变化的流:阻塞新的数据
===========================================
### 使用 XREAD 处理变化的流:阻塞新的数据
XRANGE 用于,当我们想通过 ID 或时间去访问流中的一个范围或者是通过 ID 去得到单个元素时,是非常完美的。然而,在当数据到达时,不同的客户端必须同时使用流的情况下,它就不是一个很好的解决方案,并且它是需要某种形式的“池”的。(对于*某些*应用程序来说,这可能是个好主意,因为它们仅是偶尔连接取数的)
当我们想通过 ID 或时间去访问流中的一个范围或者是通过 ID 去得到单个元素时,使用 XRANGE 是非常完美的。然而,在当数据到达时,流必须由不同的客户端处理时,它就不是一个很好的解决方案,它是需要某种形式的“池”。(对于*某些*应用程序来说,这可能是个好主意,因为它们仅是偶尔连接取数的)
XREAD 命令是为读设计的,同时从多个流中仅指定我们得到的流中的最后条目的 ID。此外如果没有数据可用我们可以要求阻塞当数据到达时去解除阻塞。类似于阻塞列表操作产生的效果但是这里的数据是从流中得到的。并且多个客户端可以在同时访问相同的数据。
这里有一个关于 XREAD 调用的规范示例:
`XREAD` 命令是为读设计的,同时从多个流中仅指定我们从该流中得到的最后条目的 ID。此外如果没有数据可用我们可以要求阻塞当数据到达时去解除阻塞。类似于阻塞列表操作产生的效果但是这里的数据是从流中得到的。并且多个客户端可以在同时访问相同的数据。
这里有一个关于 `XREAD` 调用的规范示例:
```
> XREAD BLOCK 5000 STREAMS mystream otherstream $ $
```
它的意思是:从 “mystream” 和 “otherstream” 取得数据。如果没有数据可用,阻塞客户端 5000 毫秒。之后我们用关键字 STREAMS 指定我们想要监听的流,和最后的 ID指定的 ID “$” 意思是:假设我现在已经有了流中的所有元素,因此,从下一个到达的元素开始给我。
如果,从另外一个客户端,我发出这样的命令:
它的意思是:从 `mystream``otherstream` 取得数据。如果没有数据可用,阻塞客户端 5000 毫秒。之后我们用关键字 `STREAMS` 指定我们想要监听的流,和我们的最后 ID指定的 ID `$` 意思是:假设我现在已经有了流中的所有元素,因此,从下一个到达的元素开始给我。
如果,从另外一个客户端,我发出这样的命令:
```
> XADD otherstream * message “Hi There”
在 XREAD 侧会出现什么情况呢?
@ -112,44 +123,47 @@ XREAD 命令是为读设计的,同时从多个流中仅指定我们得到的
2) 1) 1) 1506935385635.0
2) 1) "message"
2) "Hi There"
```
和到过的数据一起,我们得到了最新到达的数据的 key在下次的调用中我们将使用接收到的最新消息的 ID
和收到的数据一起,我们得到了最新收到的数据的 key在下次的调用中我们将使用接收到的最新消息的 ID
```
> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0
```
依次类推。然而需要注意的是使用方式,有可能客户端在一个非常大的延迟(因为它处理消息需要时间,或者其它什么原因)之后再次连接。在这种情况下,同时会有很多消息堆积,为了确保客户端不被消息淹没,并且服务器不会丢失太多时间的提供给单个客户端的大量消息,所以,总是使用 XREAD 的 COUNT 选项是明智的。
依次类推。然而需要注意的是使用方式,有可能客户端在一个非常大的延迟(因为它处理消息需要时间,或者其它什么原因)之后再次连接。在这种情况下,期间会有很多消息堆积,为了确保客户端不被消息淹没,并且服务器不会花费太多时间提供给单个客户端的大量消息,所以,总是使用 `XREAD``COUNT` 选项是明智的。
流封顶
==============
到现在为止,一直还都不错… 然而,有些时候,流需要去删除一些旧的消息。幸运的是,这可以使用 XADD 命令的 MAXLEN 选项去做:
### 流封顶
到现在为止,一直还都不错…… 然而,有些时候,流需要去删除一些旧的消息。幸运的是,这可以使用 `XADD` 命令的 `MAXLEN` 选项去做:
```
> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2
```
它是基本意思是,如果流添加的新元素被发现超过 1000000 个消息,那么,删除旧的消息,以便于长度回到 1000000 个元素以内。它很像是使用 RPUSH + LTRIM 的列表,但是,这是我们使用了一个内置机制去完成的。然而,需要注意的是,上面的意思是每次我们增加一个新的消息时,我们还需要另外的工作去从流中删除旧的消息。这将使用一些 CPU 资源,所以,在计算 MAXLEN 的之前,尽可能使用 “~” 符号,为了表明我们不是要求非常*精确*的 1000000 个消息。但是,这里有很多,它还不是一个大的问题:
它是基本意思是,如果流添加的新元素被发现超过 `1000000` 个消息,那么,删除旧的消息,以便于长度回到 `1000000` 个元素以内。它很像是使用 `RPUSH` + `LTRIM` 的列表,但是,这是我们使用了一个内置机制去完成的。然而,需要注意的是,上面的意思是每次我们增加一个新的消息时,我们还需要另外的工作去从流中删除旧的消息。这将使用一些 CPU 资源,所以,在计算 MAXLEN 的之前,尽可能使用 `~` 符号,为了表明我们不是要求非常*精确*的 1000000 个消息,而是稍微多一些也不是一个大的问题:
```
> XADD mystream MAXLEN ~ 1000000 * foo bar
```
这种方式的 XADD 删除消息,仅用于当它可以删除整个节点的时候。相比 vanilla XADD,这种方式几乎可以自由地对流进行封顶。
这种方式 XADD 仅当它可以删除整个节点的时候才会删除元素。相比普通的 `XADD`,这种方式几乎可以自由地对流进行封顶。
(进程内工作的)客户组
==================================
### 消费组(开发中)
这是不在 Redis 中实现的第一个特性,但是,它是在进程内工作的。它也是来自 Kafka 的灵感,尽管在这里以不同的方式去实现的。重点是使用了 XREAD客户端也可以增加一个 “GROUP <name> 选项。 在相同组的所有客户端自动调用,以得到*不同的*消息。当然,这里不能从同一个流中被多个组读。在这种情况下,所有的组将收到流中到达的消息的相同副本。但是,在不同的组,消息是不会重复的。
这是第一个 Redis 中尚未实现而在开发中的特性。它也是来自 Kafka 的灵感,尽管在这里以不同的方式去实现的。重点是使用了 `XREAD`,客户端也可以增加一个 `GROUP <name>` 选项。 在相同组的所有客户端自动调用,以得到*不同的*消息。当然,不能从同一个流中被多个组读。在这种情况下,所有的组将收到流中到达的消息的相同副本。但是,在每个组内,消息是不会重复的。
当指定组时,尽可能指定一个 “RETRY <milliseconds>” 选项去扩展组:在这种情况下,如果消息没有使用 XACK 去进行确认,它将在指定的毫秒数后进行再次投递。这种情况下,客户端没有私有的方法去标记已处理的消息这也是一项正在进行中的工作。
当指定组时,能指定一个 “RETRY <milliseconds>” 选项去扩展组:在这种情况下,如果消息没有使用 XACK 去进行确认,它将在指定的毫秒数后进行再次投递。这将为消息投递提供更佳的可靠性,这种情况下,客户端没有私有的方法去标记已处理的消息这也是一项正在进行中的工作。
内存使用和节省的加载时间
=====================================
### 内存使用和节省的加载时间
因为被设计用来模拟 Redis 流,所以,根据它们的域的数量、值和长度,内存使用是显著降低的。但对于简单的消息,每 100 MB 内存使用可以有几百万条消息,此外,设想中的格式去需要极少的系列化:是存储为 radix 树节点的 listpack 块,在磁盘上和内存中是用同一个来表示的,因此,它们被琐碎地存储和读取。在一个实例中Redis 能在 0.3 秒内可以从 RDB 文件中读取 500 万个条目。这使的流的复制和持久存储是非常高效的。
因为被设计用来模拟 Redis 流,内存使用是显著降低的。这依赖于它们的域的数量、值和长度,但对于简单的消息,每 100 MB 内存使用可以有几百万条消息,此外,设想中的格式去需要极少的系列化:listpack 块以 radix 树节点方式存储,在磁盘上和内存中是以相同方式表示的,因此,它们可以被轻松地存储和读取。在一个实例中Redis 能在 0.3 秒内可以从 RDB 文件中读取 500 万个条目。这使的流的复制和持久存储是非常高效的。
计划允许从条目中间删除。现在仅部分实现,策略是整个条目标记中删除的条目被标记为已删除条目,并且,当达到设置的已删除条目占全部条目的比例时,这个块将被回收重写,并且,如果需要,它将被连到相邻的另一个块上,以避免碎片化。
也计划允许从条目中间删除。现在仅部分实现,策略是删除的条目在标记中标识为已删除条目,并且,当达到已删除条目占全部条目的给定比例时,这个块将被回收重写,并且,如果需要,它将被连到相邻的另一个块上,以避免碎片化。
最终发布时间的结论
===================
### 最终发布时间的结论
Redis 流将包含在年底前推出的 Redis 4.0 系列的稳定版中。我认为这个通用的数据结构将为 Redis 提供一个巨大的补丁,为了用于解决很多现在很难去解决的情况:那意味着你需要创造性地滥用当前提供的数据结构去解决那些问题。一个非常重要的使用情况是时间系列,但是,我的感觉是,对于其它案例来说,通过 TREAD 来传递消息将是非常有趣的,因为它可以替代那些需要更高可靠性的发布/订阅的应用程序,而不是“即用即弃”,以及全新的使用案例。现在,如果你想在你的有问题的环境中,去评估新的数据结构的能力,可以在 GitHub 上去获得 “streams” 分支,开始去玩吧。欢迎向我们报告所有的 bug 。 :-)
Redis 流将包含在年底前LCTT 译注:本文原文发布于 2017 年 10 月)推出的 Redis 4.0 系列的稳定版中。我认为这个通用的数据结构将为 Redis 提供一个巨大的补丁,用于解决很多现在很难去解决的情况:那意味着你(之前)需要创造性地滥用当前提供的数据结构去解决那些问题。一个非常重要的使用情况是时间系列,但是,我的感觉是,对于其它用例来说,通过 `TREAD` 来传递消息将是非常有趣的,因为它可以替代那些需要更高可靠性的发布/订阅的应用程序,而不是“即用即弃”,以及全新的用例。现在,如果你想在你需要的环境中评估新的数据结构的能力,可以在 GitHub 上去获得 “streams” 分支,开始去玩吧。欢迎向我们报告所有的 bug 。 :-)
如果你喜欢这个视频,展示这个 streams 的实时会话在这里: https://www.youtube.com/watch?v=ELDzy9lCFHQ
@ -158,9 +172,9 @@ Redis 流将包含在年底前推出的 Redis 4.0 系列的稳定版中。我认
via: http://antirez.com/news/114
作者:[antirez ][a]
作者:[antirez][a]
译者:[qhwdw](https://github.com/qhwdw)
校对:[校对者ID](https://github.com/校对者ID)
校对:[wxy](https://github.com/wxy)
本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创编译,[Linux中国](https://linux.cn/) 荣誉推出