diff --git a/kv/raftstore/peer_storage.go b/kv/raftstore/peer_storage.go index 234e9c59..87ff9c9c 100644 --- a/kv/raftstore/peer_storage.go +++ b/kv/raftstore/peer_storage.go @@ -244,34 +244,10 @@ func (ps *PeerStorage) validateSnap(snap *eraftpb.Snapshot) bool { return true } -// Append the given entries to the raft log using previous last index or self.last_index. -// Return the new last index for later update. After we commit in engine, we can set last_index -// to the return one. +// Append the given entries to the raft log and update ps.raftState also delete log entries that will +// never be committed func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error { // Your Code Here (2B). - // TODO: Delete Start - log.Debugf("%s append %d entries", ps.Tag, len(entries)) - prevLastIndex := ps.raftState.LastIndex - if len(entries) == 0 { - return nil - } - lastEntry := entries[len(entries)-1] - lastIndex := lastEntry.Index - lastTerm := lastEntry.Term - for _, entry := range entries { - err := raftWB.SetMeta(meta.RaftLogKey(ps.region.Id, entry.Index), &entry) - if err != nil { - return err - } - } - // Delete any previously appended log entries which never committed. - for i := lastIndex + 1; i <= prevLastIndex; i++ { - raftWB.DeleteMeta(meta.RaftLogKey(ps.region.Id, i)) - } - ps.raftState.LastIndex = lastIndex - ps.raftState.LastTerm = lastTerm - return nil - // TODO: Delete End } func (ps *PeerStorage) clearMeta(kvWB, raftWB *engine_util.WriteBatch) error { @@ -290,6 +266,7 @@ func (ps *PeerStorage) clearExtraData(newRegion *metapb.Region) { } } +// ClearMeta delete stale metadata like raftState, applyState, regionState and raft log entries func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatch, regionID uint64, lastIndex uint64) error { start := time.Now() kvWB.DeleteMeta(meta.RegionStateKey(regionID)) @@ -327,102 +304,18 @@ func ClearMeta(engines *engine_util.Engines, kvWB, raftWB *engine_util.WriteBatc return nil } -// Apply the peer with given snapshot. +// Apply the peer with given snapshot func (ps *PeerStorage) ApplySnapshot(snapshot *eraftpb.Snapshot, kvWB *engine_util.WriteBatch, raftWB *engine_util.WriteBatch) (*ApplySnapResult, error) { + // Hint: things need to do here including: update peer storage state like raftState and applyState, etc, + // and send RegionTaskApply task to region worker through ps.regionSched, also remenber call ps.clearMeta + // and ps.clearExtraData to delete stale data // Your Code Here (2B). - // TODO: Delete Start - log.Infof("%v begin to apply snapshot", ps.Tag) - - snapData := new(rspb.RaftSnapshotData) - if err := snapData.Unmarshal(snapshot.Data); err != nil { - return nil, err - } - - if snapData.Region.Id != ps.region.Id { - return nil, fmt.Errorf("mismatch region id %v != %v", snapData.Region.Id, ps.region.Id) - } - - if ps.isInitialized() { - // we can only delete the old data when the peer is initialized. - if err := ps.clearMeta(kvWB, raftWB); err != nil { - return nil, err - } - ps.clearExtraData(snapData.Region) - } - - ps.raftState.LastIndex = snapshot.Metadata.Index - ps.raftState.LastTerm = snapshot.Metadata.Term - - applyRes := &ApplySnapResult{ - PrevRegion: ps.region, - Region: snapData.Region, - } - ps.region = snapData.Region - ps.applyState = rspb.RaftApplyState{ - AppliedIndex: snap.Metadata.Index, - // The snapshot only contains log which index > applied index, so - // here the truncate state's (index, term) is in snapshot metadata. - TruncatedState: &rspb.RaftTruncatedState{ - Index: snapshot.Metadata.Index, - Term: snapshot.Metadata.Term, - }, - } - kvWB.SetMeta(meta.ApplyStateKey(ps.region.GetId()), &ps.applyState) - meta.WriteRegionState(kvWB, snapData.Region, rspb.PeerState_Normal) - ch := make(chan bool) - ps.snapState = snap.SnapState{ - StateType: snap.SnapState_Applying, - } - ps.regionSched <- &runner.RegionTaskApply{ - RegionId: ps.region.Id, - Notifier: ch, - SnapMeta: snapshot.Metadata, - StartKey: snapData.Region.GetStartKey(), - EndKey: snapData.Region.GetEndKey(), - } - // wait until apply finish - <-ch - - log.Debugf("%v apply snapshot for region %v with state %v ok", ps.Tag, snapData.Region, ps.applyState) - return applyRes, nil - // TODO: Delete End } /// Save memory states to disk. /// Do not modify ready in this function, this is a requirement to advance the ready object properly later. func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) { // Your Code Here (2B). - // TODO: Delete Start - kvWB, raftWB := new(engine_util.WriteBatch), new(engine_util.WriteBatch) - prevRaftState := ps.raftState - - var applyRes *ApplySnapResult = nil - var err error - if !raft.IsEmptySnap(&ready.Snapshot) { - applyRes, err = ps.ApplySnapshot(&ready.Snapshot, kvWB, raftWB) - if err != nil { - return nil, err - } - } - - if len(ready.Entries) != 0 { - if err := ps.Append(ready.Entries, raftWB); err != nil { - return nil, err - } - } - - if !raft.IsEmptyHardState(ready.HardState) { - ps.raftState.HardState = &ready.HardState - } - - if !proto.Equal(&prevRaftState, &ps.raftState) { - raftWB.SetMeta(meta.RaftStateKey(ps.region.GetId()), &ps.raftState) - } - - kvWB.MustWriteToDB(ps.Engines.Kv) - raftWB.MustWriteToDB(ps.Engines.Raft) - return applyRes, nil - // TODO: Delete End } func (ps *PeerStorage) ClearData() {