# 第十一章:流處理 ![](../img/ch11.png) > 有效的複雜系統總是從簡單的系統演化而來。 反之亦然:從零設計的複雜系統沒一個能有效工作的。 > > —— 約翰・加爾,Systemantics(1975) --------------- [TOC] 在 [第十章](ch10.md) 中,我們討論了批處理技術,它讀取一組檔案作為輸入,並生成一組新的檔案作為輸出。輸出是 **衍生資料(derived data)** 的一種形式;也就是說,如果需要,可以透過再次執行批處理過程來重新建立資料集。我們看到了如何使用這個簡單而強大的想法來建立搜尋索引、推薦系統、做分析等等。 然而,在 [第十章](ch10.md) 中仍然有一個很大的假設:即輸入是有界的,即已知和有限的大小,所以批處理知道它何時完成輸入的讀取。例如,MapReduce 核心的排序操作必須讀取其全部輸入,然後才能開始生成輸出:可能發生這種情況:最後一條輸入記錄具有最小的鍵,因此需要第一個被輸出,所以提早開始輸出是不可行的。 實際上,很多資料是 **無界限** 的,因為它隨著時間的推移而逐漸到達:你的使用者在昨天和今天產生了資料,明天他們將繼續產生更多的資料。除非你停業,否則這個過程永遠都不會結束,所以資料集從來就不會以任何有意義的方式 “完成”【1】。因此,批處理程式必須將資料人為地分成固定時間段的資料塊,例如,在每天結束時處理一天的資料,或者在每小時結束時處理一小時的資料。 日常批處理中的問題是,輸入的變更只會在一天之後的輸出中反映出來,這對於許多急躁的使用者來說太慢了。為了減少延遲,我們可以更頻繁地執行處理 —— 比如說,在每秒鐘的末尾 —— 或者甚至更連續一些,完全拋開固定的時間切片,當事件發生時就立即進行處理,這就是 **流處理(stream processing)** 背後的想法。 一般來說,“流” 是指隨著時間的推移逐漸可用的資料。這個概念出現在很多地方:Unix 的 stdin 和 stdout、程式語言(惰性列表)【2】、檔案系統 API(如 Java 的 `FileInputStream`)、TCP 連線、透過網際網路傳送音訊和影片等等。 在本章中,我們將把 **事件流(event stream)** 視為一種資料管理機制:無界限,增量處理,與上一章中的批次資料相對應。我們將首先討論怎樣表示、儲存、透過網路傳輸流。在 “[資料庫與流](#資料庫與流)” 中,我們將研究流和資料庫之間的關係。最後在 “[流處理](#流處理)” 中,我們將研究連續處理這些流的方法和工具,以及它們用於應用構建的方式。 ## 傳遞事件流 在批處理領域,作業的輸入和輸出是檔案(也許在分散式檔案系統上)。流處理領域中的等價物看上去是什麼樣子的? 當輸入是一個檔案(一個位元組序列),第一個處理步驟通常是將其解析為一系列記錄。在流處理的上下文中,記錄通常被叫做 **事件(event)** ,但它本質上是一樣的:一個小的、自包含的、不可變的物件,包含某個時間點發生的某件事情的細節。一個事件通常包含一個來自日曆時鐘的時間戳,以指明事件發生的時間(請參閱 “[單調鍾與日曆時鐘](ch8.md#單調鍾與日曆時鐘)”)。 例如,發生的事件可能是使用者採取的行動,例如檢視頁面或進行購買。它也可能來源於機器,例如對溫度感測器或 CPU 利用率的週期性測量。在 “[使用 Unix 工具的批處理](ch10.md#使用Unix工具的批處理)” 的示例中,Web 伺服器日誌的每一行都是一個事件。 事件可能被編碼為文字字串或 JSON,或者某種二進位制編碼,如 [第四章](ch4.md) 所述。這種編碼允許你儲存一個事件,例如將其追加到一個檔案,將其插入關係表,或將其寫入文件資料庫。它還允許你透過網路將事件傳送到另一個節點以進行處理。 在批處理中,檔案被寫入一次,然後可能被多個作業讀取。類似地,在流處理術語中,一個事件由 **生產者(producer)** (也稱為 **釋出者(publisher)** 或 **傳送者(sender)** )生成一次,然後可能由多個 **消費者(consumer)** ( **訂閱者(subscribers)** 或 **接收者(recipients)** )進行處理【3】。在檔案系統中,檔名標識一組相關記錄;在流式系統中,相關的事件通常被聚合為一個 **主題(topic)** 或 **流(stream)** 。 原則上講,檔案或資料庫就足以連線生產者和消費者:生產者將其生成的每個事件寫入資料儲存,且每個消費者定期輪詢資料儲存,檢查自上次執行以來新出現的事件。這實際上正是批處理在每天結束時處理當天資料時所做的事情。 但當我們想要進行低延遲的連續處理時,如果資料儲存不是為這種用途專門設計的,那麼輪詢開銷就會很大。輪詢的越頻繁,能返回新事件的請求比例就越低,而額外開銷也就越高。相比之下,最好能在新事件出現時直接通知消費者。 資料庫在傳統上對這種通知機制支援的並不好,關係型資料庫通常有 **觸發器(trigger)** ,它們可以對變化(如,插入表中的一行)作出反應,但是它們的功能非常有限,並且在資料庫設計中有些後顧之憂【4,5】。相應的是,已經開發了專門的工具來提供事件通知。 ### 訊息傳遞系統 向消費者通知新事件的常用方式是使用 **訊息傳遞系統(messaging system)**:生產者傳送包含事件的訊息,然後將訊息推送給消費者。我們之前在 “[訊息傳遞中的資料流](ch4.md#訊息傳遞中的資料流)” 中談到了這些系統,但現在我們將詳細介紹這些系統。 像生產者和消費者之間的 Unix 管道或 TCP 連線這樣的直接通道,是實現訊息傳遞系統的簡單方法。但是,大多數訊息傳遞系統都在這一基本模型上進行了擴充套件。特別的是,Unix 管道和 TCP 將恰好一個傳送者與恰好一個接收者連線,而一個訊息傳遞系統允許多個生產者節點將訊息傳送到同一個主題,並允許多個消費者節點接收主題中的訊息。 在這個 **釋出 / 訂閱** 模式中,不同的系統採取各種各樣的方法,並沒有針對所有目的的通用答案。為了區分這些系統,問一下這兩個問題會特別有幫助: 1. **如果生產者傳送訊息的速度比消費者能夠處理的速度快會發生什麼?** 一般來說,有三種選擇:系統可以丟掉訊息,將訊息放入緩衝佇列,或使用 **背壓**(backpressure,也稱為 **流量控制**,即 flow control:阻塞生產者,以免其傳送更多的訊息)。例如 Unix 管道和 TCP 就使用了背壓:它們有一個固定大小的小緩衝區,如果填滿,傳送者會被阻塞,直到接收者從緩衝區中取出資料(請參閱 “[網路擁塞和排隊](ch8.md#網路擁塞和排隊)”)。 如果訊息被快取在佇列中,那麼理解佇列增長會發生什麼是很重要的。當佇列裝不進記憶體時系統會崩潰嗎?還是將訊息寫入磁碟?如果是這樣,磁碟訪問又會如何影響訊息傳遞系統的效能【6】? 2. **如果節點崩潰或暫時離線,會發生什麼情況? —— 是否會有訊息丟失?** 與資料庫一樣,永續性可能需要寫入磁碟和 / 或複製的某種組合(請參閱 “[複製與永續性](ch7.md#複製與永續性)”),這是有代價的。如果你能接受有時訊息會丟失,則可能在同一硬體上獲得更高的吞吐量和更低的延遲。 是否可以接受訊息丟失取決於應用。例如,對於週期傳輸的感測器讀數和指標,偶爾丟失的資料點可能並不重要,因為更新的值會在短時間內發出。但要注意,如果大量的訊息被丟棄,可能無法立刻意識到指標已經不正確了【7】。如果你正在對事件計數,那麼它們能夠可靠送達是更重要的,因為每個丟失的訊息都意味著使計數器的錯誤擴大。 我們在 [第十章](ch10.md) 中探討的批處理系統的一個很好的特性是,它們提供了強大的可靠性保證:失敗的任務會自動重試,失敗任務的部分輸出會自動丟棄。這意味著輸出與沒有發生故障一樣,這有助於簡化程式設計模型。在本章的後面,我們將研究如何在流處理的上下文中提供類似的保證。 #### 直接從生產者傳遞給消費者 許多訊息傳遞系統使用生產者和消費者之間的直接網路通訊,而不透過中間節點: * UDP 組播廣泛應用於金融行業,例如股票市場,其中低時延非常重要【8】。雖然 UDP 本身是不可靠的,但應用層的協議可以恢復丟失的資料包(生產者必須記住它傳送的資料包,以便能按需重新發送資料包)。 * 無代理的訊息庫,如 ZeroMQ 【9】和 nanomsg 採取類似的方法,透過 TCP 或 IP 多播實現釋出 / 訂閱訊息傳遞。 * StatsD 【10】和 Brubeck 【7】使用不可靠的 UDP 訊息傳遞來收集網路中所有機器的指標並對其進行監控。 (在 StatsD 協議中,只有接收到所有訊息,才認為計數器指標是正確的;使用 UDP 將使得指標處在一種最佳近似狀態【11】。另請參閱 “[TCP 與 UDP](ch8.md#TCP與UDP)” * 如果消費者在網路上公開了服務,生產者可以直接傳送 HTTP 或 RPC 請求(請參閱 “[服務中的資料流:REST 與 RPC](ch4.md#服務中的資料流:REST與RPC)”)將訊息推送給使用者。這就是 webhooks 背後的想法【12】,一種服務的回撥 URL 被註冊到另一個服務中,並且每當事件發生時都會向該 URL 發出請求。 儘管這些直接訊息傳遞系統在設計它們的環境中執行良好,但是它們通常要求應用程式碼意識到訊息丟失的可能性。它們的容錯程度極為有限:即使協議檢測到並重傳在網路中丟失的資料包,它們通常也只是假設生產者和消費者始終線上。 如果消費者處於離線狀態,則可能會丟失其不可達時傳送的訊息。一些協議允許生產者重試失敗的訊息傳遞,但當生產者崩潰時,它可能會丟失訊息緩衝區及其本應傳送的訊息,這種方法可能就沒用了。 #### 訊息代理 一種廣泛使用的替代方法是透過 **訊息代理**(message broker,也稱為 **訊息佇列**,即 message queue)傳送訊息,訊息代理實質上是一種針對處理訊息流而最佳化的資料庫。它作為伺服器執行,生產者和消費者作為客戶端連線到伺服器。生產者將訊息寫入代理,消費者透過從代理那裡讀取來接收訊息。 透過將資料集中在代理上,這些系統可以更容易地容忍來來去去的客戶端(連線,斷開連線和崩潰),而永續性問題則轉移到代理的身上。一些訊息代理只將訊息儲存在記憶體中,而另一些訊息代理(取決於配置)將其寫入磁碟,以便在代理崩潰的情況下不會丟失。針對緩慢的消費者,它們通常會允許無上限的排隊(而不是丟棄訊息或背壓),儘管這種選擇也可能取決於配置。 排隊的結果是,消費者通常是 **非同步(asynchronous)** 的:當生產者傳送訊息時,通常只會等待代理確認訊息已經被快取,而不等待訊息被消費者處理。向消費者遞送訊息將發生在未來某個未定的時間點 —— 通常在幾分之一秒之內,但有時當訊息堆積時會顯著延遲。 #### 訊息代理與資料庫的對比 有些訊息代理甚至可以使用 XA 或 JTA 參與兩階段提交協議(請參閱 “[實踐中的分散式事務](ch9.md#實踐中的分散式事務)”)。這個功能與資料庫在本質上非常相似,儘管訊息代理和資料庫之間仍存在實踐上很重要的差異: * 資料庫通常保留資料直至顯式刪除,而大多數訊息代理在訊息成功遞送給消費者時會自動刪除訊息。這樣的訊息代理不適合長期的資料儲存。 * 由於它們很快就能刪除訊息,大多數訊息代理都認為它們的工作集相當小 —— 即佇列很短。如果代理需要緩衝很多訊息,比如因為消費者速度較慢(如果記憶體裝不下訊息,可能會溢位到磁碟),每個訊息需要更長的處理時間,整體吞吐量可能會惡化【6】。 * 資料庫通常支援次級索引和各種搜尋資料的方式,而訊息代理通常支援按照某種模式匹配主題,訂閱其子集。雖然機制並不一樣,但對於客戶端選擇想要了解的資料的一部分,都是基本的方式。 * 查詢資料庫時,結果通常基於某個時間點的資料快照;如果另一個客戶端隨後向資料庫寫入一些改變了查詢結果的內容,則第一個客戶端不會發現其先前結果現已過期(除非它重複查詢或輪詢變更)。相比之下,訊息代理不支援任意查詢,但是當資料發生變化時(即新訊息可用時),它們會通知客戶端。 這是關於訊息代理的傳統觀點,它被封裝在諸如 JMS 【14】和 AMQP 【15】的標準中,並且被諸如 RabbitMQ、ActiveMQ、HornetQ、Qpid、TIBCO 企業訊息服務、IBM MQ、Azure Service Bus 和 Google Cloud Pub/Sub 所實現 【16】。 #### 多個消費者 當多個消費者從同一主題中讀取訊息時,有兩種主要的訊息傳遞模式,如 [圖 11-1](../img/fig11-1.png) 所示: * 負載均衡(load balancing) 每條訊息都被傳遞給消費者 **之一**,所以處理該主題下訊息的工作能被多個消費者共享。代理可以為消費者任意分配訊息。當處理訊息的代價高昂,希望能並行處理訊息時,此模式非常有用(在 AMQP 中,可以透過讓多個客戶端從同一個佇列中消費來實現負載均衡,而在 JMS 中則稱之為 **共享訂閱**,即 shared subscription)。 * 扇出(fan-out) 每條訊息都被傳遞給 **所有** 消費者。扇出允許幾個獨立的消費者各自 “收聽” 相同的訊息廣播,而不會相互影響 —— 這個流處理中的概念對應批處理中多個不同批處理作業讀取同一份輸入檔案 (JMS 中的主題訂閱與 AMQP 中的交叉繫結提供了這一功能)。 ![](../img/fig11-1.png) **圖 11-1 (a)負載平衡:在消費者間共享消費主題;(b)扇出:將每條訊息傳遞給多個消費者。** 兩種模式可以組合使用:例如,兩個獨立的消費者組可以每組各訂閱同一個主題,每一組都共同收到所有訊息,但在每一組內部,每條訊息僅由單個節點處理。 #### 確認與重新傳遞 消費者隨時可能會崩潰,所以有一種可能的情況是:代理向消費者遞送訊息,但消費者沒有處理,或者在消費者崩潰之前只進行了部分處理。為了確保訊息不會丟失,訊息代理使用 **確認(acknowledgments)**:客戶端必須顯式告知代理訊息處理完畢的時間,以便代理能將訊息從佇列中移除。 如果與客戶端的連線關閉,或者代理超出一段時間未收到確認,代理則認為訊息沒有被處理,因此它將訊息再遞送給另一個消費者。 (請注意可能發生這樣的情況,訊息 **實際上是** 處理完畢的,但 **確認** 在網路中丟失了。需要一種原子提交協議才能處理這種情況,正如在 “[實踐中的分散式事務](ch9.md#實踐中的分散式事務)” 中所討論的那樣) 當與負載均衡相結合時,這種重傳行為對訊息的順序有種有趣的影響。在 [圖 11-2](../img/fig11-2.png) 中,消費者通常按照生產者傳送的順序處理訊息。然而消費者 2 在處理訊息 m3 時崩潰,與此同時消費者 1 正在處理訊息 m4。未確認的訊息 m3 隨後被重新發送給消費者 1,結果消費者 1 按照 m4,m3,m5 的順序處理訊息。因此 m3 和 m4 的交付順序與生產者 1 的傳送順序不同。 ![](../img/fig11-2.png) **圖 11-2 在處理 m3 時消費者 2 崩潰,因此稍後重傳至消費者 1** 即使訊息代理試圖保留訊息的順序(如 JMS 和 AMQP 標準所要求的),負載均衡與重傳的組合也不可避免地導致訊息被重新排序。為避免此問題,你可以讓每個消費者使用單獨的佇列(即不使用負載均衡功能)。如果訊息是完全獨立的,則訊息順序重排並不是一個問題。但正如我們將在本章後續部分所述,如果訊息之間存在因果依賴關係,這就是一個很重要的問題。 ### 分割槽日誌 透過網路傳送資料包或向網路服務傳送請求通常是短暫的操作,不會留下永久的痕跡。儘管可以永久記錄(透過抓包與日誌),但我們通常不這麼做。即使是將訊息持久地寫入磁碟的訊息代理,在送達給消費者之後也會很快刪除訊息,因為它們建立在短暫訊息傳遞的思維方式上。 資料庫和檔案系統採用截然相反的方法論:至少在某人顯式刪除前,通常寫入資料庫或檔案的所有內容都要被永久記錄下來。 這種思維方式上的差異對建立衍生資料的方式有巨大影響。如 [第十章](ch10.md) 所述,批處理過程的一個關鍵特性是,你可以反覆執行它們,試驗處理步驟,不用擔心損壞輸入(因為輸入是隻讀的)。而 AMQP/JMS 風格的訊息傳遞並非如此:收到訊息是具有破壞性的,因為確認可能導致訊息從代理中被刪除,因此你不能期望再次運行同一個消費者能得到相同的結果。 如果你將新的消費者新增到訊息傳遞系統,通常只能接收到消費者註冊之後開始傳送的訊息。先前的任何訊息都隨風而逝,一去不復返。作為對比,你可以隨時為檔案和資料庫新增新的客戶端,且能讀取任意久遠的資料(只要應用沒有顯式覆蓋或刪除這些資料)。 為什麼我們不能把它倆雜交一下,既有資料庫的持久儲存方式,又有訊息傳遞的低延遲通知?這就是 **基於日誌的訊息代理(log-based message brokers)** 背後的想法。 #### 使用日誌進行訊息儲存 日誌只是磁碟上簡單的僅追加記錄序列。我們先前在 [第三章](ch3.md) 中日誌結構儲存引擎和預寫式日誌的上下文中討論了日誌,在 [第五章](ch5.md) 複製的上下文裡也討論了它。 同樣的結構可以用於實現訊息代理:生產者透過將訊息追加到日誌末尾來發送訊息,而消費者透過依次讀取日誌來接收訊息。如果消費者讀到日誌末尾,則會等待新訊息追加的通知。 Unix 工具 `tail -f` 能監視檔案被追加寫入的資料,基本上就是這樣工作的。 為了伸縮超出單個磁碟所能提供的更高吞吐量,可以對日誌進行 **分割槽**(按 [第六章](ch6.md) 的定義)。不同的分割槽可以託管在不同的機器上,使得每個分割槽都有一份能獨立於其他分割槽進行讀寫的日誌。一個主題可以定義為一組攜帶相同型別訊息的分割槽。這種方法如 [圖 11-3](../img/fig11-3.png) 所示。 在每個分割槽內,代理為每個訊息分配一個單調遞增的序列號或 **偏移量**(offset,在 [圖 11-3](../img/fig11-3.png) 中,框中的數字是訊息偏移量)。這種序列號是有意義的,因為分割槽是僅追加寫入的,所以分割槽內的訊息是完全有序的。沒有跨不同分割槽的順序保證。 ![](../img/fig11-3.png) **圖 11-3 生產者透過將訊息追加寫入主題分割槽檔案來發送訊息,消費者依次讀取這些檔案** Apache Kafka 【17,18】、Amazon Kinesis Streams 【19】和 Twitter 的 DistributedLog 【20,21】都是基於日誌的訊息代理。 Google Cloud Pub/Sub 在架構上類似,但對外暴露的是 JMS 風格的 API,而不是日誌抽象【16】。儘管這些訊息代理將所有訊息寫入磁碟,但透過跨多臺機器分割槽,每秒能夠實現數百萬條訊息的吞吐量,並透過複製訊息來實現容錯性【22,23】。 #### 日誌與傳統的訊息傳遞相比 基於日誌的方法天然支援扇出式訊息傳遞,因為多個消費者可以獨立讀取日誌,而不會相互影響 —— 讀取訊息不會將其從日誌中刪除。為了在一組消費者之間實現負載平衡,代理可以將整個分割槽分配給消費者組中的節點,而不是將單條訊息分配給消費者客戶端。 然後每個客戶端將消費被指派分割槽中的 **所有** 訊息。通常情況下,當一個使用者被指派了一個日誌分割槽時,它會以簡單的單執行緒方式順序地讀取分割槽中的訊息。這種粗粒度的負載均衡方法有一些缺點: * 共享消費主題工作的節點數,最多為該主題中的日誌分割槽數,因為同一個分割槽內的所有訊息被遞送到同一個節點 [^i]。 * 如果某條訊息處理緩慢,則它會阻塞該分割槽中後續訊息的處理(一種行首阻塞的形式;請參閱 “[描述效能](ch1.md#描述效能)”)。 因此在訊息處理代價高昂,希望逐條並行處理,以及訊息的順序並沒有那麼重要的情況下,JMS/AMQP 風格的訊息代理是可取的。另一方面,在訊息吞吐量很高,處理迅速,順序很重要的情況下,基於日誌的方法表現得非常好。 [^i]: 要設計一種負載均衡方案也是有可能的,在這種方案中,兩個消費者透過讀取全部訊息來共享分割槽處理的工作,但是其中一個只考慮具有偶數偏移量的訊息,而另一個消費者只處理奇數編號的偏移量。或者你可以將訊息攤到一個執行緒池中來處理,但這種方法會使消費者偏移量管理變得複雜。一般來說,單執行緒處理單分割槽是合適的,可以透過增加更多分割槽來提高並行度。 #### 消費者偏移量 順序消費一個分割槽使得判斷訊息是否已經被處理變得相當容易:所有偏移量小於消費者的當前偏移量的訊息已經被處理,而具有更大偏移量的訊息還沒有被看到。因此,代理不需要跟蹤確認每條訊息,只需要定期記錄消費者的偏移即可。這種方法減少了額外簿記開銷,而且在批處理和流處理中採用這種方法有助於提高基於日誌的系統的吞吐量。 實際上,這種偏移量與單領導者資料庫複製中常見的日誌序列號非常相似,我們在 “[設定新從庫](ch5.md#設定新從庫)” 中討論了這種情況。在資料庫複製中,日誌序列號允許跟隨者斷開連線後,重新連線到領導者,並在不跳過任何寫入的情況下恢復複製。這裡原理完全相同:訊息代理表現得像一個主庫,而消費者就像一個從庫。 如果消費者節點失效,則失效消費者的分割槽將指派給其他節點,並從最後記錄的偏移量開始消費訊息。如果消費者已經處理了後續的訊息,但還沒有記錄它們的偏移量,那麼重啟後這些訊息將被處理兩次。我們將在本章後面討論這個問題的處理方法。 #### 磁碟空間使用 如果只追加寫入日誌,則磁碟空間終究會耗盡。為了回收磁碟空間,日誌實際上被分割成段,並不時地將舊段刪除或移動到歸檔儲存。 (我們將在後面討論一種更為複雜的磁碟空間釋放方式) 這就意味著如果一個慢消費者跟不上訊息產生的速率而落後得太多,它的消費偏移量指向了刪除的段,那麼它就會錯過一些訊息。實際上,日誌實現了一個有限大小的緩衝區,當緩衝區填滿時會丟棄舊訊息,它也被稱為 **迴圈緩衝區(circular buffer)** 或 **環形緩衝區(ring buffer)**。不過由於緩衝區在磁碟上,因此緩衝區可能相當的大。 讓我們做個簡單計算。在撰寫本文時,典型的大型硬碟容量為 6TB,順序寫入吞吐量為 150MB/s。如果以最快的速度寫訊息,則需要大約 11 個小時才能填滿磁碟。因而磁碟可以緩衝 11 個小時的訊息,之後它將開始覆蓋舊的訊息。即使使用多個磁碟和機器,這個比率也是一樣的。實踐中的部署很少能用滿磁碟的寫入頻寬,所以通常可以儲存一個幾天甚至幾周的日誌緩衝區。 不管保留多長時間的訊息,日誌的吞吐量或多或少保持不變,因為無論如何,每個訊息都會被寫入磁碟【18】。這種行為與預設將訊息儲存在記憶體中,僅當佇列太長時才寫入磁碟的訊息傳遞系統形成鮮明對比。當佇列很短時,這些系統非常快;而當這些系統開始寫入磁碟時,就要慢的多,所以吞吐量取決於保留的歷史數量。 #### 當消費者跟不上生產者時 在 “[訊息傳遞系統](#訊息傳遞系統)” 中,如果消費者無法跟上生產者傳送資訊的速度時,我們討論了三種選擇:丟棄資訊,進行緩衝或施加背壓。在這種分類法裡,基於日誌的方法是緩衝的一種形式,具有很大但大小固定的緩衝區(受可用磁碟空間的限制)。 如果消費者遠遠落後,而所要求的資訊比保留在磁碟上的資訊還要舊,那麼它將不能讀取這些資訊,所以代理實際上丟棄了比緩衝區容量更大的舊資訊。你可以監控消費者落後日誌頭部的距離,如果落後太多就發出報警。由於緩衝區很大,因而有足夠的時間讓運維人員來修復慢消費者,並在訊息開始丟失之前讓其趕上。 即使消費者真的落後太多開始丟失訊息,也只有那個消費者受到影響;它不會中斷其他消費者的服務。這是一個巨大的運維優勢:你可以實驗性地消費生產日誌,以進行開發,測試或除錯,而不必擔心會中斷生產服務。當消費者關閉或崩潰時,會停止消耗資源,唯一剩下的只有消費者偏移量。 這種行為也與傳統的訊息代理形成了鮮明對比,在那種情況下,你需要小心地刪除那些消費者已經關閉的佇列 —— 否則那些佇列就會累積不必要的訊息,從其他仍活躍的消費者那裡佔走記憶體。 #### 重播舊訊息 我們之前提到,使用 AMQP 和 JMS 風格的訊息代理,處理和確認訊息是一個破壞性的操作,因為它會導致訊息在代理上被刪除。另一方面,在基於日誌的訊息代理中,使用訊息更像是從檔案中讀取資料:這是隻讀操作,不會更改日誌。 除了消費者的任何輸出之外,處理的唯一副作用是消費者偏移量的前進。但偏移量是在消費者的控制之下的,所以如果需要的話可以很容易地操縱:例如你可以用昨天的偏移量跑一個消費者副本,並將輸出寫到不同的位置,以便重新處理最近一天的訊息。你可以使用各種不同的處理程式碼重複任意次。 這一方面使得基於日誌的訊息傳遞更像上一章的批處理,其中衍生資料透過可重複的轉換過程與輸入資料顯式分離。它允許進行更多的實驗,更容易從錯誤和漏洞中恢復,使其成為在組織內整合資料流的良好工具【24】。 ## 資料庫與流 我們已經在訊息代理和資料庫之間進行了一些比較。儘管傳統上它們被視為單獨的工具類別,但是我們看到基於日誌的訊息代理已經成功地從資料庫中獲取靈感並將其應用於訊息傳遞。我們也可以反過來:從訊息傳遞和流中獲取靈感,並將它們應用於資料庫。 我們之前曾經說過,事件是某個時刻發生的事情的記錄。發生的事情可能是使用者操作(例如鍵入搜尋查詢)或讀取感測器,但也可能是 **寫入資料庫**。某些東西被寫入資料庫的事實是可以被捕獲、儲存和處理的事件。這一觀察結果表明,資料庫和資料流之間的聯絡不僅僅是磁碟日誌的物理儲存 —— 而是更深層的聯絡。 事實上,複製日誌(請參閱 “[複製日誌的實現](ch5.md#複製日誌的實現)”)是一個由資料庫寫入事件組成的流,由主庫在處理事務時生成。從庫將寫入流應用到它們自己的資料庫副本,從而最終得到相同資料的精確副本。複製日誌中的事件描述發生的資料更改。 我們還在 “[全序廣播](ch9.md#全序廣播)” 中遇到了狀態機複製原理,其中指出:如果每個事件代表對資料庫的寫入,並且每個副本按相同的順序處理相同的事件,則副本將達到相同的最終狀態 (假設事件處理是一個確定性的操作)。這是事件流的又一種場景! 在本節中,我們將首先看看異構資料系統中出現的一個問題,然後探討如何透過將事件流的想法帶入資料庫來解決這個問題。 ### 保持系統同步 正如我們在本書中所看到的,沒有一個系統能夠滿足所有的資料儲存、查詢和處理需求。在實踐中,大多數重要應用都需要組合使用幾種不同的技術來滿足所有的需求:例如,使用 OLTP 資料庫來為使用者請求提供服務,使用快取來加速常見請求,使用全文索引來處理搜尋查詢,使用資料倉庫用於分析。每一種技術都有自己的資料副本,並根據自己的目的進行儲存方式的最佳化。 由於相同或相關的資料出現在了不同的地方,因此相互間需要保持同步:如果某個專案在資料庫中被更新,它也應當在快取、搜尋索引和資料倉庫中被更新。對於資料倉庫,這種同步通常由 ETL 程序執行(請參閱 “[資料倉庫](ch3.md#資料倉庫)”),通常是先取得資料庫的完整副本,然後執行轉換,並批次載入到資料倉庫中 —— 換句話說,批處理。我們在 “[批處理工作流的輸出](ch10.md#批處理工作流的輸出)” 中同樣看到了如何使用批處理建立搜尋索引、推薦系統和其他衍生資料系統。 如果週期性的完整資料庫轉儲過於緩慢,有時會使用的替代方法是 **雙寫(dual write)**,其中應用程式碼在資料變更時明確寫入每個系統:例如,首先寫入資料庫,然後更新搜尋索引,然後使快取項失效(甚至同時執行這些寫入)。 但是,雙寫有一些嚴重的問題,其中一個是競爭條件,如 [圖 11-4](../img/fig11-4.png) 所示。在這個例子中,兩個客戶端同時想要更新一個專案 X:客戶端 1 想要將值設定為 A,客戶端 2 想要將其設定為 B。兩個客戶端首先將新值寫入資料庫,然後將其寫入到搜尋索引。因為運氣不好,這些請求的時序是交錯的:資料庫首先看到來自客戶端 1 的寫入將值設定為 A,然後來自客戶端 2 的寫入將值設定為 B,因此資料庫中的最終值為 B。搜尋索引首先看到來自客戶端 2 的寫入,然後是客戶端 1 的寫入,所以搜尋索引中的最終值是 A。即使沒發生錯誤,這兩個系統現在也永久地不一致了。 ![](../img/fig11-4.png) **圖 11-4 在資料庫中 X 首先被設定為 A,然後被設定為 B,而在搜尋索引處,寫入以相反的順序到達** 除非有一些額外的併發檢測機制,例如我們在 “[檢測併發寫入](ch5.md#檢測併發寫入)” 中討論的版本向量,否則你甚至不會意識到發生了併發寫入 —— 一個值將簡單地以無提示方式覆蓋另一個值。 雙重寫入的另一個問題是,其中一個寫入可能會失敗,而另一個成功。這是一個容錯問題,而不是一個併發問題,但也會造成兩個系統互相不一致的結果。確保它們要麼都成功要麼都失敗,是原子提交問題的一個例子,解決這個問題的代價是昂貴的(請參閱 “[原子提交與兩階段提交](ch9.md#原子提交與兩階段提交)”)。 如果你只有一個單領導者複製的資料庫,那麼這個領導者決定了寫入順序,而狀態機複製方法可以在資料庫副本上工作。然而,在 [圖 11-4](../img/fig11-4.png) 中,沒有單個主庫:資料庫可能有一個領導者,搜尋索引也可能有一個領導者,但是兩者都不追隨對方,所以可能會發生衝突(請參閱 “[多主複製](ch5.md#多主複製)“)。 如果實際上只有一個領導者 —— 例如,資料庫 —— 而且我們能讓搜尋索引成為資料庫的追隨者,情況要好得多。但這在實踐中可能嗎? ### 變更資料捕獲 大多數資料庫的複製日誌的問題在於,它們一直被當做資料庫的內部實現細節,而不是公開的 API。客戶端應該透過其資料模型和查詢語言來查詢資料庫,而不是解析複製日誌並嘗試從中提取資料。 數十年來,許多資料庫根本沒有記錄在檔的獲取變更日誌的方式。由於這個原因,捕獲資料庫中所有的變更,然後將其複製到其他儲存技術(搜尋索引、快取或資料倉庫)中是相當困難的。 最近,人們對 **變更資料捕獲(change data capture, CDC)** 越來越感興趣,這是一種觀察寫入資料庫的所有資料變更,並將其提取並轉換為可以複製到其他系統中的形式的過程。 CDC 是非常有意思的,尤其是當變更能在被寫入後立刻用於流時。 例如,你可以捕獲資料庫中的變更,並不斷將相同的變更應用至搜尋索引。如果變更日誌以相同的順序應用,則可以預期搜尋索引中的資料與資料庫中的資料是匹配的。搜尋索引和任何其他衍生資料系統只是變更流的消費者,如 [圖 11-5](../img/fig11-5.png) 所示。 ![](../img/fig11-5.png) **圖 11-5 將資料按順序寫入一個數據庫,然後按照相同的順序將這些更改應用到其他系統** #### 變更資料捕獲的實現 我們可以將日誌消費者叫做 **衍生資料系統**,正如在 [第三部分](part-iii.md) 的介紹中所討論的:儲存在搜尋索引和資料倉庫中的資料,只是 **記錄系統** 資料的額外檢視。變更資料捕獲是一種機制,可確保對記錄系統所做的所有更改都反映在衍生資料系統中,以便衍生系統具有資料的準確副本。 從本質上說,變更資料捕獲使得一個數據庫成為領導者(被捕獲變化的資料庫),並將其他元件變為追隨者。基於日誌的訊息代理非常適合從源資料庫傳輸變更事件,因為它保留了訊息的順序(避免了 [圖 11-2](../img/fig11-2.png) 的重新排序問題)。 資料庫觸發器可用來實現變更資料捕獲(請參閱 “[基於觸發器的複製](ch5.md#基於觸發器的複製)”),透過註冊觀察所有變更的觸發器,並將相應的變更項寫入變更日誌表中。但是它們往往是脆弱的,而且有顯著的效能開銷。解析複製日誌可能是一種更穩健的方法,但它也很有挑戰,例如如何應對模式變更。 LinkedIn 的 Databus【25】,Facebook 的 Wormhole【26】和 Yahoo! 的 Sherpa【27】大規模地應用這個思路。 Bottled Water 使用解碼 WAL 的 API 實現了 PostgreSQL 的 CDC【28】,Maxwell 和 Debezium 透過解析 binlog 對 MySQL 做了類似的事情【29,30,31】,Mongoriver 讀取 MongoDB oplog【32,33】,而 GoldenGate 為 Oracle 提供類似的功能【34,35】。 像訊息代理一樣,變更資料捕獲通常是非同步的:記錄資料庫系統不會等待消費者應用變更再進行提交。這種設計具有的運維優勢是,新增緩慢的消費者不會過度影響記錄系統。不過,所有複製延遲可能有的問題在這裡都可能出現(請參閱 “[複製延遲問題](ch5.md#複製延遲問題)”)。 #### 初始快照 如果你擁有 **所有** 對資料庫進行變更的日誌,則可以透過重播該日誌,來重建資料庫的完整狀態。但是在許多情況下,永遠保留所有更改會耗費太多磁碟空間,且重播過於費時,因此日誌需要被截斷。 例如,構建新的全文索引需要整個資料庫的完整副本 —— 僅僅應用最近變更的日誌是不夠的,因為這樣會丟失最近未曾更新的專案。因此,如果你沒有完整的歷史日誌,則需要從一個一致的快照開始,如先前的 “[設定新從庫](ch5.md#設定新從庫)” 中所述。 資料庫的快照必須與變更日誌中的已知位置或偏移量相對應,以便在處理完快照後知道從哪裡開始應用變更。一些 CDC 工具集成了這種快照功能,而其他工具則把它留給你手動執行。 #### 日誌壓縮 如果你只能保留有限的歷史日誌,則每次要新增新的衍生資料系統時,都需要做一次快照。但 **日誌壓縮(log compaction)** 提供了一個很好的備選方案。 我們之前在 “[雜湊索引](ch3.md#雜湊索引)” 中關於日誌結構儲存引擎的上下文中討論了日誌壓縮(請參閱 [圖 3-2](../img/fig3-2.png) 的示例)。原理很簡單:儲存引擎定期在日誌中查詢具有相同鍵的記錄,丟掉所有重複的內容,並只保留每個鍵的最新更新。這個壓縮與合併過程在後臺執行。 在日誌結構儲存引擎中,具有特殊值 NULL(**墓碑**,即 tombstone)的更新表示該鍵被刪除,並會在日誌壓縮過程中被移除。但只要鍵不被覆蓋或刪除,它就會永遠留在日誌中。這種壓縮日誌所需的磁碟空間僅取決於資料庫的當前內容,而不取決於資料庫中曾經發生的寫入次數。如果相同的鍵經常被覆蓋寫入,則先前的值將最終將被垃圾回收,只有最新的值會保留下來。 在基於日誌的訊息代理與變更資料捕獲的上下文中也適用相同的想法。如果 CDC 系統被配置為,每個變更都包含一個主鍵,且每個鍵的更新都替換了該鍵以前的值,那麼只需要保留對鍵的最新寫入就足夠了。 現在,無論何時需要重建衍生資料系統(如搜尋索引),你可以從壓縮日誌主題的零偏移量處啟動新的消費者,然後依次掃描日誌中的所有訊息。日誌能保證包含資料庫中每個鍵的最新值(也可能是一些較舊的值)—— 換句話說,你可以使用它來獲取資料庫內容的完整副本,而無需從 CDC 源資料庫取一個快照。 Apache Kafka 支援這種日誌壓縮功能。正如我們將在本章後面看到的,它允許訊息代理被當成永續性儲存使用,而不僅僅是用於臨時訊息。 #### 變更流的API支援 越來越多的資料庫開始將變更流作為第一等的介面,而不像傳統上要去做加裝改造,或者費工夫逆向工程一個 CDC。例如,RethinkDB 允許查詢訂閱通知,當查詢結果變更時獲得通知【36】,Firebase 【37】和 CouchDB 【38】基於變更流進行同步,該變更流同樣可用於應用。而 Meteor 使用 MongoDB oplog 訂閱資料變更,並改變了使用者介面【39】。 VoltDB 允許事務以流的形式連續地從資料庫中匯出資料【40】。資料庫將關係資料模型中的輸出流表示為一個表,事務可以向其中插入元組,但不能查詢。已提交事務按照提交順序寫入這個特殊表,而流則由該表中的元組日誌構成。外部消費者可以非同步消費該日誌,並使用它來更新衍生資料系統。 Kafka Connect【41】致力於將廣泛的資料庫系統的變更資料捕獲工具與 Kafka 整合。一旦變更事件進入 Kafka 中,它就可以用於更新衍生資料系統,比如搜尋索引,也可以用於本章稍後討論的流處理系統。 ### 事件溯源 我們在這裡討論的想法和 **事件溯源(Event Sourcing)** 之間有一些相似之處,這是一個在 **領域驅動設計(domain-driven design, DDD)** 社群中折騰出來的技術。我們將簡要討論事件溯源,因為它包含了一些關於流處理系統的有用想法。 與變更資料捕獲類似,事件溯源涉及到 **將所有對應用狀態的變更** 儲存為變更事件日誌。最大的區別是事件溯源將這一想法應用到了一個不同的抽象層次上: * 在變更資料捕獲中,應用以 **可變方式(mutable way)** 使用資料庫,可以任意更新和刪除記錄。變更日誌是從資料庫的底層提取的(例如,透過解析複製日誌),從而確保從資料庫中提取的寫入順序與實際寫入的順序相匹配,從而避免 [圖 11-4](../img/fig11-4.png) 中的競態條件。寫入資料庫的應用不需要知道 CDC 的存在。 * 在事件溯源中,應用邏輯顯式構建在寫入事件日誌的不可變事件之上。在這種情況下,事件儲存是僅追加寫入的,更新與刪除是不鼓勵的或禁止的。事件被設計為旨在反映應用層面發生的事情,而不是底層的狀態變更。 事件溯源是一種強大的資料建模技術:從應用的角度來看,將使用者的行為記錄為不可變的事件更有意義,而不是在可變資料庫中記錄這些行為的影響。事件溯源使得應用隨時間演化更為容易,透過更容易理解事情發生的原因來幫助除錯的進行,並有利於防止應用 Bug(請參閱 “[不可變事件的優點](#不可變事件的優點)”)。 例如,儲存 “學生取消選課” 事件以中性的方式清楚地表達了單個行為的意圖,而其副作用 “從登記表中刪除了一個條目,而一條取消原因的記錄被新增到學生反饋表 “則嵌入了很多有關稍後對資料的使用方式的假設。如果引入一個新的應用功能,例如 “將位置留給等待列表中的下一個人” —— 事件溯源方法允許將新的副作用輕鬆地從現有事件中脫開。 事件溯源類似於 **編年史(chronicle)** 資料模型【45】,事件日誌與星型模式中的事實表之間也存在相似之處(請參閱 “[星型和雪花型:分析的模式](ch3.md#星型和雪花型:分析的模式)”) 。 諸如 Event Store【46】這樣的專業資料庫已經被開發出來,供使用事件溯源的應用使用,但總的來說,這種方法獨立於任何特定的工具。傳統的資料庫或基於日誌的訊息代理也可以用來構建這種風格的應用。 #### 從事件日誌中派生出當前狀態 事件日誌本身並不是很有用,因為使用者通常期望看到的是系統的當前狀態,而不是變更歷史。例如,在購物網站上,使用者期望能看到他們購物車裡的當前內容,而不是他們購物車所有變更的一個僅追加列表。 因此,使用事件溯源的應用需要拉取事件日誌(表示 **寫入** 系統的資料),並將其轉換為適合向用戶顯示的應用狀態(從系統 **讀取** 資料的方式【47】)。這種轉換可以使用任意邏輯,但它應當是確定性的,以便能再次執行,並從事件日誌中衍生出相同的應用狀態。 與變更資料捕獲一樣,重播事件日誌允許讓你重新構建系統的當前狀態。不過,日誌壓縮需要採用不同的方式處理: * 用於記錄更新的 CDC 事件通常包含記錄的 **完整新版本**,因此主鍵的當前值完全由該主鍵的最近事件確定,而日誌壓縮可以丟棄相同主鍵的先前事件。 * 另一方面,事件溯源在更高層次進行建模:事件通常表示使用者操作的意圖,而不是因為操作而發生的狀態更新機制。在這種情況下,後面的事件通常不會覆蓋先前的事件,所以你需要完整的歷史事件來重新構建最終狀態。這裡進行同樣的日誌壓縮是不可能的。 使用事件溯源的應用通常有一些機制,用於儲存從事件日誌中匯出的當前狀態快照,因此它們不需要重複處理完整的日誌。然而這只是一種效能最佳化,用來加速讀取,提高從崩潰中恢復的速度;真正的目的是系統能夠永久儲存所有原始事件,並在需要時重新處理完整的事件日誌。我們將在 “[不變性的侷限性](#不變性的侷限性)” 中討論這個假設。 #### 命令和事件 事件溯源的哲學是仔細區分 **事件(event)** 和 **命令(command)**【48】。當來自使用者的請求剛到達時,它一開始是一個命令:在這個時間點上它仍然可能失敗,比如,因為違反了一些完整性條件。應用必須首先驗證它是否可以執行該命令。如果驗證成功並且命令被接受,則它變為一個持久化且不可變的事件。 例如,如果使用者試圖註冊特定使用者名稱,或預定飛機或劇院的座位,則應用需要檢查使用者名稱或座位是否已被佔用。(先前在 “[容錯共識](ch9.md#容錯共識)” 中討論過這個例子)當檢查成功時,應用可以生成一個事件,指示特定的使用者名稱是由特定的使用者 ID 註冊的,或者座位已經預留給特定的顧客。 在事件生成的時刻,它就成為了 **事實(fact)**。即使客戶稍後決定更改或取消預訂,他們之前曾預定了某個特定座位的事實仍然成立,而更改或取消是之後新增的單獨的事件。 事件流的消費者不允許拒絕事件:當消費者看到事件時,它已經成為日誌中不可變的一部分,並且可能已經被其他消費者看到了。因此任何對命令的驗證,都需要在它成為事件之前同步完成。例如,透過使用一個可以原子性地自動驗證命令併發布事件的可序列事務。 或者,預訂座位的使用者請求可以拆分為兩個事件:第一個是暫時預約,第二個是驗證預約後的獨立的確認事件(如 “[使用全序廣播實現線性一致的儲存](ch9.md#使用全序廣播實現線性一致的儲存)” 中所述) 。這種分割方式允許驗證發生在一個非同步的過程中。 ### 狀態、流和不變性 我們在 [第十章](ch10.md) 中看到,批處理因其輸入檔案不變性而受益良多,你可以在現有輸入檔案上執行實驗性處理作業,而不用擔心損壞它們。這種不變性原則也是使得事件溯源與變更資料捕獲如此強大的原因。 我們通常將資料庫視為應用程式當前狀態的儲存 —— 這種表示針對讀取進行了最佳化,而且通常對於服務查詢而言是最為方便的表示。狀態的本質是,它會變化,所以資料庫才會支援資料的增刪改。這又該如何匹配不變性呢? 只要你的狀態發生了變化,那麼這個狀態就是這段時間中事件修改的結果。例如,當前可用的座位列表是你已處理的預訂所產生的結果,當前帳戶餘額是帳戶中的借與貸的結果,而 Web 伺服器的響應時間圖,是所有已發生 Web 請求的獨立響應時間的聚合結果。 無論狀態如何變化,總是有一系列事件導致了這些變化。即使事情已經執行與回滾,這些事件出現是始終成立的。關鍵的想法是:可變的狀態與不可變事件的僅追加日誌相互之間並不矛盾:它們是一體兩面,互為陰陽的。所有變化的日誌 —— **變化日誌(changelog)**,表示了隨時間演變的狀態。 如果你傾向於數學表示,那麼你可能會說,應用狀態是事件流對時間求積分得到的結果,而變更流是狀態對時間求微分的結果,如 [圖 11-6](../img/fig11-6.png) 所示【49,50,51】。這個比喻有一些侷限性(例如,狀態的二階導似乎沒有意義),但這是考慮資料的一個實用出發點。 $$ state(now) = \int_{t=0}^{now}{stream(t) \ dt} \\ stream(t) = \frac{d\ state(t)}{dt} $$ ![](../img/fig11-6.png) **圖 11-6 應用當前狀態與事件流之間的關係** 如果你持久儲存了變更日誌,那麼重現狀態就非常簡單。如果你認為事件日誌是你的記錄系統,而所有的衍生狀態都從它派生而來,那麼系統中的資料流動就容易理解的多。正如帕特・赫蘭(Pat Helland)所說的【52】: > 事務日誌記錄了資料庫的所有變更。高速追加是更改日誌的唯一方法。從這個角度來看,資料庫的內容其實是日誌中記錄最新值的快取。日誌才是真相,資料庫是日誌子集的快取,這一快取子集恰好來自日誌中每條記錄與索引值的最新值。 日誌壓縮(如 “[日誌壓縮](#日誌壓縮)” 中所述)是連線日誌與資料庫狀態之間的橋樑:它只保留每條記錄的最新版本,並丟棄被覆蓋的版本。 #### 不可變事件的優點 資料庫中的不變性是一個古老的概念。例如,會計在幾個世紀以來一直在財務記賬中應用不變性。一筆交易發生時,它被記錄在一個僅追加寫入的分類帳中,實質上是描述貨幣、商品或服務轉手的事件日誌。賬目,比如利潤、虧損、資產負債表,是從分類賬中的交易求和衍生而來【53】。 如果發生錯誤,會計師不會刪除或更改分類帳中的錯誤交易 —— 而是新增另一筆交易以補償錯誤,例如退還一筆不正確的費用。不正確的交易將永遠保留在分類帳中,對於審計而言可能非常重要。如果從不正確的分類賬衍生出的錯誤數字已經公佈,那麼下一個會計週期的數字就會包括一個更正。這個過程在會計事務中是很常見的【54】。 儘管這種可審計性只在金融系統中尤其重要,但對於不受這種嚴格監管的許多其他系統,也是很有幫助的。如 “[批處理輸出的哲學](ch10.md#批處理輸出的哲學)” 中所討論的,如果你意外地部署了將錯誤資料寫入資料庫的錯誤程式碼,當代碼會破壞性地覆寫資料時,恢復要困難得多。使用不可變事件的僅追加日誌,診斷問題與故障恢復就要容易的多。 不可變的事件也包含了比當前狀態更多的資訊。例如在購物網站上,顧客可以將物品新增到他們的購物車,然後再將其移除。雖然從履行訂單的角度,第二個事件取消了第一個事件,但對分析目的而言,知道客戶考慮過某個特定項而之後又反悔,可能是很有用的。也許他們會選擇在未來購買,或者他們已經找到了替代品。這個資訊被記錄在事件日誌中,但對於移出購物車就刪除記錄的資料庫而言,這個資訊在移出購物車時可能就丟失了【42】。 #### 從同一事件日誌中派生多個檢視 此外,透過從不變的事件日誌中分離出可變的狀態,你可以針對不同的讀取方式,從相同的事件日誌中衍生出幾種不同的表現形式。效果就像一個流的多個消費者一樣([圖 11-5](../img/fig11-5.png)):例如,分析型資料庫 Druid 使用這種方式直接從 Kafka 攝取資料【55】,Pistachio 是一個分散式的鍵值儲存,使用 Kafka 作為提交日誌【56】,Kafka Connect 能將來自 Kafka 的資料匯出到各種不同的資料庫與索引【41】。這對於許多其他儲存和索引系統(如搜尋伺服器)來說是很有意義的,當系統要從分散式日誌中獲取輸入時亦然(請參閱 “[保持系統同步](#保持系統同步)”)。 新增從事件日誌到資料庫的顯式轉換,能夠使應用更容易地隨時間演進:如果你想要引入一個新功能,以新的方式表示現有資料,則可以使用事件日誌來構建一個單獨的、針對新功能的讀取最佳化檢視,無需修改現有系統而與之共存。並行執行新舊系統通常比在現有系統中執行複雜的模式遷移更容易。一旦不再需要舊的系統,你可以簡單地關閉它並回收其資源【47,57】。 如果你不需要擔心如何查詢與訪問資料,那麼儲存資料通常是非常簡單的。模式設計、索引和儲存引擎的許多複雜性,都是希望支援某些特定查詢和訪問模式的結果(請參閱 [第三章](ch3.md))。出於這個原因,透過將資料寫入的形式與讀取形式相分離,並允許幾個不同的讀取檢視,你能獲得很大的靈活性。這個想法有時被稱為 **命令查詢責任分離(command query responsibility segregation, CQRS)**【42,58,59】。 資料庫和模式設計的傳統方法是基於這樣一種謬論,資料必須以與查詢相同的形式寫入。如果可以將資料從針對寫入最佳化的事件日誌轉換為針對讀取最佳化的應用狀態,那麼有關規範化和非規範化的爭論就變得無關緊要了(請參閱 “[多對一和多對多的關係](ch2.md#多對一和多對多的關係)”):在針對讀取最佳化的檢視中對資料進行非規範化是完全合理的,因為翻譯過程提供了使其與事件日誌保持一致的機制。 在 “[描述負載](ch1.md#描述負載)” 中,我們討論了推特主頁時間線,它是特定使用者關注的人群所發推特的快取(類似郵箱)。這是 **針對讀取最佳化的狀態** 的又一個例子:主頁時間線是高度非規範化的,因為你的推文與你所有粉絲的時間線都構成了重複。然而,扇出服務保持了這種重複狀態與新推特以及新關注關係的同步,從而保證了重複的可管理性。 #### 併發控制 事件溯源和變更資料捕獲的最大缺點是,事件日誌的消費者通常是非同步的,所以可能會出現這樣的情況:使用者會寫入日誌,然後從日誌衍生檢視中讀取,結果發現他的寫入還沒有反映在讀取檢視中。我們之前在 “[讀己之寫](ch5.md#讀己之寫)” 中討論了這個問題以及可能的解決方案。 一種解決方案是將事件追加到日誌時同步執行讀取檢視的更新。而將這些寫入操作合併為一個原子單元需要 **事務**,所以要麼將事件日誌和讀取檢視儲存在同一個儲存系統中,要麼就需要跨不同系統進行分散式事務。或者,你也可以使用在 “[使用全序廣播實現線性一致的儲存](ch9.md#使用全序廣播實現線性一致的儲存)” 中討論的方法。 另一方面,從事件日誌匯出當前狀態也簡化了併發控制的某些部分。許多對於多物件事務的需求(請參閱 “[單物件和多物件操作](ch7.md#單物件和多物件操作)”)源於單個使用者操作需要在多個不同的位置更改資料。透過事件溯源,你可以設計一個自包含的事件以表示一個使用者操作。然後使用者操作就只需要在一個地方進行單次寫入操作 —— 即將事件附加到日誌中 —— 這個還是很容易使原子化的。 如果事件日誌與應用狀態以相同的方式分割槽(例如,處理分割槽 3 中的客戶事件只需要更新分割槽 3 中的應用狀態),那麼直接使用單執行緒日誌消費者就不需要寫入併發控制了。它從設計上一次只處理一個事件(請參閱 “[真的序列執行](ch7.md#真的序列執行)”)。日誌透過在分割槽中定義事件的序列順序,消除了併發性的不確定性【24】。如果一個事件觸及多個狀態分割槽,那麼需要做更多的工作,我們將在 [第十二章](ch12.md) 討論。 #### 不變性的侷限性 許多不使用事件溯源模型的系統也還是依賴不可變性:各種資料庫在內部使用不可變的資料結構或多版本資料來支援時間點快照(請參閱 “[索引和快照隔離](ch7.md#索引和快照隔離)” )。 Git、Mercurial 和 Fossil 等版本控制系統也依靠不可變的資料來儲存檔案的版本歷史記錄。 永遠保持所有變更的不變歷史,在多大程度上是可行的?答案取決於資料集的流失率。一些工作負載主要是新增資料,很少更新或刪除;它們很容易保持不變。其他工作負載在相對較小的資料集上有較高的更新 / 刪除率;在這些情況下,不可變的歷史可能增至難以接受的巨大,碎片化可能成為一個問題,壓縮與垃圾收集的表現對於運維的穩健性變得至關重要【60,61】。 除了效能方面的原因外,也可能有出於管理方面的原因需要刪除資料的情況,儘管這些資料都是不可變的。例如,隱私條例可能要求在使用者關閉帳戶後刪除他們的個人資訊,資料保護立法可能要求刪除錯誤的資訊,或者可能需要阻止敏感資訊的意外洩露。 在這種情況下,僅僅在日誌中新增另一個事件來指明先前的資料應該被視為刪除是不夠的 —— 你實際上是想改寫歷史,並假裝資料從一開始就沒有寫入。例如,Datomic 管這個特性叫 **切除(excision)** 【62】,而 Fossil 版本控制系統有一個類似的概念叫 **避免(shunning)** 【63】。 真正刪除資料是非常非常困難的【64】,因為副本可能存在於很多地方:例如,儲存引擎,檔案系統和 SSD 通常會向一個新位置寫入,而不是原地覆蓋舊資料【52】,而備份通常是特意做成不可變的,防止意外刪除或損壞。刪除操作更多的是指 “使取回資料更困難”,而不是指 “使取回資料不可能”。無論如何,有時你必須得嘗試,正如我們在 “[立法與自律](ch12.md#立法與自律)” 中所看到的。 ## 流處理 到目前為止,本章中我們已經討論了流的來源(使用者活動事件,感測器和寫入資料庫),我們討論了流如何傳輸(直接透過訊息傳送,透過訊息代理,透過事件日誌)。 剩下的就是討論一下你可以用流做什麼 —— 也就是說,你可以處理它。一般來說,有三種選項: 1. 你可以將事件中的資料寫入資料庫、快取、搜尋索引或類似的儲存系統,然後能被其他客戶端查詢。如 [圖 11-5](../img/fig11-5.png) 所示,這是資料庫與系統其他部分所發生的變更保持同步的好方法 —— 特別是當流消費者是寫入資料庫的唯一客戶端時。如 “[批處理工作流的輸出](ch10.md#批處理工作流的輸出)” 中所討論的,它是寫入儲存系統的流等價物。 2. 你能以某種方式將事件推送給使用者,例如傳送報警郵件或推送通知,或將事件流式傳輸到可實時顯示的儀表板上。在這種情況下,人是流的最終消費者。 3. 你可以處理一個或多個輸入流,併產生一個或多個輸出流。流可能會經過由幾個這樣的處理階段組成的流水線,最後再輸出(選項 1 或 2)。 在本章的剩餘部分中,我們將討論選項 3:處理流以產生其他衍生流。處理這樣的流的程式碼片段,被稱為 **運算元(operator)** 或 **作業(job)**。它與我們在 [第十章](ch10.md) 中討論過的 Unix 程序和 MapReduce 作業密切相關,資料流的模式是相似的:一個流處理器以只讀的方式使用輸入流,並將其輸出以僅追加的方式寫入一個不同的位置。 流處理中的分割槽和並行化模式也非常類似於 [第十章](ch10.md) 中介紹的 MapReduce 和資料流引擎,因此我們不再重複這些主題。基本的 Map 操作(如轉換和過濾記錄)也是一樣的。 與批次作業相比的一個關鍵區別是,流不會結束。這種差異會帶來很多隱含的結果。正如本章開始部分所討論的,排序對無界資料集沒有意義,因此無法使用 **排序合併連線**(請參閱 “[Reduce 側連線與分組](ch10.md#Reduce側連線與分組)”)。容錯機制也必須改變:對於已經運行了幾分鐘的批處理作業,可以簡單地從頭開始重啟失敗任務,但是對於已經執行數年的流作業,重啟後從頭開始跑可能並不是一個可行的選項。 ### 流處理的應用 長期以來,流處理一直用於監控目的,如果某個事件發生,組織希望能得到警報。例如: * 欺詐檢測系統需要確定信用卡的使用模式是否有意外地變化,如果信用卡可能已被盜刷,則鎖卡。 * 交易系統需要檢查金融市場的價格變化,並根據指定的規則進行交易。 * 製造系統需要監控工廠中機器的狀態,如果出現故障,可以快速定位問題。 * 軍事和情報系統需要跟蹤潛在侵略者的活動,並在出現襲擊徵兆時發出警報。 這些型別的應用需要非常精密複雜的模式匹配與相關檢測。然而隨著時代的進步,流處理的其他用途也開始出現。在本節中,我們將簡要比較一下這些應用。 #### 複合事件處理 **複合事件處理(complex event processing, CEP)** 是 20 世紀 90 年代為分析事件流而開發出的一種方法,尤其適用於需要搜尋某些事件模式的應用【65,66】。與正則表示式允許你在字串中搜索特定字元模式的方式類似,CEP 允許你指定規則以在流中搜索某些事件模式。 CEP 系統通常使用高層次的宣告式查詢語言,比如 SQL,或者圖形使用者介面,來描述應該檢測到的事件模式。這些查詢被提交給處理引擎,該引擎消費輸入流,並在內部維護一個執行所需匹配的狀態機。當發現匹配時,引擎發出一個 **複合事件**(即 complex event,CEP 因此得名),並附有檢測到的事件模式詳情【67】。 在這些系統中,查詢和資料之間的關係與普通資料庫相比是顛倒的。通常情況下,資料庫會持久儲存資料,並將查詢視為臨時的:當查詢進入時,資料庫搜尋與查詢匹配的資料,然後在查詢完成時丟掉查詢。 CEP 引擎反轉了角色:查詢是長期儲存的,來自輸入流的事件不斷流過它們,搜尋匹配事件模式的查詢【68】。 CEP 的實現包括 Esper【69】、IBM InfoSphere Streams【70】、Apama、TIBCO StreamBase 和 SQLstream。像 Samza 這樣的分散式流處理元件,支援使用 SQL 在流上進行宣告式查詢【71】。 #### 流分析 使用流處理的另一個領域是對流進行分析。 CEP 與流分析之間的邊界是模糊的,但一般來說,分析往往對找出特定事件序列並不關心,而更關注大量事件上的聚合與統計指標 —— 例如: * 測量某種型別事件的速率(每個時間間隔內發生的頻率) * 滾動計算一段時間視窗內某個值的平均值 * 將當前的統計值與先前的時間區間的值對比(例如,檢測趨勢,當指標與上週同比異常偏高或偏低時報警) 這些統計值通常是在固定時間區間內進行計算的,例如,你可能想知道在過去 5 分鐘內服務每秒查詢次數的均值,以及此時間段內響應時間的第 99 百分位點。在幾分鐘內取平均,能抹平秒和秒之間的無關波動,且仍然能向你展示流量模式的時間圖景。聚合的時間間隔稱為 **視窗(window)**,我們將在 “[時間推理](#時間推理)” 中更詳細地討論視窗。 流分析系統有時會使用機率演算法,例如 Bloom filter(我們在 “[效能最佳化](ch3.md#效能最佳化)” 中遇到過)來管理成員資格,HyperLogLog【72】用於基數估計以及各種百分比估計算法(請參閱 “[實踐中的百分位點](ch1.md#實踐中的百分位點)“)。機率演算法產出近似的結果,但比起精確演算法的優點是記憶體使用要少得多。使用近似演算法有時讓人們覺得流處理系統總是有損的和不精確的,但這是錯誤看法:流處理並沒有任何內在的近似性,而機率演算法只是一種最佳化【73】。 許多開源分散式流處理框架的設計都是針對分析設計的:例如 Apache Storm、Spark Streaming、Flink、Concord、Samza 和 Kafka Streams 【74】。託管服務包括 Google Cloud Dataflow 和 Azure Stream Analytics。 #### 維護物化檢視 我們在 “[資料庫與流](#資料庫與流)” 中看到,資料庫的變更流可以用於維護衍生資料系統(如快取、搜尋索引和資料倉庫),並使其與源資料庫保持最新。我們可以將這些示例視作維護 **物化檢視(materialized view)** 的一種具體場景(請參閱 “[聚合:資料立方體和物化檢視](ch3.md#聚合:資料立方體和物化檢視)”):在某個資料集上衍生出一個替代檢視以便高效查詢,並在底層資料變更時更新檢視【50】。 同樣,在事件溯源中,應用程式的狀態是透過應用事件日誌來維護的;這裡的應用程式狀態也是一種物化檢視。與流分析場景不同的是,僅考慮某個時間視窗內的事件通常是不夠的:構建物化檢視可能需要任意時間段內的 **所有** 事件,除了那些可能由日誌壓縮丟棄的過時事件(請參閱 “[日誌壓縮](#日誌壓縮)“)。實際上,你需要一個可以一直延伸到時間開端的視窗。 原則上講,任何流處理元件都可以用於維護物化檢視,儘管 “永遠執行” 與一些面向分析的框架假設的 “主要在有限時間段視窗上執行” 背道而馳, Samza 和 Kafka Streams 支援這種用法,建立在 Kafka 對日誌壓縮的支援上【75】。 #### 在流上搜索 除了允許搜尋由多個事件構成模式的 CEP 外,有時也存在基於複雜標準(例如全文搜尋查詢)來搜尋單個事件的需求。 例如,媒體監測服務可以訂閱新聞文章 Feed 與來自媒體的播客,搜尋任何關於公司、產品或感興趣的話題的新聞。這是透過預先構建一個搜尋查詢來完成的,然後不斷地將新聞項的流與該查詢進行匹配。在一些網站上也有類似的功能:例如,當市場上出現符合其搜尋條件的新房產時,房地產網站的使用者可以要求網站通知他們。Elasticsearch 的這種過濾器功能,是實現這種流搜尋的一種選擇【76】。 傳統的搜尋引擎首先索引檔案,然後在索引上跑查詢。相比之下,搜尋一個數據流則反了過來:查詢被儲存下來,文件從查詢中流過,就像在 CEP 中一樣。最簡單的情況就是,你可以為每個文件測試每個查詢。但是如果你有大量查詢,這可能會變慢。為了最佳化這個過程,可以像對文件一樣,為查詢建立索引。因而收窄可能匹配的查詢集合【77】。 #### 訊息傳遞和RPC 在 “[訊息傳遞中的資料流](ch4.md#訊息傳遞中的資料流)” 中我們討論過,訊息傳遞系統可以作為 RPC 的替代方案,即作為一種服務間通訊的機制,比如在 Actor 模型中所使用的那樣。儘管這些系統也是基於訊息和事件,但我們通常不會將其視作流處理元件: * Actor 框架主要是管理模組通訊的併發和分散式執行的一種機制,而流處理主要是一種資料管理技術。 * Actor 之間的交流往往是短暫的、一對一的;而事件日誌則是持久的、多訂閱者的。 * Actor 可以以任意方式進行通訊(包括迴圈的請求 / 響應模式),但流處理通常配置在無環流水線中,其中每個流都是一個特定作業的輸出,由良好定義的輸入流中派生而來。 也就是說,RPC 類系統與流處理之間有一些交叉領域。例如,Apache Storm 有一個稱為 **分散式 RPC** 的功能,它允許將使用者查詢分散到一系列也處理事件流的節點上;然後這些查詢與來自輸入流的事件交織,而結果可以被彙總併發回給使用者【78】(另請參閱 “[多分割槽資料處理](ch12.md#多分割槽資料處理)”)。 也可以使用 Actor 框架來處理流。但是,很多這樣的框架在崩潰時不能保證訊息的傳遞,除非你實現了額外的重試邏輯,否則這種處理不是容錯的。 ### 時間推理 流處理通常需要與時間打交道,尤其是用於分析目的時候,會頻繁使用時間視窗,例如 “過去五分鐘的平均值”。“過去五分鐘” 的含義看上去似乎是清晰而無歧義的,但不幸的是,這個概念非常棘手。 在批處理中過程中,大量的歷史事件被快速地處理。如果需要按時間來分析,批處理器需要檢查每個事件中嵌入的時間戳。讀取執行批處理機器的系統時鐘沒有任何意義,因為處理執行的時間與事件實際發生的時間無關。 批處理可以在幾分鐘內讀取一年的歷史事件;在大多數情況下,感興趣的時間線是歷史中的一年,而不是處理中的幾分鐘。而且使用事件中的時間戳,使得處理是 **確定性** 的:在相同的輸入上再次執行相同的處理過程會得到相同的結果(請參閱 “[容錯](ch10.md#容錯)”)。 另一方面,許多流處理框架使用處理機器上的本地系統時鐘(**處理時間**,即 processing time)來確定 **視窗(windowing)**【79】。這種方法的優點是簡單,如果事件建立與事件處理之間的延遲可以忽略不計,那也是合理的。然而,如果存在任何顯著的處理延遲 —— 即,事件處理顯著地晚於事件實際發生的時間,這種處理方式就失效了。 #### 事件時間與處理時間 很多原因都可能導致處理延遲:排隊,網路故障(請參閱 “[不可靠的網路](ch8.md#不可靠的網路)”),效能問題導致訊息代理 / 訊息處理器出現爭用,流消費者重啟,從故障中恢復時重新處理過去的事件(請參閱 “[重播舊訊息](#重播舊訊息)”),或者在修復程式碼 BUG 之後。 而且,訊息延遲還可能導致無法預測訊息順序。例如,假設使用者首先發出一個 Web 請求(由 Web 伺服器 A 處理),然後發出第二個請求(由伺服器 B 處理)。 A 和 B 發出描述它們所處理請求的事件,但是 B 的事件在 A 的事件發生之前到達訊息代理。現在,流處理器將首先看到 B 事件,然後看到 A 事件,即使它們實際上是以相反的順序發生的。 有一個類比也許能幫助理解,“星球大戰” 電影:第四集於 1977 年發行,第五集於 1980 年,第六集於 1983 年,緊隨其後的是 1999 年的第一集,2002 年的第二集,和 2005 年的第三集,以及 2015 年的第七集【80】[^ii]。如果你按照按照它們上映的順序觀看電影,你處理電影的順序與它們敘事的順序就是不一致的。 (集數編號就像事件時間戳,而你觀看電影的日期就是處理時間)作為人類,我們能夠應對這種不連續性,但是流處理演算法需要專門編寫,以適應這種時序與順序的問題。 [^ii]: 感謝 Flink 社群的 Kostas Kloudas 提出這個比喻。 將事件時間和處理時間搞混會導致錯誤的資料。例如,假設你有一個流處理器用於測量請求速率(計算每秒請求數)。如果你重新部署流處理器,它可能會停止一分鐘,並在恢復之後處理積壓的事件。如果你按處理時間來衡量速率,那麼在處理積壓日誌時,請求速率看上去就像有一個異常的突發尖峰,而實際上請求速率是穩定的([圖 11-7](../img/fig11-7.png))。 ![](../img/fig11-7.png) **圖 11-7 按處理時間分窗,會因為處理速率的變動引入人為因素** #### 知道什麼時候準備好了 用事件時間來定義視窗的一個棘手的問題是,你永遠也無法確定是不是已經收到了特定視窗的所有事件,還是說還有一些事件正在來的路上。 例如,假設你將事件分組為一分鐘的視窗,以便統計每分鐘的請求數。你已經計數了一些帶有本小時內第 37 分鐘時間戳的事件,時間流逝,現在進入的主要都是本小時內第 38 和第 39 分鐘的事件。什麼時候才能宣佈你已經完成了第 37 分鐘的視窗計數,並輸出其計數器值? 在一段時間沒有看到任何新的事件之後,你可以超時並宣佈一個視窗已經就緒,但仍然可能發生這種情況:某些事件被緩衝在另一臺機器上,由於網路中斷而延遲。你需要能夠處理這種在視窗宣告完成之後到達的 **滯留(straggler)** 事件。大體上,你有兩種選擇【1】: 1. 忽略這些滯留事件,因為在正常情況下它們可能只是事件中的一小部分。你可以將丟棄事件的數量作為一個監控指標,並在出現大量丟訊息的情況時報警。 2. 釋出一個 **更正(correction)**,一個包括滯留事件的更新視窗值。你可能還需要收回以前的輸出。 在某些情況下,可以使用特殊的訊息來指示 “從現在開始,不會有比 t 更早時間戳的訊息了”,消費者可以使用它來觸發視窗【81】。但是,如果不同機器上的多個生產者都在生成事件,每個生產者都有自己的最小時間戳閾值,則消費者需要分別跟蹤每個生產者。在這種情況下,新增和刪除生產者都是比較棘手的。 #### 你用的是誰的時鐘? 當事件可能在系統內多個地方進行緩衝時,為事件分配時間戳更加困難了。例如,考慮一個移動應用向伺服器上報關於用量的事件。該應用可能會在裝置處於離線狀態時被使用,在這種情況下,它將在裝置本地緩衝事件,並在下一次網際網路連線可用時向伺服器上報這些事件(可能是幾小時甚至幾天)。對於這個流的任意消費者而言,它們就如延遲極大的滯留事件一樣。 在這種情況下,事件上的事件戳實際上應當是使用者交互發生的時間,取決於移動裝置的本地時鐘。然而使用者控制的裝置上的時鐘通常是不可信的,因為它可能會被無意或故意設定成錯誤的時間(請參閱 “[時鐘同步與準確性](ch8.md#時鐘同步與準確性)”)。伺服器收到事件的時間(取決於伺服器的時鐘)可能是更準確的,因為伺服器在你的控制之下,但在描述使用者互動方面意義不大。 要校正不正確的裝置時鐘,一種方法是記錄三個時間戳【82】: * 事件發生的時間,取決於裝置時鐘 * 事件傳送往伺服器的時間,取決於裝置時鐘 * 事件被伺服器接收的時間,取決於伺服器時鐘 透過從第三個時間戳中減去第二個時間戳,可以估算裝置時鐘和伺服器時鐘之間的偏移(假設網路延遲與所需的時間戳精度相比可忽略不計)。然後可以將該偏移應用於事件時間戳,從而估計事件實際發生的真實時間(假設裝置時鐘偏移在事件發生時與送往伺服器之間沒有變化)。 這並不是流處理獨有的問題,批處理有著完全一樣的時 間推理問題。只是在流處理的上下文中,我們更容易意識到時間的流逝。 #### 視窗的型別 當你知道如何確定一個事件的時間戳後,下一步就是如何定義時間段的視窗。然後視窗就可以用於聚合,例如事件計數,或計算視窗內值的平均值。有幾種視窗很常用【79,83】: * 滾動視窗(Tumbling Window) 滾動視窗有著固定的長度,每個事件都僅能屬於一個視窗。例如,假設你有一個 1 分鐘的滾動視窗,則所有時間戳在 `10:03:00` 和 `10:03:59` 之間的事件會被分組到一個視窗中,`10:04:00` 和 `10:04:59` 之間的事件被分組到下一個視窗,依此類推。透過將每個事件時間戳四捨五入至最近的分鐘來確定它所屬的視窗,可以實現 1 分鐘的滾動視窗。 * 跳動視窗(Hopping Window) 跳動視窗也有著固定的長度,但允許視窗重疊以提供一些平滑。例如,一個帶有 1 分鐘跳躍步長的 5 分鐘視窗將包含 `10:03:00` 至 `10:07:59` 之間的事件,而下一個視窗將覆蓋 `10:04:00` 至 `10:08:59` 之間的事件,等等。透過首先計算 1 分鐘的滾動視窗(tunmbling window),然後在幾個相鄰視窗上進行聚合,可以實現這種跳動視窗。 * 滑動視窗(Sliding Window) 滑動視窗包含了彼此間距在特定時長內的所有事件。例如,一個 5 分鐘的滑動視窗應當覆蓋 `10:03:39` 和 `10:08:12` 的事件,因為它們相距不超過 5 分鐘(注意滾動視窗與步長 5 分鐘的跳動視窗可能不會把這兩個事件分組到同一個視窗中,因為它們使用固定的邊界)。透過維護一個按時間排序的事件緩衝區,並不斷從視窗中移除過期的舊事件,可以實現滑動視窗。 * 會話視窗(Session window) 與其他視窗型別不同,會話視窗沒有固定的持續時間,而定義為:將同一使用者出現時間相近的所有事件分組在一起,而當用戶一段時間沒有活動時(例如,如果 30 分鐘內沒有事件)視窗結束。會話切分是網站分析的常見需求(請參閱 “[分組](ch10.md#分組)”)。 ### 流連線 在 [第十章](ch10.md) 中,我們討論了批處理作業如何透過鍵來連線資料集,以及這種連線是如何成為資料管道的重要組成部分的。由於流處理將資料管道泛化為對無限資料集進行增量處理,因此對流進行連線的需求也是完全相同的。 然而,新事件隨時可能出現在一個流中,這使得流連線要比批處理連線更具挑戰性。為了更好地理解情況,讓我們先來區分三種不同型別的連線:**流 - 流** 連線,**流 - 表** 連線,與 **表 - 表** 連線【84】。我們將在下面的章節中透過例子來說明。 #### 流流連線(視窗連線) 假設你的網站上有搜尋功能,而你想要找出搜尋 URL 的近期趨勢。每當有人鍵入搜尋查詢時,都會記錄下一個包含查詢與其返回結果的事件。每當有人點選其中一個搜尋結果時,就會記錄另一個記錄點選事件。為了計算搜尋結果中每個 URL 的點選率,你需要將搜尋動作與點選動作的事件連在一起,這些事件透過相同的會話 ID 進行連線。廣告系統中需要類似的分析【85】。 如果使用者丟棄了搜尋結果,點選可能永遠不會發生,即使它出現了,搜尋與點選之間的時間可能是高度可變的:在很多情況下,它可能是幾秒鐘,但也可能長達幾天或幾周(如果使用者執行搜尋,忘掉了這個瀏覽器頁面,過了一段時間後重新回到這個瀏覽器頁面上,並點選了一個結果)。由於可變的網路延遲,點選事件甚至可能先於搜尋事件到達。你可以選擇合適的連線視窗 —— 例如,如果點選與搜尋之間的時間間隔在一小時內,你可能會選擇連線兩者。 請注意,在點選事件中嵌入搜尋詳情與事件連線並不一樣:這樣做的話,只有當用戶點選了一個搜尋結果時你才能知道,而那些沒有點選的搜尋就無能為力了。為了衡量搜尋質量,你需要準確的點選率,為此搜尋事件和點選事件兩者都是必要的。 為了實現這種型別的連線,流處理器需要維護 **狀態**:例如,按會話 ID 索引最近一小時內發生的所有事件。無論何時發生搜尋事件或點選事件,都會被新增到合適的索引中,而流處理器也會檢查另一個索引是否有具有相同會話 ID 的事件到達。如果有匹配事件就會發出一個表示搜尋結果被點選的事件;如果搜尋事件直到過期都沒看見有匹配的點選事件,就會發出一個表示搜尋結果未被點選的事件。 #### 流表連線(流擴充) 在 “[示例:使用者活動事件分析](ch10.md#示例:使用者活動事件分析)”([圖 10-2](../img/fig10-2.png))中,我們看到了連線兩個資料集的批處理作業示例:一組使用者活動事件和一個使用者檔案資料庫。將使用者活動事件視為流,並在流處理器中連續執行相同的連線是很自然的想法:輸入是包含使用者 ID 的活動事件流,而輸出還是活動事件流,但其中使用者 ID 已經被擴充套件為使用者的檔案資訊。這個過程有時被稱為使用資料庫的資訊來 **擴充(enriching)** 活動事件。 要執行此連線,流處理器需要一次處理一個活動事件,在資料庫中查詢事件的使用者 ID,並將檔案資訊新增到活動事件中。資料庫查詢可以透過查詢遠端資料庫來實現。但正如在 “[示例:使用者活動事件分析](ch10.md#示例:使用者活動事件分析)” 一節中討論的,此類遠端查詢可能會很慢,並且有可能導致資料庫過載【75】。 另一種方法是將資料庫副本載入到流處理器中,以便在本地進行查詢而無需網路往返。這種技術與我們在 “[Map 側連線](ch10.md#Map側連線)” 中討論的雜湊連線非常相似:如果資料庫的本地副本足夠小,則可以是記憶體中的散列表,比較大的話也可以是本地磁碟上的索引。 與批處理作業的區別在於,批處理作業使用資料庫的時間點快照作為輸入,而流處理器是長時間執行的,且資料庫的內容可能隨時間而改變,所以流處理器資料庫的本地副本需要保持更新。這個問題可以透過變更資料捕獲來解決:流處理器可以訂閱使用者檔案資料庫的更新日誌,如同活動事件流一樣。當增添或修改檔案時,流處理器會更新其本地副本。因此,我們有了兩個流之間的連線:活動事件和檔案更新。 流表連線實際上非常類似於流流連線;最大的區別在於對於表的變更日誌流,連線使用了一個可以回溯到 “時間起點” 的視窗(概念上是無限的視窗),新版本的記錄會覆蓋更早的版本。對於輸入的流,連線可能壓根兒就沒有維護任何視窗。 #### 表表連線(維護物化檢視) 我們在 “[描述負載](ch1.md#描述負載)” 中討論的推特時間線例子時說過,當用戶想要檢視他們的主頁時間線時,迭代使用者所關注人群的推文併合並它們是一個開銷巨大的操作。 相反,我們需要一個時間線快取:一種每個使用者的 “收件箱”,在傳送推文的時候寫入這些資訊,因而讀取時間線時只需要簡單地查詢即可。物化與維護這個快取需要處理以下事件: * 當用戶 u 傳送新的推文時,它將被新增到每個關注使用者 u 的時間線上。 * 使用者刪除推文時,推文將從所有使用者的時間表中刪除。 * 當用戶 $u_1$ 開始關注使用者 $u_2$ 時,$u_2$ 最近的推文將被新增到 $u_1$ 的時間線上。 * 當用戶 $u_1$ 取消關注使用者 $u_2$ 時,$u_2$ 的推文將從 $u_1$ 的時間線中移除。 要在流處理器中實現這種快取維護,你需要推文事件流(傳送與刪除)和關注關係事件流(關注與取消關注)。流處理需要維護一個數據庫,包含每個使用者的粉絲集合。以便知道當一條新推文到達時,需要更新哪些時間線【86】。 觀察這個流處理過程的另一種視角是:它維護了一個連線了兩個表(推文與關注)的物化檢視,如下所示: ```sql SELECT follows.follower_id AS timeline_id, array_agg(tweets.* ORDER BY tweets.timestamp DESC) FROM tweets JOIN follows ON follows.followee_id = tweets.sender_id GROUP BY follows.follower_id ``` 流連線直接對應於這個查詢中的表連線。時間線實際上是這個查詢結果的快取,每當底層的表發生變化時都會更新 [^iii]。 [^iii]: 如果你將流視作表的衍生物,如 [圖 11-6](../img/fig11-6.png) 所示,而把一個連線看作是兩個表的乘法u·v,那麼會發生一些有趣的事情:物化連線的變化流遵循乘積法則:(u·v)'= u'v + uv'。 換句話說,任何推文的變化量都與當前的關注聯絡在一起,任何關注的變化量都與當前的推文相連線【49,50】。 #### 連線的時間依賴性 這裡描述的三種連線(流流,流表,表表)有很多共通之處:它們都需要流處理器維護連線一側的一些狀態(搜尋與點選事件,使用者檔案,關注列表),然後當連線另一側的訊息到達時查詢該狀態。 用於維護狀態的事件順序是很重要的(先關注然後取消關注,或者其他類似操作)。在分割槽日誌中,單個分割槽內的事件順序是保留下來的。但典型情況下是沒有跨流或跨分割槽的順序保證的。 這就產生了一個問題:如果不同流中的事件發生在近似的時間範圍內,則應該按照什麼樣的順序進行處理?在流表連線的例子中,如果使用者更新了它們的檔案,哪些活動事件與舊檔案連線(在檔案更新前處理),哪些又與新檔案連線(在檔案更新之後處理)?換句話說:你需要對一些狀態做連線,如果狀態會隨著時間推移而變化,那應當使用什麼時間點來連線呢【45】? 這種時序依賴可能出現在很多地方。例如銷售東西需要對發票應用適當的稅率,這取決於所處的國家 / 州,產品型別,銷售日期(因為稅率時不時會變化)。當連線銷售額與稅率表時,你可能期望的是使用銷售時的稅率參與連線。如果你正在重新處理歷史資料,銷售時的稅率可能和現在的稅率有所不同。 如果跨越流的事件順序是未定的,則連線會變為不確定性的【87】,這意味著你在同樣輸入上重跑相同的作業未必會得到相同的結果:當你重跑任務時,輸入流上的事件可能會以不同的方式交織。 在資料倉庫中,這個問題被稱為 **緩慢變化的維度(slowly changing dimension, SCD)**,通常透過對特定版本的記錄使用唯一的識別符號來解決:例如,每當稅率改變時都會獲得一個新的識別符號,而發票在銷售時會帶有稅率的識別符號【88,89】。這種變化使連線變為確定性的,但也會導致日誌壓縮無法進行:表中所有的記錄版本都需要保留。 ### 容錯 在本章的最後一節中,讓我們看一看流處理是如何容錯的。我們在 [第十章](ch10.md) 中看到,批處理框架可以很容易地容錯:如果 MapReduce 作業中的任務失敗,可以簡單地在另一臺機器上再次啟動,並且丟棄失敗任務的輸出。這種透明的重試是可能的,因為輸入檔案是不可變的,每個任務都將其輸出寫入到 HDFS 上的獨立檔案中,而輸出僅當任務成功完成後可見。 特別是,批處理容錯方法可確保批處理作業的輸出與沒有出錯的情況相同,即使實際上某些任務失敗了。看起來好像每條輸入記錄都被處理了恰好一次 —— 沒有記錄被跳過,而且沒有記錄被處理兩次。儘管重啟任務意味著實際上可能會多次處理記錄,但輸出中的可見效果看上去就像只處理過一次。這個原則被稱為 **恰好一次語義(exactly-once semantics)**,儘管 **等效一次(effectively-once)** 可能會是一個更寫實的術語【90】。 在流處理中也出現了同樣的容錯問題,但是處理起來沒有那麼直觀:等待某個任務完成之後再使其輸出可見並不是一個可行選項,因為你永遠無法處理完一個無限的流。 #### 微批次與存檔點 一個解決方案是將流分解成小塊,並像微型批處理一樣處理每個塊。這種方法被稱為 **微批次(microbatching)**,它被用於 Spark Streaming 【91】。批次的大小通常約為 1 秒,這是對效能妥協的結果:較小的批次會導致更大的排程與協調開銷,而較大的批次意味著流處理器結果可見之前的延遲要更長。 微批次也隱式提供了一個與批次大小相等的滾動視窗(按處理時間而不是事件時間戳分窗)。任何需要更大視窗的作業都需要顯式地將狀態從一個微批次轉移到下一個微批次。 Apache Flink 則使用不同的方法,它會定期生成狀態的滾動存檔點並將其寫入持久儲存【92,93】。如果流運算元崩潰,它可以從最近的存檔點重啟,並丟棄從最近檢查點到崩潰之間的所有輸出。存檔點會由訊息流中的 **壁障(barrier)** 觸發,類似於微批次之間的邊界,但不會強制一個特定的視窗大小。 在流處理框架的範圍內,微批次與存檔點方法提供了與批處理一樣的 **恰好一次語義**。但是,只要輸出離開流處理器(例如,寫入資料庫,向外部訊息代理傳送訊息,或傳送電子郵件),框架就無法拋棄失敗批次的輸出了。在這種情況下,重啟失敗任務會導致外部副作用發生兩次,只有微批次或存檔點不足以阻止這一問題。 #### 原子提交再現 為了在出現故障時表現出恰好處理一次的樣子,我們需要確保事件處理的所有輸出和副作用 **當且僅當** 處理成功時才會生效。這些影響包括傳送給下游運算元或外部訊息傳遞系統(包括電子郵件或推送通知)的任何訊息,任何資料庫寫入,對運算元狀態的任何變更,以及對輸入訊息的任何確認(包括在基於日誌的訊息代理中將消費者偏移量前移)。 這些事情要麼都原子地發生,要麼都不發生,但是它們不應當失去同步。如果這種方法聽起來很熟悉,那是因為我們在分散式事務和兩階段提交的上下文中討論過它(請參閱 “[恰好一次的訊息處理](ch9.md#恰好一次的訊息處理)”)。 在 [第九章](ch9.md) 中,我們討論了分散式事務傳統實現中的問題(如 XA)。然而在限制更為嚴苛的環境中,也是有可能高效實現這種原子提交機制的。 Google Cloud Dataflow【81,92】和 VoltDB 【94】中使用了這種方法,Apache Kafka 有計劃加入類似的功能【95,96】。與 XA 不同,這些實現不會嘗試跨異構技術提供事務,而是透過在流處理框架中同時管理狀態變更與訊息傳遞來內化事務。事務協議的開銷可以透過在單個事務中處理多個輸入訊息來分攤。 #### 冪等性 我們的目標是丟棄任何失敗任務的部分輸出,以便能安全地重試,而不會生效兩次。分散式事務是實現這個目標的一種方式,而另一種方式是依賴 **冪等性(idempotence)**【97】。 冪等操作是多次重複執行與單次執行效果相同的操作。例如,將鍵值儲存中的某個鍵設定為某個特定值是冪等的(再次寫入該值,只是用同樣的值替代),而遞增一個計數器不是冪等的(再次執行遞增意味著該值遞增兩次)。 即使一個操作不是天生冪等的,往往可以透過一些額外的元資料做成冪等的。例如,在使用來自 Kafka 的訊息時,每條訊息都有一個持久的、單調遞增的偏移量。將值寫入外部資料庫時可以將這個偏移量帶上,這樣你就可以判斷一條更新是不是已經執行過了,因而避免重複執行。 Storm 的 Trident 基於類似的想法來處理狀態【78】。依賴冪等性意味著隱含了一些假設:重啟一個失敗的任務必須以相同的順序重播相同的訊息(基於日誌的訊息代理能做這些事),處理必須是確定性的,沒有其他節點能同時更新相同的值【98,99】。 當從一個處理節點故障切換到另一個節點時,可能需要進行 **防護**(fencing,請參閱 “[領導者和鎖](ch8.md#領導者和鎖)”),以防止被假死節點干擾。儘管有這麼多注意事項,冪等操作是一種實現 **恰好一次語義** 的有效方式,僅需很小的額外開銷。 #### 失敗後重建狀態 任何需要狀態的流處理 —— 例如,任何視窗聚合(例如計數器,平均值和直方圖)以及任何用於連線的表和索引,都必須確保在失敗之後能恢復其狀態。 一種選擇是將狀態儲存在遠端資料儲存中,並進行復制,然而正如在 “[流表連線(流擴充)](#流表連線(流擴充))” 中所述,每個訊息都要查詢遠端資料庫可能會很慢。另一種方法是在流處理器本地儲存狀態,並定期複製。然後當流處理器從故障中恢復時,新任務可以讀取狀態副本,恢復處理而不丟失資料。 例如,Flink 定期捕獲運算元狀態的快照,並將它們寫入 HDFS 等持久儲存中【92,93】。 Samza 和 Kafka Streams 透過將狀態變更傳送到具有日誌壓縮功能的專用 Kafka 主題來複制狀態變更,這與變更資料捕獲類似【84,100】。 VoltDB 透過在多個節點上對每個輸入訊息進行冗餘處理來複制狀態(請參閱 “[真的序列執行](ch7.md#真的序列執行)”)。 在某些情況下,甚至可能都不需要複製狀態,因為它可以從輸入流重建。例如,如果狀態是從相當短的視窗中聚合而成,則簡單地重播該視窗中的輸入事件可能是足夠快的。如果狀態是透過變更資料捕獲來維護的資料庫的本地副本,那麼也可以從日誌壓縮的變更流中重建資料庫(請參閱 “[日誌壓縮](#日誌壓縮)”)。 然而,所有這些權衡取決於底層基礎架構的效能特徵:在某些系統中,網路延遲可能低於磁碟訪問延遲,網路頻寬也可能與磁碟頻寬相當。沒有針對所有情況的普適理想權衡,隨著儲存和網路技術的發展,本地狀態與遠端狀態的優點也可能會互換。 ## 本章小結 在本章中,我們討論了事件流,它們所服務的目的,以及如何處理它們。在某些方面,流處理非常類似於在 [第十章](ch10.md) 中討論的批處理,不過是在無限的(永無止境的)流而不是固定大小的輸入上持續進行。從這個角度來看,訊息代理和事件日誌可以視作檔案系統的流式等價物。 我們花了一些時間比較兩種訊息代理: * AMQP/JMS 風格的訊息代理 代理將單條訊息分配給消費者,消費者在成功處理單條訊息後確認訊息。訊息被確認後從代理中刪除。這種方法適合作為一種非同步形式的 RPC(另請參閱 “[訊息傳遞中的資料流](ch4.md#訊息傳遞中的資料流)”),例如在任務佇列中,訊息處理的確切順序並不重要,而且訊息在處理完之後,不需要回頭重新讀取舊訊息。 * 基於日誌的訊息代理 代理將一個分割槽中的所有訊息分配給同一個消費者節點,並始終以相同的順序傳遞訊息。並行是透過分割槽實現的,消費者透過存檔最近處理訊息的偏移量來跟蹤工作進度。訊息代理將訊息保留在磁碟上,因此如有必要的話,可以回跳並重新讀取舊訊息。 基於日誌的方法與資料庫中的複製日誌(請參閱 [第五章](ch5.md))和日誌結構儲存引擎(請參閱 [第三章](ch3.md))有相似之處。我們看到,這種方法對於消費輸入流,併產生衍生狀態或衍生輸出資料流的系統而言特別適用。 就流的來源而言,我們討論了幾種可能性:使用者活動事件,定期讀數的感測器,和 Feed 資料(例如,金融中的市場資料)能夠自然地表示為流。我們發現將資料庫寫入視作流也是很有用的:我們可以捕獲變更日誌 —— 即對資料庫所做的所有變更的歷史記錄 —— 隱式地透過變更資料捕獲,或顯式地透過事件溯源。日誌壓縮允許流也能保有資料庫內容的完整副本。 將資料庫表示為流為系統整合帶來了很多強大機遇。透過消費變更日誌並將其應用至衍生系統,你能使諸如搜尋索引、快取以及分析系統這類衍生資料系統不斷保持更新。你甚至能從頭開始,透過讀取從創世至今的所有變更日誌,為現有資料建立全新的檢視。 像流一樣維護狀態以及訊息重播的基礎設施,是在各種流處理框架中實現流連線和容錯的基礎。我們討論了流處理的幾種目的,包括搜尋事件模式(複雜事件處理),計算分窗聚合(流分析),以及保證衍生資料系統處於最新狀態(物化檢視)。 然後我們討論了在流處理中對時間進行推理的困難,包括處理時間與事件時間戳之間的區別,以及當你認為視窗已經完事之後,如何處理到達的掉隊事件的問題。 我們區分了流處理中可能出現的三種連線型別: * 流流連線 兩個輸入流都由活動事件組成,而連線運算元在某個時間視窗內搜尋相關的事件。例如,它可能會將同一個使用者 30 分鐘內進行的兩個活動聯絡在一起。如果你想要找出一個流內的相關事件,連線的兩側輸入可能實際上都是同一個流(**自連線**,即 self-join)。 * 流表連線 一個輸入流由活動事件組成,另一個輸入流是資料庫變更日誌。變更日誌保證了資料庫的本地副本是最新的。對於每個活動事件,連線運算元將查詢資料庫,並輸出一個擴充套件的活動事件。 * 表表連線 兩個輸入流都是資料庫變更日誌。在這種情況下,一側的每一個變化都與另一側的最新狀態相連線。結果是兩表連線所得物化檢視的變更流。 最後,我們討論了在流處理中實現容錯和恰好一次語義的技術。與批處理一樣,我們需要放棄任何失敗任務的部分輸出。然而由於流處理長時間執行並持續產生輸出,所以不能簡單地丟棄所有的輸出。相反,可以使用更細粒度的恢復機制,基於微批次、存檔點、事務或冪等寫入。 ## 參考文獻 1. Tyler Akidau, Robert Bradshaw, Craig Chambers, et al.: “[The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing](http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf),” *Proceedings of the VLDB Endowment*, volume 8, number 12, pages 1792–1803, August 2015. [doi:10.14778/2824032.2824076](http://dx.doi.org/10.14778/2824032.2824076) 1. Harold Abelson, Gerald Jay Sussman, and Julie Sussman: *Structure and Interpretation of Computer Programs*, 2nd edition. MIT Press, 1996. ISBN: 978-0-262-51087-5, available online at *mitpress.mit.edu* 1. Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec: “[The Many Faces of Publish/Subscribe](http://www.cs.ru.nl/~pieter/oss/manyfaces.pdf),” *ACM Computing Surveys*, volume 35, number 2, pages 114–131, June 2003. [doi:10.1145/857076.857078](http://dx.doi.org/10.1145/857076.857078) 1. Joseph M. Hellerstein and Michael Stonebraker: *Readings in Database Systems*, 4th edition. MIT Press, 2005. ISBN: 978-0-262-69314-1, available online at *redbook.cs.berkeley.edu* 1. Don Carney, Uğur Çetintemel, Mitch Cherniack, et al.: “[Monitoring Streams – A New Class of Data Management Applications](http://www.vldb.org/conf/2002/S07P02.pdf),” at *28th International Conference on Very Large Data Bases* (VLDB), August 2002. 1. Matthew Sackman: “[Pushing Back](http://www.lshift.net/blog/2016/05/05/pushing-back/),” *lshift.net*, May 5, 2016. Vicent Martí: “[Brubeck, a statsd-Compatible Metrics Aggregator](http://githubengineering.com/brubeck/),” *githubengineering.com*, June 15, 2015. Seth Lowenberger: “[MoldUDP64 Protocol Specification V 1.00](http://www.nasdaqtrader.com/content/technicalsupport/specifications/dataproducts/moldudp64.pdf),” *nasdaqtrader.com*, July 2009. 1. Pieter Hintjens: *ZeroMQ – The Guide*. O'Reilly Media, 2013. ISBN: 978-1-449-33404-8 1. Ian Malpass: “[Measure Anything, Measure Everything](https://codeascraft.com/2011/02/15/measure-anything-measure-everything/),” *codeascraft.com*, February 15, 2011. 1. Dieter Plaetinck: “[25 Graphite, Grafana and statsd Gotchas](https://blog.raintank.io/25-graphite-grafana-and-statsd-gotchas/),” *blog.raintank.io*, March 3, 2016. 1. Jeff Lindsay: “[Web Hooks to Revolutionize the Web](http://progrium.com/blog/2007/05/03/web-hooks-to-revolutionize-the-web/),” *progrium.com*, May 3, 2007. 1. Jim N. Gray: “[Queues Are Databases](http://research.microsoft.com/pubs/69641/tr-95-56.pdf),” Microsoft Research Technical Report MSR-TR-95-56, December 1995. 1. Mark Hapner, Rich Burridge, Rahul Sharma, et al.: “[JSR-343 Java Message Service (JMS) 2.0 Specification](https://jcp.org/en/jsr/detail?id=343),” *jms-spec.java.net*, March 2013. 1. Sanjay Aiyagari, Matthew Arrott, Mark Atwell, et al.: “[AMQP: Advanced Message Queuing Protocol Specification](http://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf),” Version 0-9-1, November 2008. 1. “[Google Cloud Pub/Sub: A Google-Scale Messaging Service](https://cloud.google.com/pubsub/architecture),” *cloud.google.com*, 2016. 1. “[Apache Kafka 0.9 Documentation](http://kafka.apache.org/documentation.html),” *kafka.apache.org*, November 2015. 1. Jay Kreps, Neha Narkhede, and Jun Rao: “[Kafka: A Distributed Messaging System for Log Processing](http://www.longyu23.com/doc/Kafka.pdf),” at *6th International Workshop on Networking Meets Databases* (NetDB), June 2011. 1. “[Amazon Kinesis Streams Developer Guide](http://docs.aws.amazon.com/streams/latest/dev/introduction.html),” *docs.aws.amazon.com*, April 2016. 1. Leigh Stewart and Sijie Guo: “[Building DistributedLog: Twitter’s High-Performance Replicated Log Service](https://blog.twitter.com/2015/building-distributedlog-twitter-s-high-performance-replicated-log-service),” *blog.twitter.com*, September 16, 2015. 1. “[DistributedLog Documentation](http://distributedlog.incubator.apache.org/docs/latest/),” Twitter, Inc., *distributedlog.io*, May 2016. Jay Kreps: “[Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines)](https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines),” *engineering.linkedin.com*, April 27, 2014. 1. Kartik Paramasivam: “[How We’re Improving and Advancing Kafka at LinkedIn](https://engineering.linkedin.com/apache-kafka/how-we_re-improving-and-advancing-kafka-linkedin),” *engineering.linkedin.com*, September 2, 2015. 1. Jay Kreps: “[The Log: What Every Software Engineer Should Know About Real-Time Data's Unifying Abstraction](http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying),” *engineering.linkedin.com*, December 16, 2013. 1. Shirshanka Das, Chavdar Botev, Kapil Surlaker, et al.: “[All Aboard the Databus!](http://www.socc2012.org/s18-das.pdf),” at *3rd ACM Symposium on Cloud Computing* (SoCC), October 2012. 1. Yogeshwer Sharma, Philippe Ajoux, Petchean Ang, et al.: “[Wormhole: Reliable Pub-Sub to Support Geo-Replicated Internet Services](https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-sharma.pdf),” at *12th USENIX Symposium on Networked Systems Design and Implementation* (NSDI), May 2015. 1. P. P. S. Narayan: “[Sherpa Update](http://web.archive.org/web/20160801221400/https://developer.yahoo.com/blogs/ydn/sherpa-7992.html),” *developer.yahoo.com*, June 8, . 1. Martin Kleppmann: “[Bottled Water: Real-Time Integration of PostgreSQL and Kafka](http://martin.kleppmann.com/2015/04/23/bottled-water-real-time-postgresql-kafka.html),” *martin.kleppmann.com*, April 23, 2015. 1. Ben Osheroff: “[Introducing Maxwell, a mysql-to-kafka Binlog Processor](https://developer.zendesk.com/blog/introducing-maxwell-a-mysql-to-kafka-binlog-processor),” *developer.zendesk.com*, August 20, 2015. 1. Randall Hauch: “[Debezium 0.2.1 Released](http://debezium.io/blog/2016/06/10/Debezium-0/),” *debezium.io*, June 10, 2016. 1. Prem Santosh Udaya Shankar: “[Streaming MySQL Tables in Real-Time to Kafka](https://engineeringblog.yelp.com/2016/08/streaming-mysql-tables-in-real-time-to-kafka.html),” *engineeringblog.yelp.com*, August 1, 2016. 1. “[Mongoriver](https://github.com/stripe/mongoriver),” Stripe, Inc., *github.com*, September 2014. 1. Dan Harvey: “[Change Data Capture with Mongo + Kafka](http://www.slideshare.net/danharvey/change-data-capture-with-mongodb-and-kafka),” at *Hadoop Users Group UK*, August 2015. 1. “[Oracle GoldenGate 12c: Real-Time Access to Real-Time Information](http://www.oracle.com/us/products/middleware/data-integration/oracle-goldengate-realtime-access-2031152.pdf),” Oracle White Paper, March 2015. 1. “[Oracle GoldenGate Fundamentals: How Oracle GoldenGate Works](https://www.youtube.com/watch?v=6H9NibIiPQE),” Oracle Corporation, *youtube.com*, November 2012. 1. Slava Akhmechet: “[Advancing the Realtime Web](http://rethinkdb.com/blog/realtime-web/),” *rethinkdb.com*, January 27, 2015. 1. “[Firebase Realtime Database Documentation](https://firebase.google.com/docs/database/),” Google, Inc., *firebase.google.com*, May 2016. 1. “[Apache CouchDB 1.6 Documentation](http://docs.couchdb.org/en/latest/),” *docs.couchdb.org*, 2014. 1. Matt DeBergalis: “[Meteor 0.7.0: Scalable Database Queries Using MongoDB Oplog Instead of Poll-and-Diff](http://info.meteor.com/blog/meteor-070-scalable-database-queries-using-mongodb-oplog-instead-of-poll-and-diff),” *info.meteor.com*, December 17, 2013. 1. “[Chapter 15. Importing and Exporting Live Data](https://docs.voltdb.com/UsingVoltDB/ChapExport.php),” VoltDB 6.4 User Manual, *docs.voltdb.com*, June 2016. 1. Neha Narkhede: “[Announcing Kafka Connect: Building Large-Scale Low-Latency Data Pipelines](http://www.confluent.io/blog/announcing-kafka-connect-building-large-scale-low-latency-data-pipelines),” *confluent.io*, February 18, 2016. 1. Greg Young: “[CQRS and Event Sourcing](https://www.youtube.com/watch?v=JHGkaShoyNs),” at *Code on the Beach*, August 2014. 1. Martin Fowler: “[Event Sourcing](http://martinfowler.com/eaaDev/EventSourcing.html),” *martinfowler.com*, December 12, 2005. 1. Vaughn Vernon: *Implementing Domain-Driven Design*. Addison-Wesley Professional, 2013. ISBN: 978-0-321-83457-7 1. H. V. Jagadish, Inderpal Singh Mumick, and Abraham Silberschatz: “[View Maintenance Issues for the Chronicle Data Model](http://www.mathcs.emory.edu/~cheung/papers/StreamDB/Histogram/1995-Jagadish-Histo.pdf),” at *14th ACM SIGACT-SIGMOD-SIGART Symposium on Principles of Database Systems* (PODS), May 1995. [doi:10.1145/212433.220201](http://dx.doi.org/10.1145/212433.220201) 1. “[Event Store 3.5.0 Documentation](http://docs.geteventstore.com/),” Event Store LLP, *docs.geteventstore.com*, February 2016. 1. Martin Kleppmann: *Making Sense of Stream Processing*. Report, O'Reilly Media, May 2016. 1. Sander Mak: “[Event-Sourced Architectures with Akka](http://www.slideshare.net/SanderMak/eventsourced-architectures-with-akka),” at *JavaOne*, September 2014. 1. Julian Hyde: [personal communication](https://twitter.com/julianhyde/status/743374145006641153), June 2016. 1. Ashish Gupta and Inderpal Singh Mumick: *Materialized Views: Techniques, Implementations, and Applications*. MIT Press, 1999. ISBN: 978-0-262-57122-7 1. Timothy Griffin and Leonid Libkin: “[Incremental Maintenance of Views with Duplicates](http://homepages.inf.ed.ac.uk/libkin/papers/sigmod95.pdf),” at *ACM International Conference on Management of Data* (SIGMOD), May 1995. [doi:10.1145/223784.223849](http://dx.doi.org/10.1145/223784.223849) 1. Pat Helland: “[Immutability Changes Everything](http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper16.pdf),” at *7th Biennial Conference on Innovative Data Systems Research* (CIDR), January 2015. 1. Martin Kleppmann: “[Accounting for Computer Scientists](http://martin.kleppmann.com/2011/03/07/accounting-for-computer-scientists.html),” *martin.kleppmann.com*, March 7, 2011. 1. Pat Helland: “[Accountants Don't Use Erasers](https://blogs.msdn.microsoft.com/pathelland/2007/06/14/accountants-dont-use-erasers/),” *blogs.msdn.com*, June 14, 2007. 1. Fangjin Yang: “[Dogfooding with Druid, Samza, and Kafka: Metametrics at Metamarkets](https://metamarkets.com/2015/dogfooding-with-druid-samza-and-kafka-metametrics-at-metamarkets/),” *metamarkets.com*, June 3, 2015. 1. Gavin Li, Jianqiu Lv, and Hang Qi: “[Pistachio: Co-Locate the Data and Compute for Fastest Cloud Compute](http://yahoohadoop.tumblr.com/post/116365275781/pistachio-co-locate-the-data-and-compute-for),” *yahoohadoop.tumblr.com*, April 13, 2015. 1. Kartik Paramasivam: “[Stream Processing Hard Problems – Part 1: Killing Lambda](https://engineering.linkedin.com/blog/2016/06/stream-processing-hard-problems-part-1-killing-lambda),” *engineering.linkedin.com*, June 27, 2016. 1. Martin Fowler: “[CQRS](http://martinfowler.com/bliki/CQRS.html),” *martinfowler.com*, July 14, 2011. 1. Greg Young: “[CQRS Documents](https://cqrs.files.wordpress.com/2010/11/cqrs_documents.pdf),” *cqrs.files.wordpress.com*, November 2010. 1. Baron Schwartz: “[Immutability, MVCC, and Garbage Collection](http://www.xaprb.com/blog/2013/12/28/immutability-mvcc-and-garbage-collection/),” *xaprb.com*, December 28, 2013. 1. Daniel Eloff, Slava Akhmechet, Jay Kreps, et al.: ["Re: Turning the Database Inside-out with Apache Samza](https://news.ycombinator.com/item?id=9145197)," *Hacker News discussion, news.ycombinator.com*, March 4, 2015. 1. “[Datomic Development Resources: Excision](http://docs.datomic.com/excision.html),” Cognitect, Inc., *docs.datomic.com*. 1. “[Fossil Documentation: Deleting Content from Fossil](http://fossil-scm.org/index.html/doc/trunk/www/shunning.wiki),” *fossil-scm.org*, 2016. 1. Jay Kreps: “[The irony of distributed systems is that data loss is really easy but deleting data is surprisingly hard,](https://twitter.com/jaykreps/status/582580836425330688)” *twitter.com*, March 30, 2015. 1. David C. Luckham: “[What’s the Difference Between ESP and CEP?](http://www.complexevents.com/2006/08/01/what%E2%80%99s-the-difference-between-esp-and-cep/),” *complexevents.com*, August 1, 2006. 1. Srinath Perera: “[How Is Stream Processing and Complex Event Processing (CEP) Different?](https://www.quora.com/How-is-stream-processing-and-complex-event-processing-CEP-different),” *quora.com*, December 3, 2015. 1. Arvind Arasu, Shivnath Babu, and Jennifer Widom: “[The CQL Continuous Query Language: Semantic Foundations and Query Execution](http://research.microsoft.com/pubs/77607/cql.pdf),” *The VLDB Journal*, volume 15, number 2, pages 121–142, June 2006. [doi:10.1007/s00778-004-0147-z](http://dx.doi.org/10.1007/s00778-004-0147-z) 1. Julian Hyde: “[Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch](http://queue.acm.org/detail.cfm?id=1667562),” *ACM Queue*, volume 7, number 11, December 2009. [doi:10.1145/1661785.1667562](http://dx.doi.org/10.1145/1661785.1667562) 1. “[Esper Reference, Version 5.4.0](http://www.espertech.com/esper/release-5.4.0/esper-reference/html_single/index.html),” EsperTech, Inc., *espertech.com*, April 2016. 1. Zubair Nabi, Eric Bouillet, Andrew Bainbridge, and Chris Thomas: “[Of Streams and Storms](https://developer.ibm.com/streamsdev/wp-content/uploads/sites/15/2014/04/Streams-and-Storm-April-2014-Final.pdf),” IBM technical report, *developer.ibm.com*, April 2014. 1. Milinda Pathirage, Julian Hyde, Yi Pan, and Beth Plale: “[SamzaSQL: Scalable Fast Data Management with Streaming SQL](https://github.com/milinda/samzasql-hpbdc2016/blob/master/samzasql-hpbdc2016.pdf),” at *IEEE International Workshop on High-Performance Big Data Computing* (HPBDC), May 2016. [doi:10.1109/IPDPSW.2016.141](http://dx.doi.org/10.1109/IPDPSW.2016.141) 1. Philippe Flajolet, Éric Fusy, Olivier Gandouet, and Frédéric Meunier: “[HyperLo⁠g​Log: The Analysis of a Near-Optimal Cardinality Estimation Algorithm](http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf),” at *Conference on Analysis of Algorithms* (AofA), June 2007. 1. Jay Kreps: “[Questioning the Lambda Architecture](https://www.oreilly.com/ideas/questioning-the-lambda-architecture),” *oreilly.com*, July 2, 2014. 1. Ian Hellström: “[An Overview of Apache Streaming Technologies](https://databaseline.wordpress.com/2016/03/12/an-overview-of-apache-streaming-technologies/),” *databaseline.wordpress.com*, March 12, 2016. 1. Jay Kreps: “[Why Local State Is a Fundamental Primitive in Stream Processing](https://www.oreilly.com/ideas/why-local-state-is-a-fundamental-primitive-in-stream-processing),” *oreilly.com*, July 31, 2014. 1. Shay Banon: “[Percolator](https://www.elastic.co/blog/percolator),” *elastic.co*, February 8, 2011. 1. Alan Woodward and Martin Kleppmann: “[Real-Time Full-Text Search with Luwak and Samza](http://martin.kleppmann.com/2015/04/13/real-time-full-text-search-luwak-samza.html),” *martin.kleppmann.com*, April 13, 2015. 1. “[Apache Storm 1.0.1 Documentation](https://storm.apache.org/releases/1.0.1/index.html),” *storm.apache.org*, May 2016. 1. Tyler Akidau: “[The World Beyond Batch: Streaming 102](https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102),” *oreilly.com*, January 20, 2016. 1. Stephan Ewen: “[Streaming Analytics with Apache Flink](http://www.confluent.io/kafka-summit-2016-systems-advanced-streaming-analytics-with-apache-flink-and-apache-kafka),” at *Kafka Summit*, April 2016. 1. Tyler Akidau, Alex Balikov, Kaya Bekiroğlu, et al.: “[MillWheel: Fault-Tolerant Stream Processing at Internet Scale](http://research.google.com/pubs/pub41378.html),” at *39th International Conference on Very Large Data Bases* (VLDB), August 2013. 1. Alex Dean: “[Improving Snowplow's Understanding of Time](http://snowplowanalytics.com/blog/2015/09/15/improving-snowplows-understanding-of-time/),” *snowplowanalytics.com*, September 15, 2015. 1. “[Windowing (Azure Stream Analytics)](https://msdn.microsoft.com/en-us/library/azure/dn835019.aspx),” Microsoft Azure Reference, *msdn.microsoft.com*, April 2016. 1. “[State Management](http://samza.apache.org/learn/documentation/0.10/container/state-management.html),” Apache Samza 0.10 Documentation, *samza.apache.org*, December 2015. 1. Rajagopal Ananthanarayanan, Venkatesh Basker, Sumit Das, et al.: “[Photon: Fault-Tolerant and Scalable Joining of Continuous Data Streams](http://research.google.com/pubs/pub41318.html),” at *ACM International Conference on Management of Data* (SIGMOD), June 2013. [doi:10.1145/2463676.2465272](http://dx.doi.org/10.1145/2463676.2465272) 1. Martin Kleppmann: “[Samza Newsfeed Demo](https://github.com/ept/newsfeed),” *github.com*, September 2014. 1. Ben Kirwin: “[Doing the Impossible: Exactly-Once Messaging Patterns in Kafka](http://ben.kirw.in/2014/11/28/kafka-patterns/),” *ben.kirw.in*, November 28, 2014. 1. Pat Helland: “[Data on the Outside Versus Data on the Inside](http://cidrdb.org/cidr2005/papers/P12.pdf),” at *2nd Biennial Conference on Innovative Data Systems Research* (CIDR), January 2005. 1. Ralph Kimball and Margy Ross: *The Data Warehouse Toolkit: The Definitive Guide to Dimensional Modeling*, 3rd edition. John Wiley & Sons, 2013. ISBN: 978-1-118-53080-1 1. Viktor Klang: “[I'm coining the phrase 'effectively-once' for message processing with at-least-once + idempotent operations](https://twitter.com/viktorklang/status/789036133434978304),” *twitter.com*, October 20, 2016. 1. Matei Zaharia, Tathagata Das, Haoyuan Li, et al.: “[Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters](https://www.usenix.org/system/files/conference/hotcloud12/hotcloud12-final28.pdf),” at *4th USENIX Conference in Hot Topics in Cloud Computing* (HotCloud), June 2012. 1. Kostas Tzoumas, Stephan Ewen, and Robert Metzger: “[High-Throughput, Low-Latency, and Exactly-Once Stream Processing with Apache Flink](http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/),” *data-artisans.com*, August 5, 2015. 1. Paris Carbone, Gyula Fóra, Stephan Ewen, et al.: “[Lightweight Asynchronous Snapshots for Distributed Dataflows](http://arxiv.org/abs/1506.08603),” arXiv:1506.08603 [cs.DC], June 29, 2015. 1. Ryan Betts and John Hugg: *Fast Data: Smart and at Scale*. Report, O'Reilly Media, October 2015. 1. Flavio Junqueira: “[Making Sense of Exactly-Once Semantics](http://conferences.oreilly.com/strata/hadoop-big-data-eu/public/schedule/detail/49690),” at *Strata+Hadoop World London*, June 2016. 1. Jason Gustafson, Flavio Junqueira, Apurva Mehta, Sriram Subramanian, and Guozhang Wang: “[KIP-98 – Exactly Once Delivery and Transactional Messaging](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging),” *cwiki.apache.org*, November 2016. 1. Pat Helland: “[Idempotence Is Not a Medical Condition](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.401.1539&rep=rep1&type=pdf),” *Communications of the ACM*, volume 55, number 5, page 56, May 2012. [doi:10.1145/2160718.2160734](http://dx.doi.org/10.1145/2160718.2160734) 1. Jay Kreps: “[Re: Trying to Achieve Deterministic Behavior on Recovery/Rewind](http://mail-archives.apache.org/mod_mbox/samza-dev/201409.mbox/%3CCAOeJiJg%2Bc7Ei%3DgzCuOz30DD3G5Hm9yFY%3DUJ6SafdNUFbvRgorg%40mail.gmail.com%3E),” email to *samza-dev* mailing list, September 9, 2014. 1. E. N. (Mootaz) Elnozahy, Lorenzo Alvisi, Yi-Min Wang, and David B. Johnson: “[A Survey of Rollback-Recovery Protocols in Message-Passing Systems](http://www.cs.utexas.edu/~lorenzo/papers/SurveyFinal.pdf),” *ACM Computing Surveys*, volume 34, number 3, pages 375–408, September 2002. [doi:10.1145/568522.568525](http://dx.doi.org/10.1145/568522.568525) 1. Adam Warski: “[Kafka Streams – How Does It Fit the Stream Processing Landscape?](https://softwaremill.com/kafka-streams-how-does-it-fit-stream-landscape/),” *softwaremill.com*, June 1, 2016. ------ | 上一章 | 目錄 | 下一章 | | ------------------------- | ------------------------------- | ---------------------------------- | | [第十章:批處理](ch10.md) | [設計資料密集型應用](README.md) | [第十二章:資料系統的未來](ch12.md) |