mirror of
https://github.com/pingcap/tla-plus.git
synced 2024-12-27 04:50:15 +08:00
RaftMerge: body part of multi-raft region merge protocol except rollback. (#10)
This commit is contained in:
parent
44b8611bc2
commit
b66286b0dd
@ -1,5 +1,22 @@
|
||||
------------------------------ MODULE RaftMerge -------------------------------
|
||||
|
||||
\* This is the formal specification for the multi-raft region merge algorithm
|
||||
\* of TiKV.
|
||||
\*
|
||||
\* The whole data is divided into multiple shards called regions, and each
|
||||
\* region is replicated to several stores comprising a Raft group. Two regions
|
||||
\* can merge into a larger one if one region is reasonably small.
|
||||
\*
|
||||
\* This specification asserts two regions named A and B are replicated to the
|
||||
\* same set of stores. Each region has leader on store LeaderA and LeaderB
|
||||
\* respectively.
|
||||
\*
|
||||
\* Notice that TiKV uses a slightly different Raft model compared with Ongaro's
|
||||
\* original Raft implementation. A log is truly committed if the log is applied
|
||||
\* to the state machine, and then server will return the result to client.
|
||||
\* commit_index is only a marker, log may be dropped even if commit_index goes
|
||||
\* beyond that log.
|
||||
|
||||
EXTENDS Integers, FiniteSets, Sequences, TLC
|
||||
|
||||
CONSTANTS Store, Region
|
||||
@ -40,22 +57,44 @@ VARIABLES messages
|
||||
|
||||
\* The data structures in C. MAXS = |Store|.
|
||||
\*
|
||||
\* enum Log { LogNormal, LogPreMerge, LogMerge };
|
||||
\*
|
||||
\* enum RegionState { RegionNormal, RegionTombStone, RegionMerging };
|
||||
\*
|
||||
\* struct Raft {
|
||||
\* bool is_leader;
|
||||
\* vector<Log> logs;
|
||||
\* int commit_index;
|
||||
\* int apply_index;
|
||||
\* int num_applied; // number of applied normal logs
|
||||
\* int match_index[MAXS]; // leader only
|
||||
\* };
|
||||
\*
|
||||
\* struct Store {
|
||||
\* Raft raft[2]; // 2 for two regions
|
||||
\* Raft raft[2]; // 2 for two regions
|
||||
\* RegionState region[2]; // 2 for two regions
|
||||
\* } stores[MAXS];
|
||||
\*
|
||||
\* Note for ease of implementation, we use two 2-dimension arrays raft[MAXS][2].
|
||||
\* Note for ease of implementation, we use two 2-dimension arrays raft[MAXS][2]
|
||||
\* and region[MAXS][2].
|
||||
\*
|
||||
\* Also note that different from a real-world implementation, we don't
|
||||
\* introduce the concept of `epoch` here, which is used to figure out whether
|
||||
\* the configuration of one region has changed. Epoch matters when we are
|
||||
\* applying the logs into state machine, if it is stale, we will skip all
|
||||
\* later non-admin logs. Epoch will be changed when we are applying admin logs.
|
||||
|
||||
\* Log.
|
||||
CONSTANTS Log
|
||||
\* Log types.
|
||||
\* The logs are divided into two categories, normal logs and admin logs.
|
||||
\* Logs apart from LogNormal are admin logs.
|
||||
CONSTANTS LogNormal, \* RegionB only
|
||||
LogPreMerge,
|
||||
LogMerge
|
||||
|
||||
\* Region state types.
|
||||
CONSTANTS RegionNormal,
|
||||
RegionTombStone,
|
||||
RegionMerging
|
||||
|
||||
VARIABLES raft, region
|
||||
|
||||
@ -164,6 +203,7 @@ Receive(m) ==
|
||||
\* Leader i of region r receives a client request to append a log.
|
||||
ClientRequest(i, r, log) ==
|
||||
/\ raft[i][r].is_leader
|
||||
/\ region[i][r] = RegionNormal
|
||||
/\ client_requests_index < MaxClientRequests
|
||||
/\ LET
|
||||
new_logs == Append(raft[i][r].logs, log)
|
||||
@ -175,21 +215,137 @@ ClientRequest(i, r, log) ==
|
||||
/\ UNCHANGED <<messages, region>>
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
\* State transitions for Raft merge, and log applying.
|
||||
|
||||
\* Internal requests for Raft merge.
|
||||
\* Assume raft[i][r].is_leader, i.e., only leader can handle internal requests.
|
||||
InternalRequest(i, r, log) ==
|
||||
LET
|
||||
new_logs == Append(raft[i][r].logs, log)
|
||||
new_match_index == [raft[i][r].match_index EXCEPT ![i] = @ + 1]
|
||||
IN
|
||||
[raft EXCEPT ![i][r].logs = new_logs,
|
||||
![i][r].match_index = new_match_index]
|
||||
|
||||
\* Send merge request to the leader of Region B.
|
||||
ProposeMergeRequest(i) ==
|
||||
/\ raft[i][RegionB].is_leader
|
||||
/\ \* This request should be sent only once.
|
||||
Len(SelectSeq(raft[i][RegionB].logs, LAMBDA log : log.type = LogPreMerge)) = 0
|
||||
/\ raft' = InternalRequest(
|
||||
i, RegionB,
|
||||
[type |-> LogPreMerge,
|
||||
min_index |-> 1 + Min({raft[i][RegionB].match_index[j] : j \in Store})]
|
||||
)
|
||||
/\ UNCHANGED <<messages, region, client_vars>>
|
||||
|
||||
\* Return TRUE if there is a log applicable to the state machine.
|
||||
\* A log is applicable if it is committed, and the target region is not in
|
||||
\* TombStone state.
|
||||
LogAppliable(i, r) ==
|
||||
raft[i][r].apply_index < raft[i][r].commit_index
|
||||
/\ raft[i][r].apply_index < raft[i][r].commit_index
|
||||
/\ region[i][r] /= RegionTombStone
|
||||
|
||||
\* Apply Raft logs to make apply_index catch up with commit_index.
|
||||
\* This simply increases apply_index.
|
||||
ApplyLog(i, r) ==
|
||||
\* Apply LogPreMerge.
|
||||
ApplyPreMergeLog(i) ==
|
||||
LET
|
||||
next_index == raft[i][RegionB].apply_index + 1
|
||||
IN
|
||||
/\ LogAppliable(i, RegionB)
|
||||
/\ raft[i][RegionB].logs[next_index].type = LogPreMerge
|
||||
/\ IF raft[i][RegionA].is_leader
|
||||
THEN
|
||||
\* If this store is the leader of regionA, make a merge proposal, and
|
||||
\* advance apply_index.
|
||||
LET
|
||||
min_index == raft[i][RegionB].logs[next_index].min_index
|
||||
commit_index == next_index
|
||||
fetch_logs == SubSeq(raft[i][RegionB].logs, min_index, commit_index)
|
||||
IN
|
||||
raft' = [InternalRequest(
|
||||
i, RegionA,
|
||||
[type |-> LogMerge,
|
||||
min_index |-> min_index,
|
||||
commit_index |-> commit_index,
|
||||
entries |-> fetch_logs]
|
||||
)
|
||||
EXCEPT ![i][RegionB].apply_index = next_index]
|
||||
ELSE
|
||||
\* Otherwise, only advance apply_index.
|
||||
raft' = [raft EXCEPT ![i][RegionB].apply_index = next_index]
|
||||
/\ region' = [region EXCEPT ![i][RegionB] = RegionMerging]
|
||||
/\ UNCHANGED <<messages, client_vars>>
|
||||
|
||||
\* Apply LogMerge.
|
||||
\*
|
||||
\* This action is roughly divided into two sub-actions, and executed separately.
|
||||
\* The first step copies the logs to region B, to ensure it in sync with leader
|
||||
\* B. The second step waits until the copied logs in the first step are applied,
|
||||
\* then advances apply_index and marks this region as tombstone.
|
||||
ApplyMergeLogStep1(i) ==
|
||||
LET
|
||||
next_index == raft[i][RegionA].apply_index + 1
|
||||
min_index == raft[i][RegionA].logs[next_index].min_index
|
||||
commit_index == raft[i][RegionA].logs[next_index].commit_index
|
||||
new_logs ==
|
||||
LET
|
||||
old_logs == raft[i][RegionB].logs
|
||||
entries == raft[i][RegionA].logs[next_index].entries
|
||||
IN
|
||||
IF commit_index <= Len(raft[i][RegionB].logs)
|
||||
THEN old_logs
|
||||
ELSE old_logs \o SubSeq(entries, Len(old_logs) - min_index + 2, Len(entries))
|
||||
IN
|
||||
/\ raft' = [raft EXCEPT ![i][RegionB].logs = new_logs,
|
||||
![i][RegionB].commit_index = Max({@, commit_index})]
|
||||
/\ UNCHANGED <<messages, region, client_vars>>
|
||||
|
||||
ApplyMergeLogStep2(i) ==
|
||||
LET
|
||||
next_index == raft[i][RegionA].apply_index + 1
|
||||
commit_index == raft[i][RegionA].logs[next_index].commit_index
|
||||
IN
|
||||
/\ \* Lag logs have been applied.
|
||||
raft[i][RegionB].apply_index >= commit_index
|
||||
/\ raft' = [raft EXCEPT ![i][RegionA].apply_index = next_index]
|
||||
/\ region' = [region EXCEPT ![i][RegionB] = RegionTombStone]
|
||||
/\ UNCHANGED <<messages, client_vars>>
|
||||
|
||||
ApplyMergeLog(i) ==
|
||||
LET
|
||||
next_index == raft[i][RegionA].apply_index + 1
|
||||
IN
|
||||
/\ LogAppliable(i, RegionA)
|
||||
/\ raft[i][RegionA].logs[next_index].type = LogMerge
|
||||
/\ \/ ApplyMergeLogStep1(i)
|
||||
\/ ApplyMergeLogStep2(i)
|
||||
|
||||
\* Apply LogNormal.
|
||||
\* This log simply increases apply_index.
|
||||
ApplyNormalLog(i, r) ==
|
||||
LET
|
||||
next_index == raft[i][r].apply_index + 1
|
||||
IN
|
||||
/\ LogAppliable(i, r)
|
||||
/\ raft' = [raft EXCEPT ![i][r].apply_index = next_index]
|
||||
/\ raft[i][r].logs[next_index].type = LogNormal
|
||||
/\ LET
|
||||
\* Apply this log if this region is in normal state, otherwise skip it.
|
||||
\* Notice we don't check for epoch here as what is done in the real
|
||||
\* world implementation, but these two approaches are equivalent to
|
||||
\* check whether we have applied PreMergeLog, as applying PreMergeLog
|
||||
\* will also convert the region state from normal state.
|
||||
num_applied_delta == IF region[i][r] = RegionNormal THEN 1 ELSE 0
|
||||
IN
|
||||
raft' = [raft EXCEPT ![i][r].apply_index = next_index,
|
||||
![i][r].num_applied = @ + num_applied_delta]
|
||||
/\ UNCHANGED <<messages, region, client_vars>>
|
||||
|
||||
\* Apply Raft logs to make apply_index catch up with commit_index.
|
||||
ApplyLog(i) ==
|
||||
\/ \E r \in Region : ApplyNormalLog(i, r)
|
||||
\/ ApplyPreMergeLog(i)
|
||||
\/ ApplyMergeLog(i)
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
|
||||
\* Specification of Raft merge.
|
||||
@ -198,11 +354,14 @@ Next ==
|
||||
\/ \E i, j \in Store : \E r \in Region : AppendEntries(i, j, r)
|
||||
\/ \E i \in Store : \E r \in Region : AdvanceCommitIndex(i, r)
|
||||
\/ \E m \in messages : Receive(m)
|
||||
\/ \E i \in Store : \E r \in Region : ApplyLog(i, r)
|
||||
|
||||
\* External client can send requests to region leader.
|
||||
\/ \E i \in Store : \E r \in Region :
|
||||
ClientRequest(i, r, Log)
|
||||
\* External client can send requests to region B leader, to add a new log
|
||||
\* entry in region B.
|
||||
\/ \E i \in Store : ClientRequest(i, RegionB, [type |-> LogNormal])
|
||||
|
||||
\* Raft merge actions.
|
||||
\/ ProposeMergeRequest(LeaderB)
|
||||
\/ \E i \in Store : ApplyLog(i)
|
||||
|
||||
Init ==
|
||||
/\ messages = {}
|
||||
@ -215,6 +374,7 @@ Init ==
|
||||
logs |-> << >>,
|
||||
commit_index |-> 0,
|
||||
apply_index |-> 0,
|
||||
num_applied |-> 0,
|
||||
match_index |-> [j \in Store |-> 0]
|
||||
]
|
||||
]
|
||||
@ -223,7 +383,7 @@ Init ==
|
||||
raft = MarkLeader(MarkLeader(no_leader_raft, LeaderA, RegionA),
|
||||
LeaderB,
|
||||
RegionB)
|
||||
/\ region = TRUE
|
||||
/\ region = [i \in Store |-> [r \in Region |-> RegionNormal]]
|
||||
/\ client_requests_index = 0
|
||||
|
||||
Spec ==
|
||||
@ -233,19 +393,33 @@ Spec ==
|
||||
\* Type invariants.
|
||||
|
||||
LogType ==
|
||||
{Log}
|
||||
LET
|
||||
FlatLogType ==
|
||||
[type : {LogNormal}]
|
||||
\cup [type : {LogPreMerge}, min_index : Nat]
|
||||
IN
|
||||
FlatLogType
|
||||
\cup [type : {LogMerge},
|
||||
min_index : Nat,
|
||||
commit_index : Nat,
|
||||
entries : Seq(FlatLogType)]
|
||||
|
||||
RaftType ==
|
||||
[ is_leader : BOOLEAN,
|
||||
logs : Seq(LogType),
|
||||
commit_index : Nat,
|
||||
apply_index : Nat,
|
||||
num_applied : Nat,
|
||||
match_index : [Store -> Nat] \* Only available on leader.
|
||||
\* Initialized to zeroes on followers.
|
||||
]
|
||||
|
||||
RegionType ==
|
||||
{ RegionNormal, RegionTombStone, RegionMerging }
|
||||
|
||||
TypeInvariant ==
|
||||
/\ raft \in [Store -> [Region -> RaftType]]
|
||||
/\ raft \in [Store -> [Region -> RaftType]]
|
||||
/\ region \in [Store -> [Region -> RegionType]]
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
\* Some invariants for our simplified Raft model.
|
||||
@ -294,4 +468,37 @@ SimpliedRaftInvariant ==
|
||||
/\ LogInvariant
|
||||
/\ ApplyIndexInvariant
|
||||
|
||||
-------------------------------------------------------------------------------
|
||||
\* Some invariants for Raft region merge.
|
||||
|
||||
\* If a region on two different stores have applied the same logs, they should
|
||||
\* also share the same region state.
|
||||
RegionApplyInvariant ==
|
||||
\A i, j \in Store :
|
||||
(
|
||||
/\ i /= j
|
||||
/\ (\A r \in Region : raft[i][r].apply_index = raft[j][r].apply_index)
|
||||
) =>
|
||||
\A r \in Region : region[i][r] = region[j][r]
|
||||
|
||||
\* For any two stores of region B, if both done, they should have the same
|
||||
\* number of applied logs.
|
||||
MergeLogInvariant ==
|
||||
\A i, j \in Store :
|
||||
(
|
||||
/\ i /= j
|
||||
/\ region[i][RegionB] = RegionTombStone
|
||||
/\ region[j][RegionB] = RegionTombStone
|
||||
) =>
|
||||
LET
|
||||
applied_i == raft[i][RegionB].num_applied
|
||||
applied_j == raft[j][RegionB].num_applied
|
||||
IN
|
||||
applied_i = applied_j
|
||||
|
||||
\* Combination of the above invariants.
|
||||
RaftMergeInvariant ==
|
||||
/\ RegionApplyInvariant
|
||||
/\ MergeLogInvariant
|
||||
|
||||
===============================================================================
|
||||
|
Loading…
Reference in New Issue
Block a user