mirror of
https://github.com/talent-plan/tinykv.git
synced 2025-01-15 23:00:17 +08:00
cb91131143
Co-authored-by: Connor <zbk602423539@gmail.com>
210 lines
7.0 KiB
Go
210 lines
7.0 KiB
Go
package runner
|
|
|
|
import (
|
|
"encoding/hex"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/Connor1996/badger"
|
|
"github.com/juju/errors"
|
|
"github.com/pingcap-incubator/tinykv/kv/raftstore/meta"
|
|
"github.com/pingcap-incubator/tinykv/kv/raftstore/snap"
|
|
"github.com/pingcap-incubator/tinykv/kv/raftstore/util"
|
|
"github.com/pingcap-incubator/tinykv/kv/util/engine_util"
|
|
"github.com/pingcap-incubator/tinykv/kv/util/worker"
|
|
"github.com/pingcap-incubator/tinykv/log"
|
|
"github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
|
|
"github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
|
|
rspb "github.com/pingcap-incubator/tinykv/proto/pkg/raft_serverpb"
|
|
)
|
|
|
|
// There're some tasks for region worker, such as:
|
|
// `RegionTaskGen` which will cause the worker to generate a snapshot according to RegionId,
|
|
// `RegionTaskApply` which will apply a snapshot to the region that id equals RegionId,
|
|
// `RegionTaskDestroy` which will clean up the key range from StartKey to EndKey.
|
|
|
|
type RegionTaskGen struct {
|
|
RegionId uint64 // specify the region which the task is for.
|
|
Notifier chan<- *eraftpb.Snapshot // when it finishes snapshot generating, it notifies notifier.
|
|
}
|
|
|
|
type RegionTaskApply struct {
|
|
RegionId uint64 // specify the region which the task is for.
|
|
Notifier chan<- bool // when it finishes snapshot applying, it notifies notifier.
|
|
SnapMeta *eraftpb.SnapshotMetadata // the region meta information of the snapshot
|
|
StartKey []byte // `StartKey` and `EndKey` are origin region's range, it's used to clean up certain range of region before applying snapshot.
|
|
EndKey []byte
|
|
}
|
|
|
|
type RegionTaskDestroy struct {
|
|
RegionId uint64 // specify the region which the task is for.
|
|
StartKey []byte // `StartKey` and `EndKey` are used to destroy certain range of region.
|
|
EndKey []byte
|
|
}
|
|
|
|
type regionTaskHandler struct {
|
|
ctx *snapContext
|
|
}
|
|
|
|
func NewRegionTaskHandler(engines *engine_util.Engines, mgr *snap.SnapManager) *regionTaskHandler {
|
|
return ®ionTaskHandler{
|
|
ctx: &snapContext{
|
|
engines: engines,
|
|
mgr: mgr,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (r *regionTaskHandler) Handle(t worker.Task) {
|
|
switch t.(type) {
|
|
case *RegionTaskGen:
|
|
task := t.(*RegionTaskGen)
|
|
// It is safe for now to handle generating and applying snapshot concurrently,
|
|
// but it may not when merge is implemented.
|
|
r.ctx.handleGen(task.RegionId, task.Notifier)
|
|
case *RegionTaskApply:
|
|
task := t.(*RegionTaskApply)
|
|
r.ctx.handleApply(task.RegionId, task.Notifier, task.StartKey, task.EndKey, task.SnapMeta)
|
|
case *RegionTaskDestroy:
|
|
task := t.(*RegionTaskDestroy)
|
|
r.ctx.cleanUpRange(task.RegionId, task.StartKey, task.EndKey)
|
|
}
|
|
}
|
|
|
|
type snapContext struct {
|
|
engines *engine_util.Engines
|
|
batchSize uint64
|
|
mgr *snap.SnapManager
|
|
}
|
|
|
|
// handleGen handles the task of generating snapshot of the Region.
|
|
func (snapCtx *snapContext) handleGen(regionId uint64, notifier chan<- *eraftpb.Snapshot) {
|
|
snap, err := doSnapshot(snapCtx.engines, snapCtx.mgr, regionId)
|
|
if err != nil {
|
|
log.Errorf("failed to generate snapshot!!!, [regionId: %d, err : %v]", regionId, err)
|
|
notifier <- nil
|
|
} else {
|
|
notifier <- snap
|
|
}
|
|
}
|
|
|
|
// applySnap applies snapshot data of the Region.
|
|
func (snapCtx *snapContext) applySnap(regionId uint64, startKey, endKey []byte, snapMeta *eraftpb.SnapshotMetadata) error {
|
|
log.Infof("begin apply snap data. [regionId: %d]", regionId)
|
|
|
|
// cleanUpOriginData clear up the region data before applying snapshot
|
|
snapCtx.cleanUpRange(regionId, startKey, endKey)
|
|
|
|
snapKey := snap.SnapKey{RegionID: regionId, Index: snapMeta.Index, Term: snapMeta.Term}
|
|
snapCtx.mgr.Register(snapKey, snap.SnapEntryApplying)
|
|
defer snapCtx.mgr.Deregister(snapKey, snap.SnapEntryApplying)
|
|
|
|
snapshot, err := snapCtx.mgr.GetSnapshotForApplying(snapKey)
|
|
if err != nil {
|
|
return errors.New(fmt.Sprintf("missing snapshot file %s", err))
|
|
}
|
|
|
|
t := time.Now()
|
|
applyOptions := snap.NewApplyOptions(snapCtx.engines.Kv, &metapb.Region{
|
|
Id: regionId,
|
|
StartKey: startKey,
|
|
EndKey: endKey,
|
|
})
|
|
if err := snapshot.Apply(*applyOptions); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("applying new data. [regionId: %d, timeTakes: %v]", regionId, time.Now().Sub(t))
|
|
return nil
|
|
}
|
|
|
|
// handleApply tries to apply the snapshot of the specified Region. It calls `applySnap` to do the actual work.
|
|
func (snapCtx *snapContext) handleApply(regionId uint64, notifier chan<- bool, startKey, endKey []byte, snapMeta *eraftpb.SnapshotMetadata) {
|
|
err := snapCtx.applySnap(regionId, startKey, endKey, snapMeta)
|
|
if err != nil {
|
|
notifier <- false
|
|
log.Fatalf("failed to apply snap!!!. err: %v", err)
|
|
}
|
|
notifier <- true
|
|
}
|
|
|
|
// cleanUpRange cleans up the data within the range.
|
|
func (snapCtx *snapContext) cleanUpRange(regionId uint64, startKey, endKey []byte) {
|
|
if err := engine_util.DeleteRange(snapCtx.engines.Kv, startKey, endKey); err != nil {
|
|
log.Fatalf("failed to delete data in range, [regionId: %d, startKey: %s, endKey: %s, err: %v]", regionId,
|
|
hex.EncodeToString(startKey), hex.EncodeToString(endKey), err)
|
|
} else {
|
|
log.Infof("succeed in deleting data in range. [regionId: %d, startKey: %s, endKey: %s]", regionId,
|
|
hex.EncodeToString(startKey), hex.EncodeToString(endKey))
|
|
}
|
|
}
|
|
|
|
func getAppliedIdxTermForSnapshot(raft *badger.DB, kv *badger.Txn, regionId uint64) (uint64, uint64, error) {
|
|
applyState := new(rspb.RaftApplyState)
|
|
err := engine_util.GetMetaFromTxn(kv, meta.ApplyStateKey(regionId), applyState)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
idx := applyState.AppliedIndex
|
|
var term uint64
|
|
if idx == applyState.TruncatedState.Index {
|
|
term = applyState.TruncatedState.Term
|
|
} else {
|
|
entry, err := meta.GetRaftEntry(raft, regionId, idx)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
} else {
|
|
term = entry.GetTerm()
|
|
}
|
|
}
|
|
return idx, term, nil
|
|
}
|
|
|
|
func doSnapshot(engines *engine_util.Engines, mgr *snap.SnapManager, regionId uint64) (*eraftpb.Snapshot, error) {
|
|
log.Debugf("begin to generate a snapshot. [regionId: %d]", regionId)
|
|
|
|
txn := engines.Kv.NewTransaction(false)
|
|
|
|
index, term, err := getAppliedIdxTermForSnapshot(engines.Raft, txn, regionId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
key := snap.SnapKey{RegionID: regionId, Index: index, Term: term}
|
|
mgr.Register(key, snap.SnapEntryGenerating)
|
|
defer mgr.Deregister(key, snap.SnapEntryGenerating)
|
|
|
|
regionState := new(rspb.RegionLocalState)
|
|
err = engine_util.GetMetaFromTxn(txn, meta.RegionStateKey(regionId), regionState)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
if regionState.GetState() != rspb.PeerState_Normal {
|
|
return nil, errors.Errorf("snap job %d seems stale, skip", regionId)
|
|
}
|
|
|
|
region := regionState.GetRegion()
|
|
confState := util.ConfStateFromRegion(region)
|
|
snapshot := &eraftpb.Snapshot{
|
|
Metadata: &eraftpb.SnapshotMetadata{
|
|
Index: key.Index,
|
|
Term: key.Term,
|
|
ConfState: &confState,
|
|
},
|
|
}
|
|
s, err := mgr.GetSnapshotForBuilding(key)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// Set snapshot data
|
|
snapshotData := &rspb.RaftSnapshotData{Region: region}
|
|
snapshotStatics := snap.SnapStatistics{}
|
|
err = s.Build(txn, region, snapshotData, &snapshotStatics, mgr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
snapshot.Data, err = snapshotData.Marshal()
|
|
return snapshot, err
|
|
}
|