From b66286b0ddab8619a6fe994580cc2f73c980b105 Mon Sep 17 00:00:00 2001 From: foreverbell Date: Wed, 21 Mar 2018 15:10:16 +0800 Subject: [PATCH] RaftMerge: body part of multi-raft region merge protocol except rollback. (#10) --- RaftMerge/RaftMerge.tla | 239 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 223 insertions(+), 16 deletions(-) diff --git a/RaftMerge/RaftMerge.tla b/RaftMerge/RaftMerge.tla index 5e031be..930730a 100644 --- a/RaftMerge/RaftMerge.tla +++ b/RaftMerge/RaftMerge.tla @@ -1,5 +1,22 @@ ------------------------------ MODULE RaftMerge ------------------------------- +\* This is the formal specification for the multi-raft region merge algorithm +\* of TiKV. +\* +\* The whole data is divided into multiple shards called regions, and each +\* region is replicated to several stores comprising a Raft group. Two regions +\* can merge into a larger one if one region is reasonably small. +\* +\* This specification asserts two regions named A and B are replicated to the +\* same set of stores. Each region has leader on store LeaderA and LeaderB +\* respectively. +\* +\* Notice that TiKV uses a slightly different Raft model compared with Ongaro's +\* original Raft implementation. A log is truly committed if the log is applied +\* to the state machine, and then server will return the result to client. +\* commit_index is only a marker, log may be dropped even if commit_index goes +\* beyond that log. + EXTENDS Integers, FiniteSets, Sequences, TLC CONSTANTS Store, Region @@ -40,22 +57,44 @@ VARIABLES messages \* The data structures in C. MAXS = |Store|. \* +\* enum Log { LogNormal, LogPreMerge, LogMerge }; +\* +\* enum RegionState { RegionNormal, RegionTombStone, RegionMerging }; +\* \* struct Raft { \* bool is_leader; \* vector logs; \* int commit_index; \* int apply_index; +\* int num_applied; // number of applied normal logs \* int match_index[MAXS]; // leader only \* }; \* \* struct Store { -\* Raft raft[2]; // 2 for two regions +\* Raft raft[2]; // 2 for two regions +\* RegionState region[2]; // 2 for two regions \* } stores[MAXS]; \* -\* Note for ease of implementation, we use two 2-dimension arrays raft[MAXS][2]. +\* Note for ease of implementation, we use two 2-dimension arrays raft[MAXS][2] +\* and region[MAXS][2]. +\* +\* Also note that different from a real-world implementation, we don't +\* introduce the concept of `epoch` here, which is used to figure out whether +\* the configuration of one region has changed. Epoch matters when we are +\* applying the logs into state machine, if it is stale, we will skip all +\* later non-admin logs. Epoch will be changed when we are applying admin logs. -\* Log. -CONSTANTS Log +\* Log types. +\* The logs are divided into two categories, normal logs and admin logs. +\* Logs apart from LogNormal are admin logs. +CONSTANTS LogNormal, \* RegionB only + LogPreMerge, + LogMerge + +\* Region state types. +CONSTANTS RegionNormal, + RegionTombStone, + RegionMerging VARIABLES raft, region @@ -164,6 +203,7 @@ Receive(m) == \* Leader i of region r receives a client request to append a log. ClientRequest(i, r, log) == /\ raft[i][r].is_leader + /\ region[i][r] = RegionNormal /\ client_requests_index < MaxClientRequests /\ LET new_logs == Append(raft[i][r].logs, log) @@ -175,21 +215,137 @@ ClientRequest(i, r, log) == /\ UNCHANGED <> ------------------------------------------------------------------------------- +\* State transitions for Raft merge, and log applying. + +\* Internal requests for Raft merge. +\* Assume raft[i][r].is_leader, i.e., only leader can handle internal requests. +InternalRequest(i, r, log) == + LET + new_logs == Append(raft[i][r].logs, log) + new_match_index == [raft[i][r].match_index EXCEPT ![i] = @ + 1] + IN + [raft EXCEPT ![i][r].logs = new_logs, + ![i][r].match_index = new_match_index] + +\* Send merge request to the leader of Region B. +ProposeMergeRequest(i) == + /\ raft[i][RegionB].is_leader + /\ \* This request should be sent only once. + Len(SelectSeq(raft[i][RegionB].logs, LAMBDA log : log.type = LogPreMerge)) = 0 + /\ raft' = InternalRequest( + i, RegionB, + [type |-> LogPreMerge, + min_index |-> 1 + Min({raft[i][RegionB].match_index[j] : j \in Store})] + ) + /\ UNCHANGED <> \* Return TRUE if there is a log applicable to the state machine. +\* A log is applicable if it is committed, and the target region is not in +\* TombStone state. LogAppliable(i, r) == - raft[i][r].apply_index < raft[i][r].commit_index + /\ raft[i][r].apply_index < raft[i][r].commit_index + /\ region[i][r] /= RegionTombStone -\* Apply Raft logs to make apply_index catch up with commit_index. -\* This simply increases apply_index. -ApplyLog(i, r) == +\* Apply LogPreMerge. +ApplyPreMergeLog(i) == + LET + next_index == raft[i][RegionB].apply_index + 1 + IN + /\ LogAppliable(i, RegionB) + /\ raft[i][RegionB].logs[next_index].type = LogPreMerge + /\ IF raft[i][RegionA].is_leader + THEN + \* If this store is the leader of regionA, make a merge proposal, and + \* advance apply_index. + LET + min_index == raft[i][RegionB].logs[next_index].min_index + commit_index == next_index + fetch_logs == SubSeq(raft[i][RegionB].logs, min_index, commit_index) + IN + raft' = [InternalRequest( + i, RegionA, + [type |-> LogMerge, + min_index |-> min_index, + commit_index |-> commit_index, + entries |-> fetch_logs] + ) + EXCEPT ![i][RegionB].apply_index = next_index] + ELSE + \* Otherwise, only advance apply_index. + raft' = [raft EXCEPT ![i][RegionB].apply_index = next_index] + /\ region' = [region EXCEPT ![i][RegionB] = RegionMerging] + /\ UNCHANGED <> + +\* Apply LogMerge. +\* +\* This action is roughly divided into two sub-actions, and executed separately. +\* The first step copies the logs to region B, to ensure it in sync with leader +\* B. The second step waits until the copied logs in the first step are applied, +\* then advances apply_index and marks this region as tombstone. +ApplyMergeLogStep1(i) == + LET + next_index == raft[i][RegionA].apply_index + 1 + min_index == raft[i][RegionA].logs[next_index].min_index + commit_index == raft[i][RegionA].logs[next_index].commit_index + new_logs == + LET + old_logs == raft[i][RegionB].logs + entries == raft[i][RegionA].logs[next_index].entries + IN + IF commit_index <= Len(raft[i][RegionB].logs) + THEN old_logs + ELSE old_logs \o SubSeq(entries, Len(old_logs) - min_index + 2, Len(entries)) + IN + /\ raft' = [raft EXCEPT ![i][RegionB].logs = new_logs, + ![i][RegionB].commit_index = Max({@, commit_index})] + /\ UNCHANGED <> + +ApplyMergeLogStep2(i) == + LET + next_index == raft[i][RegionA].apply_index + 1 + commit_index == raft[i][RegionA].logs[next_index].commit_index + IN + /\ \* Lag logs have been applied. + raft[i][RegionB].apply_index >= commit_index + /\ raft' = [raft EXCEPT ![i][RegionA].apply_index = next_index] + /\ region' = [region EXCEPT ![i][RegionB] = RegionTombStone] + /\ UNCHANGED <> + +ApplyMergeLog(i) == + LET + next_index == raft[i][RegionA].apply_index + 1 + IN + /\ LogAppliable(i, RegionA) + /\ raft[i][RegionA].logs[next_index].type = LogMerge + /\ \/ ApplyMergeLogStep1(i) + \/ ApplyMergeLogStep2(i) + +\* Apply LogNormal. +\* This log simply increases apply_index. +ApplyNormalLog(i, r) == LET next_index == raft[i][r].apply_index + 1 IN /\ LogAppliable(i, r) - /\ raft' = [raft EXCEPT ![i][r].apply_index = next_index] + /\ raft[i][r].logs[next_index].type = LogNormal + /\ LET + \* Apply this log if this region is in normal state, otherwise skip it. + \* Notice we don't check for epoch here as what is done in the real + \* world implementation, but these two approaches are equivalent to + \* check whether we have applied PreMergeLog, as applying PreMergeLog + \* will also convert the region state from normal state. + num_applied_delta == IF region[i][r] = RegionNormal THEN 1 ELSE 0 + IN + raft' = [raft EXCEPT ![i][r].apply_index = next_index, + ![i][r].num_applied = @ + num_applied_delta] /\ UNCHANGED <> +\* Apply Raft logs to make apply_index catch up with commit_index. +ApplyLog(i) == + \/ \E r \in Region : ApplyNormalLog(i, r) + \/ ApplyPreMergeLog(i) + \/ ApplyMergeLog(i) + ------------------------------------------------------------------------------- \* Specification of Raft merge. @@ -198,11 +354,14 @@ Next == \/ \E i, j \in Store : \E r \in Region : AppendEntries(i, j, r) \/ \E i \in Store : \E r \in Region : AdvanceCommitIndex(i, r) \/ \E m \in messages : Receive(m) - \/ \E i \in Store : \E r \in Region : ApplyLog(i, r) - \* External client can send requests to region leader. - \/ \E i \in Store : \E r \in Region : - ClientRequest(i, r, Log) + \* External client can send requests to region B leader, to add a new log + \* entry in region B. + \/ \E i \in Store : ClientRequest(i, RegionB, [type |-> LogNormal]) + + \* Raft merge actions. + \/ ProposeMergeRequest(LeaderB) + \/ \E i \in Store : ApplyLog(i) Init == /\ messages = {} @@ -215,6 +374,7 @@ Init == logs |-> << >>, commit_index |-> 0, apply_index |-> 0, + num_applied |-> 0, match_index |-> [j \in Store |-> 0] ] ] @@ -223,7 +383,7 @@ Init == raft = MarkLeader(MarkLeader(no_leader_raft, LeaderA, RegionA), LeaderB, RegionB) - /\ region = TRUE + /\ region = [i \in Store |-> [r \in Region |-> RegionNormal]] /\ client_requests_index = 0 Spec == @@ -233,19 +393,33 @@ Spec == \* Type invariants. LogType == - {Log} + LET + FlatLogType == + [type : {LogNormal}] + \cup [type : {LogPreMerge}, min_index : Nat] + IN + FlatLogType + \cup [type : {LogMerge}, + min_index : Nat, + commit_index : Nat, + entries : Seq(FlatLogType)] RaftType == [ is_leader : BOOLEAN, logs : Seq(LogType), commit_index : Nat, apply_index : Nat, + num_applied : Nat, match_index : [Store -> Nat] \* Only available on leader. \* Initialized to zeroes on followers. ] +RegionType == + { RegionNormal, RegionTombStone, RegionMerging } + TypeInvariant == - /\ raft \in [Store -> [Region -> RaftType]] + /\ raft \in [Store -> [Region -> RaftType]] + /\ region \in [Store -> [Region -> RegionType]] ------------------------------------------------------------------------------- \* Some invariants for our simplified Raft model. @@ -294,4 +468,37 @@ SimpliedRaftInvariant == /\ LogInvariant /\ ApplyIndexInvariant +------------------------------------------------------------------------------- +\* Some invariants for Raft region merge. + +\* If a region on two different stores have applied the same logs, they should +\* also share the same region state. +RegionApplyInvariant == + \A i, j \in Store : + ( + /\ i /= j + /\ (\A r \in Region : raft[i][r].apply_index = raft[j][r].apply_index) + ) => + \A r \in Region : region[i][r] = region[j][r] + +\* For any two stores of region B, if both done, they should have the same +\* number of applied logs. +MergeLogInvariant == + \A i, j \in Store : + ( + /\ i /= j + /\ region[i][RegionB] = RegionTombStone + /\ region[j][RegionB] = RegionTombStone + ) => + LET + applied_i == raft[i][RegionB].num_applied + applied_j == raft[j][RegionB].num_applied + IN + applied_i = applied_j + +\* Combination of the above invariants. +RaftMergeInvariant == + /\ RegionApplyInvariant + /\ MergeLogInvariant + ===============================================================================