Mirror of https://github.com/talent-plan/tinykv.git
commit 14cf2d23bf
parent f24d6eb488

add comment for peer storage
Signed-off-by: linning <linningde25@gmail.com>

@@ -244,34 +244,10 @@ func (ps *PeerStorage) validateSnap(snap *eraftpb.Snapshot) bool {
 	return true
 }
 
-// Append the given entries to the raft log using previous last index or self.last_index.
-// Return the new last index for later update. After we commit in engine, we can set last_index
-// to the return one.
+// Append the given entries to the raft log and update ps.raftState; also delete log entries that will
+// never be committed.
 func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error {
 	// Your Code Here (2B).
-	// TODO: Delete Start
-	log.Debugf("%s append %d entries", ps.Tag, len(entries))
-	prevLastIndex := ps.raftState.LastIndex
-	if len(entries) == 0 {
-		return nil
-	}
-	lastEntry := entries[len(entries)-1]
-	lastIndex := lastEntry.Index
-	lastTerm := lastEntry.Term
-	for _, entry := range entries {
-		err := raftWB.SetMeta(meta.RaftLogKey(ps.region.Id, entry.Index), &entry)
-		if err != nil {
-			return err
-		}
-	}
-	// Delete any previously appended log entries which never committed.
-	for i := lastIndex + 1; i <= prevLastIndex; i++ {
-		raftWB.DeleteMeta(meta.RaftLogKey(ps.region.Id, i))
-	}
-	ps.raftState.LastIndex = lastIndex
-	ps.raftState.LastTerm = lastTerm
-	return nil
-	// TODO: Delete End
 }
 
 func (ps *PeerStorage) clearMeta(kvWB, raftWB *engine_util.WriteBatch) error {
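
The comment added in the hunk above compresses what the deleted reference code did: stage every entry under its raft-log key, drop any previously appended entries past the new last index, and record the new last index and term in ps.raftState. The deletion matters because a conflicting log suffix received from a stale leader must never be read back as if it were committed. Below is a minimal sketch of just that truncation step, assumed to run inside Append with its raftWB parameter; the indexes are hypothetical, not taken from this commit.

    // Hypothetical numbers: the log previously ended at index 10 and the
    // incoming entries end at index 8, so entries 9 and 10 can never be
    // committed and are staged for deletion in the same WriteBatch.
    prevLastIndex, newLastIndex := uint64(10), uint64(8)
    for i := newLastIndex + 1; i <= prevLastIndex; i++ {
        raftWB.DeleteMeta(meta.RaftLogKey(ps.region.Id, i))
    }
    ps.raftState.LastIndex = newLastIndex
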
@@ -290,6 +266,7 @@ func (ps *PeerStorage) clearExtraData(newRegion *metapb.Region) {
 	}
 }
 
+// ClearMeta deletes stale metadata like raftState, applyState, regionState and raft log entries.
 func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatch, regionID uint64, lastIndex uint64) error {
 	start := time.Now()
 	kvWB.DeleteMeta(meta.RegionStateKey(regionID))
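
The doc comment added above names the stale state ClearMeta is responsible for: raftState, applyState, regionState and the raft log entries. As an illustration of what that covers (not the actual body of ClearMeta; firstIndex is a hypothetical lower bound), the deletes map onto the key helpers that appear elsewhere in this diff:

    // Region state and apply state live in the kv engine, raft state and log
    // entries in the raft engine, so they go into separate write batches.
    kvWB.DeleteMeta(meta.RegionStateKey(regionID))
    kvWB.DeleteMeta(meta.ApplyStateKey(regionID))
    raftWB.DeleteMeta(meta.RaftStateKey(regionID))
    for i := firstIndex; i <= lastIndex; i++ {
        raftWB.DeleteMeta(meta.RaftLogKey(regionID, i))
    }
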
@@ -327,102 +304,18 @@ func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatc
 	return nil
 }
 
-// Apply the peer with given snapshot.
+// Apply the peer with given snapshot
 func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) {
+	// Hint: things to do here include: update peer storage state like raftState and applyState, etc.,
+	// and send a RegionTaskApply task to the region worker through ps.regionSched; also remember to call ps.clearMeta
+	// and ps.clearExtraData to delete stale data.
 	// Your Code Here (2B).
-	// TODO: Delete Start
-	log.Infof("%v begin to apply snapshot", ps.Tag)
-
-	snapData := new(rspb.RaftSnapshotData)
-	if err := snapData.Unmarshal(snapshot.Data); err != nil {
-		return nil, err
-	}
-
-	if snapData.Region.Id != ps.region.Id {
-		return nil, fmt.Errorf("mismatch region id %v != %v", snapData.Region.Id, ps.region.Id)
-	}
-
-	if ps.isInitialized() {
-		// we can only delete the old data when the peer is initialized.
-		if err := ps.clearMeta(kvWB, raftWB); err != nil {
-			return nil, err
-		}
-		ps.clearExtraData(snapData.Region)
-	}
-
-	ps.raftState.LastIndex = snapshot.Metadata.Index
-	ps.raftState.LastTerm = snapshot.Metadata.Term
-
-	applyRes := &ApplySnapResult{
-		PrevRegion: ps.region,
-		Region:     snapData.Region,
-	}
-	ps.region = snapData.Region
-	ps.applyState = rspb.RaftApplyState{
-		AppliedIndex: snapshot.Metadata.Index,
-		// The snapshot only contains log which index > applied index, so
-		// here the truncate state's (index, term) is in snapshot metadata.
-		TruncatedState: &rspb.RaftTruncatedState{
-			Index: snapshot.Metadata.Index,
-			Term:  snapshot.Metadata.Term,
-		},
-	}
-	kvWB.SetMeta(meta.ApplyStateKey(ps.region.GetId()), &ps.applyState)
-	meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal)
-	ch := make(chan bool)
-	ps.snapState = snap.SnapState{
-		StateType: snap.SnapState_Applying,
-	}
-	ps.regionSched <- &runner.RegionTaskApply{
-		RegionId: ps.region.Id,
-		Notifier: ch,
-		SnapMeta: snapshot.Metadata,
-		StartKey: snapData.Region.GetStartKey(),
-		EndKey:   snapData.Region.GetEndKey(),
-	}
-	// wait until apply finish
-	<-ch
-
-	log.Debugf("%v apply snapshot for region %v with state %v ok", ps.Tag, snapData.Region, ps.applyState)
-	return applyRes, nil
-	// TODO: Delete End
 }
 
 /// Save memory states to disk.
 /// Do not modify ready in this function; this is a requirement to advance the ready object properly later.
 func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) {
 	// Your Code Here (2B).
-	// TODO: Delete Start
-	kvWB, raftWB := new(engine_util.WriteBatch), new(engine_util.WriteBatch)
-	prevRaftState := ps.raftState
-
-	var applyRes *ApplySnapResult = nil
-	var err error
-	if !raft.IsEmptySnap(&ready.Snapshot) {
-		applyRes, err = ps.ApplySnapshot(&ready.Snapshot, kvWB, raftWB)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	if len(ready.Entries) != 0 {
-		if err := ps.Append(ready.Entries, raftWB); err != nil {
-			return nil, err
-		}
-	}
-
-	if !raft.IsEmptyHardState(ready.HardState) {
-		ps.raftState.HardState = &ready.HardState
-	}
-
-	if !proto.Equal(&prevRaftState, &ps.raftState) {
-		raftWB.SetMeta(meta.RaftStateKey(ps.region.GetId()), &ps.raftState)
-	}
-
-	kvWB.MustWriteToDB(ps.Engines.Kv)
-	raftWB.MustWriteToDB(ps.Engines.Raft)
-	return applyRes, nil
-	// TODO: Delete End
 }
 
 func (ps *PeerStorage) ClearData() {
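
The hint added to ApplySnapshot lists the same steps the deleted reference code performed: decode and validate the snapshot, clear old metadata and extra key-range data if the peer was already initialized, point raftState and applyState at the snapshot's index and term, persist the new region state, and hand the actual data ingestion to the region worker. A condensed sketch of that last hand-off, reusing only types visible in the removed lines (the blocking channel wait mirrors the deleted code rather than anything prescribed by the hint):

    // Schedule snapshot ingestion on the region worker and block until it
    // reports completion; the raftState/applyState staged in the write
    // batches are flushed later by SaveReadyState.
    ch := make(chan bool)
    ps.snapState = snap.SnapState{StateType: snap.SnapState_Applying}
    ps.regionSched <- &runner.RegionTaskApply{
        RegionId: ps.region.Id,
        Notifier: ch,
        SnapMeta: snapshot.Metadata,
        StartKey: snapData.Region.GetStartKey(),
        EndKey:   snapData.Region.GetEndKey(),
    }
    <-ch // wait until the apply task finishes
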
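The /// comments on SaveReadyState are easiest to read next to the loop that drives it: the caller takes a Ready from the raft state machine, persists it through SaveReadyState, and only afterwards advances the state machine, which is why the function must not mutate the Ready it is given. A hedged sketch of such a caller, where rawNode and peerStorage are placeholder names rather than identifiers taken from this commit:

    // Illustrative ready loop; names are assumptions, not from this diff.
    if rawNode.HasReady() {
        rd := rawNode.Ready()
        // Persist hard state, appended entries and any snapshot first.
        if _, err := peerStorage.SaveReadyState(&rd); err != nil {
            panic(err)
        }
        // ... send rd.Messages and apply rd.CommittedEntries ...
        // Advance consumes the same Ready that was just persisted, so
        // SaveReadyState must leave it unmodified.
        rawNode.Advance(rd)
    }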