talent-plan-tinykv/kv/raftstore/peer_msg_handler.go

package raftstore

import (
	"fmt"
	"time"

	"github.com/Connor1996/badger/y"
	"github.com/pingcap-incubator/tinykv/kv/raftstore/message"
	"github.com/pingcap-incubator/tinykv/kv/raftstore/runner"
	"github.com/pingcap-incubator/tinykv/kv/raftstore/snap"
	"github.com/pingcap-incubator/tinykv/kv/raftstore/util"
	"github.com/pingcap-incubator/tinykv/log"
	"github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
	"github.com/pingcap-incubator/tinykv/proto/pkg/raft_cmdpb"
	rspb "github.com/pingcap-incubator/tinykv/proto/pkg/raft_serverpb"
	"github.com/pingcap-incubator/tinykv/scheduler/pkg/btree"
	"github.com/pingcap/errors"
)

type PeerTick int

const (
	PeerTickRaft               PeerTick = 0
	PeerTickRaftLogGC          PeerTick = 1
	PeerTickSplitRegionCheck   PeerTick = 2
	PeerTickSchedulerHeartbeat PeerTick = 3
)

type peerMsgHandler struct {
	*peer
	ctx *GlobalContext
}

func newPeerMsgHandler(peer *peer, ctx *GlobalContext) *peerMsgHandler {
	return &peerMsgHandler{
		peer: peer,
		ctx:  ctx,
	}
}

func (d *peerMsgHandler) HandleRaftReady() {
	if d.stopped {
		return
	}
	// Your Code Here (2B).
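	// Illustrative sketch of the usual Ready-handling flow, not the reference
	// solution (assumptions: the framework's HasReady/Ready/SaveReadyState/
	// Send/Advance signatures as described in the project docs; applying the
	// committed entries is only summarized in a comment here):
	//
	//	if d.RaftGroup.HasReady() {
	//		rd := d.RaftGroup.Ready()
	//		// Persist raft log entries, the hard state, and any pending snapshot.
	//		if _, err := d.peerStorage.SaveReadyState(&rd); err != nil {
	//			panic(err)
	//		}
	//		// Send the raft messages produced by this Ready to the other stores.
	//		d.Send(d.ctx.trans, rd.Messages)
	//		// Apply rd.CommittedEntries to the KV engine, answering pending
	//		// proposal callbacks, then advance the raft state machine.
	//		d.RaftGroup.Advance(rd)
	//	}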
}

func (d *peerMsgHandler) HandleMsg(msg message.Msg) {
	switch msg.Type {
	case message.MsgTypeRaftMessage:
		raftMsg := msg.Data.(*rspb.RaftMessage)
		if err := d.onRaftMsg(raftMsg); err != nil {
			log.Errorf("%s handle raft message error %v", d.Tag, err)
		}
	case message.MsgTypeRaftCmd:
		raftCMD := msg.Data.(*message.MsgRaftCmd)
		d.proposeRaftCommand(raftCMD.Request, raftCMD.Callback)
	case message.MsgTypeTick:
		d.onTick()
	case message.MsgTypeSplitRegion:
		split := msg.Data.(*message.MsgSplitRegion)
		log.Infof("%s on split with %v", d.Tag, split.SplitKey)
		d.onPrepareSplitRegion(split.RegionEpoch, split.SplitKey, split.Callback)
	case message.MsgTypeRegionApproximateSize:
		d.onApproximateRegionSize(msg.Data.(uint64))
	case message.MsgTypeGcSnap:
		gcSnap := msg.Data.(*message.MsgGCSnap)
		d.onGCSnap(gcSnap.Snaps)
	case message.MsgTypeStart:
		d.startTicker()
	}
}

func (d *peerMsgHandler) preProposeRaftCommand(req *raft_cmdpb.RaftCmdRequest) error {
	// Check store_id, make sure that the msg is dispatched to the right place.
	if err := util.CheckStoreID(req, d.storeID()); err != nil {
		return err
	}

	// Check whether the store has the right peer to handle the request.
	regionID := d.regionId
	leaderID := d.LeaderId()
	if !d.IsLeader() {
		leader := d.getPeerFromCache(leaderID)
		return &util.ErrNotLeader{RegionId: regionID, Leader: leader}
	}
	// peer_id must be the same as peer's.
	if err := util.CheckPeerID(req, d.PeerId()); err != nil {
		return err
	}
	// Check whether the term is stale.
	if err := util.CheckTerm(req, d.Term()); err != nil {
		return err
	}
	err := util.CheckRegionEpoch(req, d.Region(), true)
	if errEpochNotMatching, ok := err.(*util.ErrEpochNotMatch); ok {
		// Attach the region which might be split from the current region. But it doesn't
		// matter if the region is not split from the current region. If the region meta
		// received by the TiKV driver is newer than the meta cached in the driver, the meta is
		// updated.
		siblingRegion := d.findSiblingRegion()
		if siblingRegion != nil {
			errEpochNotMatching.Regions = append(errEpochNotMatching.Regions, siblingRegion)
		}
		return errEpochNotMatching
	}
	return err
}

func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *message.Callback) {
	err := d.preProposeRaftCommand(msg)
	if err != nil {
		cb.Done(ErrResp(err))
		return
	}
	// Your Code Here (2B).
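	// Illustrative sketch of a common proposal path, not the reference solution
	// (assumption: the peer keeps pending proposals in a `proposals` slice whose
	// entries record index, term, and callback so the apply loop can reply later):
	//
	//	data, err := msg.Marshal()
	//	if err != nil {
	//		cb.Done(ErrResp(err))
	//		return
	//	}
	//	d.proposals = append(d.proposals, &proposal{
	//		index: d.nextProposalIndex(),
	//		term:  d.Term(),
	//		cb:    cb,
	//	})
	//	if err := d.RaftGroup.Propose(data); err != nil {
	//		cb.Done(ErrResp(err))
	//	}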
}

func (d *peerMsgHandler) onTick() {
	if d.stopped {
		return
	}
	d.ticker.tickClock()
	if d.ticker.isOnTick(PeerTickRaft) {
		d.onRaftBaseTick()
	}
	if d.ticker.isOnTick(PeerTickRaftLogGC) {
		d.onRaftGCLogTick()
	}
	if d.ticker.isOnTick(PeerTickSchedulerHeartbeat) {
		d.onSchedulerHeartbeatTick()
	}
	if d.ticker.isOnTick(PeerTickSplitRegionCheck) {
		d.onSplitRegionCheckTick()
	}
	d.ctx.tickDriverSender <- d.regionId
}

func (d *peerMsgHandler) startTicker() {
	d.ticker = newTicker(d.regionId, d.ctx.cfg)
	d.ctx.tickDriverSender <- d.regionId
	d.ticker.schedule(PeerTickRaft)
	d.ticker.schedule(PeerTickRaftLogGC)
	d.ticker.schedule(PeerTickSplitRegionCheck)
	d.ticker.schedule(PeerTickSchedulerHeartbeat)
}

func (d *peerMsgHandler) onRaftBaseTick() {
	d.RaftGroup.Tick()
	d.ticker.schedule(PeerTickRaft)
}

func (d *peerMsgHandler) ScheduleCompactLog(truncatedIndex uint64) {
	raftLogGCTask := &runner.RaftLogGCTask{
		RaftEngine: d.ctx.engine.Raft,
		RegionID:   d.regionId,
		StartIdx:   d.LastCompactedIdx,
		EndIdx:     truncatedIndex + 1,
	}
	d.LastCompactedIdx = raftLogGCTask.EndIdx
	d.ctx.raftLogGCTaskSender <- raftLogGCTask
}

func (d *peerMsgHandler) onRaftMsg(msg *rspb.RaftMessage) error {
	log.Debugf("%s handle raft message %s from %d to %d",
		d.Tag, msg.GetMessage().GetMsgType(), msg.GetFromPeer().GetId(), msg.GetToPeer().GetId())
	if !d.validateRaftMessage(msg) {
		return nil
	}
	if d.stopped {
		return nil
	}
	if msg.GetIsTombstone() {
		// We received a message telling us to remove ourselves.
		d.handleGCPeerMsg(msg)
		return nil
	}
	if d.checkMessage(msg) {
		return nil
	}
	key, err := d.checkSnapshot(msg)
	if err != nil {
		return err
	}
	if key != nil {
		// If the snapshot file is not going to be used again, it's OK to delete it
		// here. If it would be reused on a later receive, that message will fail this
		// check again anyway, so the missing snapshot file will not be noticed.
		s, err1 := d.ctx.snapMgr.GetSnapshotForApplying(*key)
		if err1 != nil {
			return err1
		}
		d.ctx.snapMgr.DeleteSnapshot(*key, s, false)
		return nil
	}
	d.insertPeerCache(msg.GetFromPeer())
	err = d.RaftGroup.Step(*msg.GetMessage())
	if err != nil {
		return err
	}
	if d.AnyNewPeerCatchUp(msg.FromPeer.Id) {
		d.HeartbeatScheduler(d.ctx.schedulerTaskSender)
	}
	return nil
}

// Returns false if the message is invalid and can be ignored.
func (d *peerMsgHandler) validateRaftMessage(msg *rspb.RaftMessage) bool {
	regionID := msg.GetRegionId()
	from := msg.GetFromPeer()
	to := msg.GetToPeer()
	log.Debugf("[region %d] handle raft message %s from %d to %d", regionID, msg, from.GetId(), to.GetId())
	if to.GetStoreId() != d.storeID() {
		log.Warnf("[region %d] store not match, to store id %d, mine %d, ignore it",
			regionID, to.GetStoreId(), d.storeID())
		return false
	}
	if msg.RegionEpoch == nil {
		log.Errorf("[region %d] missing epoch in raft message, ignore it", regionID)
		return false
	}
	return true
}

// Checks if the message is sent to the correct peer.
//
// Returning true means that the message can be dropped silently.
func (d *peerMsgHandler) checkMessage(msg *rspb.RaftMessage) bool {
	fromEpoch := msg.GetRegionEpoch()
	isVoteMsg := util.IsVoteMessage(msg.Message)
	fromStoreID := msg.FromPeer.GetStoreId()

	// Let's consider the following cases with three nodes [1, 2, 3], where 1 is the leader:
	// a. 1 removes 2; 2 may still send MsgAppendResponse to 1.
	//    We should ignore this stale message and let 2 remove itself after
	//    applying the ConfChange log.
	// b. 2 is isolated and 1 removes 2. When 2 rejoins the cluster, it will
	//    send stale MsgRequestVote to 1 and 3; at this point we should tell 2 to gc itself.
	// c. 2 is isolated but can communicate with 3. 1 removes 3.
	//    2 will send stale MsgRequestVote to 3; 3 should ignore this message.
	// d. 2 is isolated but can communicate with 3. 1 removes 2, then adds 4, then removes 3.
	//    2 will send stale MsgRequestVote to 3; 3 should tell 2 to gc itself.
	// e. 2 is isolated. 1 adds 4, 5, 6 and removes 3, 1. Now assume 4 is the leader.
	//    After 2 rejoins the cluster, it may send stale MsgRequestVote to 1 and 3,
	//    which will ignore it. Later 4 will send messages to 2 and 2 will
	//    rejoin the raft group.
	// f. 2 is isolated. 1 adds 4, 5, 6 and removes 3, 1. Now assume 4 is the leader, and 4 removes 2.
	//    Unlike case e, 2 will be stale forever.
	// TODO: for case f, if 2 is stale for a long time, it will communicate with the scheduler,
	// which will tell it that it is stale, so 2 can remove itself.
	region := d.Region()
	if util.IsEpochStale(fromEpoch, region.RegionEpoch) && util.FindPeer(region, fromStoreID) == nil {
		// The message is stale and not in the current region.
		handleStaleMsg(d.ctx.trans, msg, region.RegionEpoch, isVoteMsg)
		return true
	}
	target := msg.GetToPeer()
	if target.Id < d.PeerId() {
		log.Infof("%s target peer ID %d is less than %d, msg maybe stale", d.Tag, target.Id, d.PeerId())
		return true
	} else if target.Id > d.PeerId() {
		if d.MaybeDestroy() {
			log.Infof("%s is stale as received a larger peer %s, destroying", d.Tag, target)
			d.destroyPeer()
			d.ctx.router.sendStore(message.NewMsg(message.MsgTypeStoreRaftMessage, msg))
		}
		return true
	}
	return false
}

func handleStaleMsg(trans Transport, msg *rspb.RaftMessage, curEpoch *metapb.RegionEpoch,
	needGC bool) {
	regionID := msg.RegionId
	fromPeer := msg.FromPeer
	toPeer := msg.ToPeer
	msgType := msg.Message.GetMsgType()
	if !needGC {
		log.Infof("[region %d] raft message %s is stale, current %v, ignore it",
			regionID, msgType, curEpoch)
		return
	}
	gcMsg := &rspb.RaftMessage{
		RegionId:    regionID,
		FromPeer:    toPeer,
		ToPeer:      fromPeer,
		RegionEpoch: curEpoch,
		IsTombstone: true,
	}
	if err := trans.Send(gcMsg); err != nil {
		log.Errorf("[region %d] send message failed %v", regionID, err)
	}
}

func (d *peerMsgHandler) handleGCPeerMsg(msg *rspb.RaftMessage) {
	fromEpoch := msg.RegionEpoch
	if !util.IsEpochStale(d.Region().RegionEpoch, fromEpoch) {
		return
	}
	if !util.PeerEqual(d.Meta, msg.ToPeer) {
		log.Infof("%s receive stale gc msg, ignore", d.Tag)
		return
	}
	log.Infof("%s peer %s receives gc message, trying to remove", d.Tag, msg.ToPeer)
	if d.MaybeDestroy() {
		d.destroyPeer()
	}
}

// Returns nil if the msg doesn't contain a snapshot, or if it contains a snapshot that
// doesn't conflict with any other snapshots or regions. Otherwise a `snap.SnapKey` is returned.
func (d *peerMsgHandler) checkSnapshot(msg *rspb.RaftMessage) (*snap.SnapKey, error) {
	if msg.Message.Snapshot == nil {
		return nil, nil
	}
	regionID := msg.RegionId
	snapshot := msg.Message.Snapshot
	key := snap.SnapKeyFromRegionSnap(regionID, snapshot)
	snapData := new(rspb.RaftSnapshotData)
	err := snapData.Unmarshal(snapshot.Data)
	if err != nil {
		return nil, err
	}
	snapRegion := snapData.Region
	peerID := msg.ToPeer.Id
	var contains bool
	for _, peer := range snapRegion.Peers {
		if peer.Id == peerID {
			contains = true
			break
		}
	}
	if !contains {
		log.Infof("%s %s doesn't contain peer %d, skip", d.Tag, snapRegion, peerID)
		return &key, nil
	}
	meta := d.ctx.storeMeta
	meta.Lock()
	defer meta.Unlock()
	if !util.RegionEqual(meta.regions[d.regionId], d.Region()) {
		if !d.isInitialized() {
			log.Infof("%s stale delegate detected, skip", d.Tag)
			return &key, nil
		} else {
			panic(fmt.Sprintf("%s meta corrupted %s != %s", d.Tag, meta.regions[d.regionId], d.Region()))
		}
	}
	existRegions := meta.getOverlapRegions(snapRegion)
	for _, existRegion := range existRegions {
		if existRegion.GetId() == snapRegion.GetId() {
			continue
		}
		log.Infof("%s region overlapped %s %s", d.Tag, existRegion, snapRegion)
		return &key, nil
	}
	// Check that the snapshot file exists.
	_, err = d.ctx.snapMgr.GetSnapshotForApplying(key)
	if err != nil {
		return nil, err
	}
	return nil, nil
}

func (d *peerMsgHandler) destroyPeer() {
	log.Infof("%s starts destroy", d.Tag)
	regionID := d.regionId
	// We can't destroy a peer which is applying a snapshot.
	meta := d.ctx.storeMeta
	meta.Lock()
	defer meta.Unlock()
	isInitialized := d.isInitialized()
	if err := d.Destroy(d.ctx.engine, false); err != nil {
		// If we don't panic here, the peer will be recreated on the next restart
		// and then gc'ed again. But if some overlapping region is created before
		// restarting, the gc action would delete the overlapping region's data too.
		panic(fmt.Sprintf("%s destroy peer %v", d.Tag, err))
	}
	d.ctx.router.close(regionID)
	d.stopped = true
	if isInitialized && meta.regionRanges.Delete(&regionItem{region: d.Region()}) == nil {
		panic(d.Tag + " meta corruption detected")
	}
	if _, ok := meta.regions[regionID]; !ok {
		panic(d.Tag + " meta corruption detected")
	}
	delete(meta.regions, regionID)
}

func (d *peerMsgHandler) findSiblingRegion() (result *metapb.Region) {
	meta := d.ctx.storeMeta
	meta.RLock()
	defer meta.RUnlock()
	item := &regionItem{region: d.Region()}
	meta.regionRanges.AscendGreaterOrEqual(item, func(i btree.Item) bool {
		result = i.(*regionItem).region
		return true
	})
	return
}

func (d *peerMsgHandler) onRaftGCLogTick() {
	d.ticker.schedule(PeerTickRaftLogGC)
	if !d.IsLeader() {
		return
	}

	appliedIdx := d.peerStorage.AppliedIndex()
	firstIdx, _ := d.peerStorage.FirstIndex()
	var compactIdx uint64
	if appliedIdx > firstIdx && appliedIdx-firstIdx >= d.ctx.cfg.RaftLogGcCountLimit {
		compactIdx = appliedIdx
	} else {
		return
	}

	y.Assert(compactIdx > 0)
	compactIdx -= 1
	if compactIdx < firstIdx {
		// In case compact_idx == first_idx before subtraction.
		return
	}

	term, err := d.RaftGroup.Raft.RaftLog.Term(compactIdx)
	if err != nil {
		log.Fatalf("appliedIdx: %d, firstIdx: %d, compactIdx: %d", appliedIdx, firstIdx, compactIdx)
		panic(err)
	}

	// Create a compact log request and notify directly.
	regionID := d.regionId
	request := newCompactLogRequest(regionID, d.Meta, compactIdx, term)
	d.proposeRaftCommand(request, nil)
}

func (d *peerMsgHandler) onSplitRegionCheckTick() {
	d.ticker.schedule(PeerTickSplitRegionCheck)
	// To avoid frequent scans, we only add a new scan task if all previous tasks
	// have finished.
	if len(d.ctx.splitCheckTaskSender) > 0 {
		return
	}

	if !d.IsLeader() {
		return
	}
	if d.ApproximateSize != nil && d.SizeDiffHint < d.ctx.cfg.RegionSplitSize/8 {
		return
	}
	d.ctx.splitCheckTaskSender <- &runner.SplitCheckTask{
		Region: d.Region(),
	}
	d.SizeDiffHint = 0
}

func (d *peerMsgHandler) onPrepareSplitRegion(regionEpoch *metapb.RegionEpoch, splitKey []byte, cb *message.Callback) {
	if err := d.validateSplitRegion(regionEpoch, splitKey); err != nil {
		cb.Done(ErrResp(err))
		return
	}
	region := d.Region()
	d.ctx.schedulerTaskSender <- &runner.SchedulerAskSplitTask{
		Region:   region,
		SplitKey: splitKey,
		Peer:     d.Meta,
		Callback: cb,
	}
}

func (d *peerMsgHandler) validateSplitRegion(epoch *metapb.RegionEpoch, splitKey []byte) error {
	if len(splitKey) == 0 {
		err := errors.Errorf("%s split key should not be empty", d.Tag)
		log.Error(err)
		return err
	}

	if !d.IsLeader() {
		// The peer on this store is no longer the leader, skip.
		log.Infof("%s not leader, skip", d.Tag)
		return &util.ErrNotLeader{
			RegionId: d.regionId,
			Leader:   d.getPeerFromCache(d.LeaderId()),
		}
	}

	region := d.Region()
	latestEpoch := region.GetRegionEpoch()

	// This differs slightly from `check_region_epoch` in the region split case.
	// Here we only need to check `version`, because `conf_ver` will be updated
	// to the peer's latest value and then sent to the Scheduler.
	if latestEpoch.Version != epoch.Version {
		log.Infof("%s epoch changed, retry later, prev_epoch: %s, epoch %s",
			d.Tag, latestEpoch, epoch)
		return &util.ErrEpochNotMatch{
			Message: fmt.Sprintf("%s epoch changed %s != %s, retry later", d.Tag, latestEpoch, epoch),
			Regions: []*metapb.Region{region},
		}
	}
	return nil
}

func (d *peerMsgHandler) onApproximateRegionSize(size uint64) {
	d.ApproximateSize = &size
}

func (d *peerMsgHandler) onSchedulerHeartbeatTick() {
	d.ticker.schedule(PeerTickSchedulerHeartbeat)
	if !d.IsLeader() {
		return
	}
	d.HeartbeatScheduler(d.ctx.schedulerTaskSender)
}

func (d *peerMsgHandler) onGCSnap(snaps []snap.SnapKeyWithSending) {
	compactedIdx := d.peerStorage.truncatedIndex()
	compactedTerm := d.peerStorage.truncatedTerm()
	for _, snapKeyWithSending := range snaps {
		key := snapKeyWithSending.SnapKey
		if snapKeyWithSending.IsSending {
			snap, err := d.ctx.snapMgr.GetSnapshotForSending(key)
			if err != nil {
				log.Errorf("%s failed to load snapshot for %s %v", d.Tag, key, err)
				continue
			}
			if key.Term < compactedTerm || key.Index < compactedIdx {
				log.Infof("%s snap file %s has been compacted, delete", d.Tag, key)
				d.ctx.snapMgr.DeleteSnapshot(key, snap, false)
			} else if fi, err1 := snap.Meta(); err1 == nil {
				modTime := fi.ModTime()
				if time.Since(modTime) > 4*time.Hour {
					log.Infof("%s snap file %s has been expired, delete", d.Tag, key)
					d.ctx.snapMgr.DeleteSnapshot(key, snap, false)
				}
			}
		} else if key.Term <= compactedTerm && key.Index <= compactedIdx {
			log.Infof("%s snap file %s has been applied, delete", d.Tag, key)
			a, err := d.ctx.snapMgr.GetSnapshotForApplying(key)
			if err != nil {
				log.Errorf("%s failed to load snapshot for %s %v", d.Tag, key, err)
				continue
			}
			d.ctx.snapMgr.DeleteSnapshot(key, a, false)
		}
	}
}

func newAdminRequest(regionID uint64, peer *metapb.Peer) *raft_cmdpb.RaftCmdRequest {
	return &raft_cmdpb.RaftCmdRequest{
		Header: &raft_cmdpb.RaftRequestHeader{
			RegionId: regionID,
			Peer:     peer,
		},
	}
}

func newCompactLogRequest(regionID uint64, peer *metapb.Peer, compactIndex, compactTerm uint64) *raft_cmdpb.RaftCmdRequest {
	req := newAdminRequest(regionID, peer)
	req.AdminRequest = &raft_cmdpb.AdminRequest{
		CmdType: raft_cmdpb.AdminCmdType_CompactLog,
		CompactLog: &raft_cmdpb.CompactLogRequest{
			CompactIndex: compactIndex,
			CompactTerm:  compactTerm,
		},
	}
	return req
}