I was recently debugging Project 3B, and while reading the Destroy function in kv/raftstore/peer.go I noticed the comment "write kv rocksdb first in case of restart happen between two write". The earlier labs in TinyKV never mention RocksDB; that is the storage engine of TiKV, while TinyKV is built on badger. So I believe this comment was carried over from TiKV and contains a typo: it should read "write kv badgerDB first in case of restart happen between two write".
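For reference, the relevant lines of Destroy (excerpted from the file below, with the comment fixed as proposed) are:

    kvWB := new(engine_util.WriteBatch)
    raftWB := new(engine_util.WriteBatch)
    if err := p.peerStorage.clearMeta(kvWB, raftWB); err != nil {
        return err
    }
    meta.WriteRegionState(kvWB, region, rspb.PeerState_Tombstone)
    // write kv badgerDB first in case of restart happen between two write
    if err := kvWB.WriteToDB(engine.Kv); err != nil {
        return err
    }
    if err := raftWB.WriteToDB(engine.Raft); err != nil {
        return err
    }

As I understand it, the ordering itself is correct whatever the engine is called: the Tombstone region state must be durable in the kv engine before the raft engine is written, so that a restart between the two writes finds the peer already marked Tombstone and cleans up the leftover raft data, instead of resurrecting a half-destroyed peer.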
kv/raftstore/peer.go (393 lines, 12 KiB, Go):
package raftstore

import (
    "fmt"
    "time"

    "github.com/pingcap-incubator/tinykv/kv/config"
    "github.com/pingcap-incubator/tinykv/kv/raftstore/message"
    "github.com/pingcap-incubator/tinykv/kv/raftstore/meta"
    "github.com/pingcap-incubator/tinykv/kv/raftstore/runner"
    "github.com/pingcap-incubator/tinykv/kv/raftstore/util"
    "github.com/pingcap-incubator/tinykv/kv/util/engine_util"
    "github.com/pingcap-incubator/tinykv/kv/util/worker"
    "github.com/pingcap-incubator/tinykv/log"
    "github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
    "github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
    rspb "github.com/pingcap-incubator/tinykv/proto/pkg/raft_serverpb"
    "github.com/pingcap-incubator/tinykv/raft"
    "github.com/pingcap/errors"
)

func NotifyStaleReq(term uint64, cb *message.Callback) {
    cb.Done(ErrRespStaleCommand(term))
}

func NotifyReqRegionRemoved(regionId uint64, cb *message.Callback) {
    regionNotFound := &util.ErrRegionNotFound{RegionId: regionId}
    resp := ErrResp(regionNotFound)
    cb.Done(resp)
}

// If we create the peer actively, like bootstrap/split/merge region, we should
// use this function to create the peer. The region must contain the peer info
// for this store.
func createPeer(storeID uint64, cfg *config.Config, sched chan<- worker.Task,
    engines *engine_util.Engines, region *metapb.Region) (*peer, error) {
    metaPeer := util.FindPeer(region, storeID)
    if metaPeer == nil {
        return nil, errors.Errorf("find no peer for store %d in region %v", storeID, region)
    }
    log.Infof("region %v create peer with ID %d", region, metaPeer.Id)
    return NewPeer(storeID, cfg, engines, region, sched, metaPeer)
}

// The peer can be created from another node with raft membership changes, and we only
// know the region_id and peer_id when creating this replicated peer; the region info
// will be retrieved later after applying the snapshot.
func replicatePeer(storeID uint64, cfg *config.Config, sched chan<- worker.Task,
    engines *engine_util.Engines, regionID uint64, metaPeer *metapb.Peer) (*peer, error) {
    // We will remove the tombstone key when applying the snapshot.
    log.Infof("[region %v] replicates peer with ID %d", regionID, metaPeer.GetId())
    region := &metapb.Region{
        Id:          regionID,
        RegionEpoch: &metapb.RegionEpoch{},
    }
    return NewPeer(storeID, cfg, engines, region, sched, metaPeer)
}

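// A proposal records the raft log position (index, term) assigned to a client
// request, together with the callback used to answer it. If the entry applied
// at that index turns out to have a different term, the proposal is stale and
// its callback is answered with a stale-command error (see NotifyStaleReq).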
type proposal struct {
    // index + term for unique identification
    index uint64
    term  uint64
    cb    *message.Callback
}

type peer struct {
    // The ticker of the peer, used to trigger
    // * raft tick
    // * raft log gc
    // * region heartbeat
    // * split check
    ticker *ticker
    // Instance of the Raft module
    RaftGroup *raft.RawNode
    // The peer storage for the Raft module
    peerStorage *PeerStorage

    // Record the meta information of the peer
    Meta     *metapb.Peer
    regionId uint64
    // Tag which is useful for printing log
    Tag string

    // Record the callback of the proposals
    // (Used in 2B)
    proposals []*proposal

    // Index of last scheduled compacted raft log.
    // (Used in 2C)
    LastCompactedIdx uint64

    // Cache the peers information from other stores;
    // when sending raft messages to other peers, it's used to get the store id of the target peer
    // (Used in 3B conf change)
    peerCache map[uint64]*metapb.Peer
    // Record the instants of peers being added into the configuration.
    // Remove them once they are no longer pending.
    // (Used in 3B conf change)
    PeersStartPendingTime map[uint64]time.Time
    // Mark the peer as stopped; set when the peer is destroyed
    // (Used in 3B conf change)
    stopped bool

    // An inaccurate difference in region size since the last reset.
    // The split checker is triggered when it exceeds the threshold, so the split checker does not scan the data too often.
    // (Used in 3B split)
    SizeDiffHint uint64
    // Approximate size of the region.
    // It's updated every time the split checker scans the data.
    // (Used in 3B split)
    ApproximateSize *uint64
}

func NewPeer(storeId uint64, cfg *config.Config, engines *engine_util.Engines, region *metapb.Region, regionSched chan<- worker.Task,
    meta *metapb.Peer) (*peer, error) {
    if meta.GetId() == util.InvalidID {
        return nil, fmt.Errorf("invalid peer id")
    }
    tag := fmt.Sprintf("[region %v] %v", region.GetId(), meta.GetId())

    ps, err := NewPeerStorage(engines, region, regionSched, tag)
    if err != nil {
        return nil, err
    }

    appliedIndex := ps.AppliedIndex()

    raftCfg := &raft.Config{
        ID:            meta.GetId(),
        ElectionTick:  cfg.RaftElectionTimeoutTicks,
        HeartbeatTick: cfg.RaftHeartbeatTicks,
        Applied:       appliedIndex,
        Storage:       ps,
    }

    raftGroup, err := raft.NewRawNode(raftCfg)
    if err != nil {
        return nil, err
    }
    p := &peer{
        Meta:                  meta,
        regionId:              region.GetId(),
        RaftGroup:             raftGroup,
        peerStorage:           ps,
        peerCache:             make(map[uint64]*metapb.Peer),
        PeersStartPendingTime: make(map[uint64]time.Time),
        Tag:                   tag,
        ticker:                newTicker(region.GetId(), cfg),
    }

    // If this region has only one peer and I am the one, campaign directly.
    if len(region.GetPeers()) == 1 && region.GetPeers()[0].GetStoreId() == storeId {
        err = p.RaftGroup.Campaign()
        if err != nil {
            return nil, err
        }
    }

    return p, nil
}

func (p *peer) insertPeerCache(peer *metapb.Peer) {
    p.peerCache[peer.GetId()] = peer
}

func (p *peer) removePeerCache(peerID uint64) {
    delete(p.peerCache, peerID)
}

func (p *peer) getPeerFromCache(peerID uint64) *metapb.Peer {
    if peer, ok := p.peerCache[peerID]; ok {
        return peer
    }
    for _, peer := range p.peerStorage.Region().GetPeers() {
        if peer.GetId() == peerID {
            p.insertPeerCache(peer)
            return peer
        }
    }
    return nil
}

func (p *peer) nextProposalIndex() uint64 {
    return p.RaftGroup.Raft.RaftLog.LastIndex() + 1
}

/// Tries to destroy itself. Returns a job (if needed) to do more cleaning tasks.
func (p *peer) MaybeDestroy() bool {
    if p.stopped {
        log.Infof("%v is being destroyed, skip", p.Tag)
        return false
    }
    return true
}

/// Does the real destroy worker.Task which includes:
/// 1. Set the region to tombstone;
/// 2. Clear data;
/// 3. Notify all pending requests.
func (p *peer) Destroy(engine *engine_util.Engines, keepData bool) error {
    start := time.Now()
    region := p.Region()
    log.Infof("%v begin to destroy", p.Tag)

    // Set Tombstone state explicitly
    kvWB := new(engine_util.WriteBatch)
    raftWB := new(engine_util.WriteBatch)
    if err := p.peerStorage.clearMeta(kvWB, raftWB); err != nil {
        return err
    }
    meta.WriteRegionState(kvWB, region, rspb.PeerState_Tombstone)
    // write kv badgerDB first in case of restart happen between two write
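    // (If a restart happens after the kv write but before the raft write, the
    // region is already durably marked Tombstone, so leftover raft state can be
    // cleaned up on the next start instead of reviving a half-destroyed peer.)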
    if err := kvWB.WriteToDB(engine.Kv); err != nil {
        return err
    }
    if err := raftWB.WriteToDB(engine.Raft); err != nil {
        return err
    }

    if p.peerStorage.isInitialized() && !keepData {
        // If we meet panic when deleting data and raft log, the dirty data
        // will be cleared by a newer snapshot applying or restart.
        p.peerStorage.ClearData()
    }

    for _, proposal := range p.proposals {
        NotifyReqRegionRemoved(region.Id, proposal.cb)
    }
    p.proposals = nil

    log.Infof("%v destroy itself, takes %v", p.Tag, time.Now().Sub(start))
    return nil
}

func (p *peer) isInitialized() bool {
    return p.peerStorage.isInitialized()
}

func (p *peer) storeID() uint64 {
    return p.Meta.StoreId
}

func (p *peer) Region() *metapb.Region {
    return p.peerStorage.Region()
}

/// Set the region of a peer.
///
/// This will update the region of the peer; the caller must ensure the region
/// has been preserved in a durable device.
func (p *peer) SetRegion(region *metapb.Region) {
    p.peerStorage.SetRegion(region)
}

func (p *peer) PeerId() uint64 {
    return p.Meta.GetId()
}

func (p *peer) LeaderId() uint64 {
    return p.RaftGroup.Raft.Lead
}

func (p *peer) IsLeader() bool {
    return p.RaftGroup.Raft.State == raft.StateLeader
}

func (p *peer) Send(trans Transport, msgs []eraftpb.Message) {
    for _, msg := range msgs {
        err := p.sendRaftMessage(msg, trans)
        if err != nil {
            log.Debugf("%v send message err: %v", p.Tag, err)
        }
    }
}

/// Collects all pending peers and updates `peers_start_pending_time`.
func (p *peer) CollectPendingPeers() []*metapb.Peer {
    pendingPeers := make([]*metapb.Peer, 0, len(p.Region().GetPeers()))
    truncatedIdx := p.peerStorage.truncatedIndex()
    for id, progress := range p.RaftGroup.GetProgress() {
        if id == p.Meta.GetId() {
            continue
        }
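        // A follower whose match index is behind the leader's truncated index
        // has fallen behind the compacted log and can only catch up through a
        // snapshot, so it is reported as pending.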
        if progress.Match < truncatedIdx {
            if peer := p.getPeerFromCache(id); peer != nil {
                pendingPeers = append(pendingPeers, peer)
                if _, ok := p.PeersStartPendingTime[id]; !ok {
                    now := time.Now()
                    p.PeersStartPendingTime[id] = now
                    log.Debugf("%v peer %v start pending at %v", p.Tag, id, now)
                }
            }
        }
    }
    return pendingPeers
}

func (p *peer) clearPeersStartPendingTime() {
    for id := range p.PeersStartPendingTime {
        delete(p.PeersStartPendingTime, id)
    }
}

/// Returns `true` if any new peer catches up with the leader in replicating logs,
/// and updates `PeersStartPendingTime` if needed.
func (p *peer) AnyNewPeerCatchUp(peerId uint64) bool {
    if len(p.PeersStartPendingTime) == 0 {
        return false
    }
    if !p.IsLeader() {
        p.clearPeersStartPendingTime()
        return false
    }
    if startPendingTime, ok := p.PeersStartPendingTime[peerId]; ok {
        truncatedIdx := p.peerStorage.truncatedIndex()
        progress, ok := p.RaftGroup.Raft.Prs[peerId]
        if ok {
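            // The follower has caught up past the truncated index, so it can
            // be served from the log again and is no longer pending.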
            if progress.Match >= truncatedIdx {
                delete(p.PeersStartPendingTime, peerId)
                elapsed := time.Since(startPendingTime)
                log.Debugf("%v peer %v has caught up logs, elapsed: %v", p.Tag, peerId, elapsed)
                return true
            }
        }
    }
    return false
}

func (p *peer) MaybeCampaign(parentIsLeader bool) bool {
    // The peer campaigned when it was created, no need to do it again.
    if len(p.Region().GetPeers()) <= 1 || !parentIsLeader {
        return false
    }

    // If the last peer was the leader of the region before the split, it's natural
    // for it to become the leader of the new split region.
    p.RaftGroup.Campaign()
    return true
}

func (p *peer) Term() uint64 {
    return p.RaftGroup.Raft.Term
}

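// HeartbeatScheduler reports this peer's region to the scheduler worker. The
// region is deep-copied first so the worker does not race with later mutations
// of the peer's region.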
func (p *peer) HeartbeatScheduler(ch chan<- worker.Task) {
    clonedRegion := new(metapb.Region)
    err := util.CloneMsg(p.Region(), clonedRegion)
    if err != nil {
        return
    }
    ch <- &runner.SchedulerRegionHeartbeatTask{
        Region:          clonedRegion,
        Peer:            p.Meta,
        PendingPeers:    p.CollectPendingPeers(),
        ApproximateSize: p.ApproximateSize,
    }
}

func (p *peer) sendRaftMessage(msg eraftpb.Message, trans Transport) error {
    sendMsg := new(rspb.RaftMessage)
    sendMsg.RegionId = p.regionId
    // set current epoch
    sendMsg.RegionEpoch = &metapb.RegionEpoch{
        ConfVer: p.Region().RegionEpoch.ConfVer,
        Version: p.Region().RegionEpoch.Version,
    }

    fromPeer := *p.Meta
    toPeer := p.getPeerFromCache(msg.To)
    if toPeer == nil {
        return fmt.Errorf("failed to look up recipient peer %v in region %v", msg.To, p.regionId)
    }
    log.Debugf("%v, send raft msg %v from %v to %v", p.Tag, msg.MsgType, fromPeer, toPeer)

    sendMsg.FromPeer = &fromPeer
    sendMsg.ToPeer = toPeer

    // There could be two cases:
    // 1. Target peer already exists but has not established communication with the leader yet
    // 2. Target peer is newly added due to a member change or region split, but it has not
    //    been created yet
    // For both cases the region start key and end key are attached in RequestVote and
    // Heartbeat messages for the store of that peer to check whether to create a new peer
    // when receiving these messages, or just to wait for a pending region split to perform
    // later.
    if p.peerStorage.isInitialized() && util.IsInitialMsg(&msg) {
        sendMsg.StartKey = append([]byte{}, p.Region().StartKey...)
        sendMsg.EndKey = append([]byte{}, p.Region().EndKey...)
    }
    sendMsg.Message = &msg
    return trans.Send(sendMsg)
}