mirror of
https://github.com/talent-plan/tinykv.git
synced 2025-01-15 23:00:17 +08:00
5e089a2cd1
Signed-off-by: Connor <zbk602423539@gmail.com> Co-authored-by: Nick Cameron <nrc@ncameron.org> Co-authored-by: linning <linningde25@gmail.com> Co-authored-by: YangKeao <keao.yang@yahoo.com> Co-authored-by: andylokandy <andylokandy@hotmail.com> Co-authored-by: Iosmanthus Teng <myosmanthustree@gmail.com>
364 lines
12 KiB
Go
364 lines
12 KiB
Go
// Copyright 2018 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package schedule
|
|
|
|
import (
	"container/heap"
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
	"github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
	"github.com/pingcap-incubator/tinykv/proto/pkg/schedulerpb"
	"github.com/pingcap-incubator/tinykv/scheduler/pkg/cache"
	"github.com/pingcap-incubator/tinykv/scheduler/server/core"
	"github.com/pingcap-incubator/tinykv/scheduler/server/schedule/operator"
	"github.com/pingcap-incubator/tinykv/scheduler/server/schedule/opt"
	"github.com/pingcap/log"
	"go.uber.org/zap"
)
|
|
|
|
// Labels describing where a dispatched region came from. They are passed as
// the "source" argument of Dispatch and SendScheduleCommand.
const (
	// DispatchFromHeartBeat marks a dispatch triggered by a region heartbeat.
	DispatchFromHeartBeat = "heartbeat"
	// DispatchFromNotifierQueue is presumably used when an operator is pushed
	// actively from the notifier queue; it is not referenced in this part of
	// the file (TODO confirm against the caller).
	DispatchFromNotifierQueue = "active push"
	// DispatchFromCreate marks the command sent right after an operator has
	// been added to the controller.
	DispatchFromCreate = "create"
)
|
|
|
|
// Delays used by getNextPushOperatorTime to decide when an operator should be
// pushed again.
var (
	// slowNotifyInterval is the default delay for every step kind.
	slowNotifyInterval = 5 * time.Second
	// fastNotifyInterval is the shorter delay used for TransferLeader steps.
	fastNotifyInterval = 2 * time.Second
)
|
|
|
|
// HeartbeatStreams is an interface of async region heartbeat.
|
|
type HeartbeatStreams interface {
|
|
SendMsg(region *core.RegionInfo, msg *schedulerpb.RegionHeartbeatResponse)
|
|
}
|
|
|
|
// OperatorController is used to limit the speed of scheduling.
|
|
type OperatorController struct {
|
|
sync.RWMutex
|
|
ctx context.Context
|
|
cluster opt.Cluster
|
|
operators map[uint64]*operator.Operator
|
|
hbStreams HeartbeatStreams
|
|
counts map[operator.OpKind]uint64
|
|
opRecords *OperatorRecords
|
|
opNotifierQueue operatorQueue
|
|
}
|
|
|
|
// NewOperatorController creates a OperatorController.
|
|
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams HeartbeatStreams) *OperatorController {
|
|
return &OperatorController{
|
|
ctx: ctx,
|
|
cluster: cluster,
|
|
operators: make(map[uint64]*operator.Operator),
|
|
hbStreams: hbStreams,
|
|
counts: make(map[operator.OpKind]uint64),
|
|
opRecords: NewOperatorRecords(ctx),
|
|
opNotifierQueue: make(operatorQueue, 0),
|
|
}
|
|
}
|
|
|
|
// Ctx returns a context which will be canceled once RaftCluster is stopped.
|
|
// For now, it is only used to control the lifetime of TTL cache in schedulers.
|
|
func (oc *OperatorController) Ctx() context.Context {
|
|
return oc.ctx
|
|
}
|
|
|
|
// Dispatch is used to dispatch the operator of a region.
|
|
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
|
|
// Check existed operator.
|
|
if op := oc.GetOperator(region.GetID()); op != nil {
|
|
timeout := op.IsTimeout()
|
|
if step := op.Check(region); step != nil && !timeout {
|
|
// When the "source" is heartbeat, the region may have a newer
|
|
// confver than the region that the operator holds. In this case,
|
|
// the operator is stale, and will not be executed even we would
|
|
// have sent it to TiKV servers. Here, we just cancel it.
|
|
origin := op.RegionEpoch()
|
|
latest := region.GetRegionEpoch()
|
|
changes := latest.GetConfVer() - origin.GetConfVer()
|
|
if source == DispatchFromHeartBeat &&
|
|
changes > uint64(op.ConfVerChanged(region)) {
|
|
|
|
if oc.RemoveOperator(op) {
|
|
log.Info("stale operator", zap.Uint64("region-id", region.GetID()), zap.Duration("takes", op.RunningTime()),
|
|
zap.Reflect("operator", op), zap.Uint64("diff", changes))
|
|
oc.opRecords.Put(op, schedulerpb.OperatorStatus_CANCEL)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
oc.SendScheduleCommand(region, step, source)
|
|
return
|
|
}
|
|
if op.IsFinish() && oc.RemoveOperator(op) {
|
|
log.Info("operator finish", zap.Uint64("region-id", region.GetID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op))
|
|
oc.opRecords.Put(op, schedulerpb.OperatorStatus_SUCCESS)
|
|
} else if timeout && oc.RemoveOperator(op) {
|
|
log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op))
|
|
oc.opRecords.Put(op, schedulerpb.OperatorStatus_TIMEOUT)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (oc *OperatorController) getNextPushOperatorTime(step operator.OpStep, now time.Time) time.Time {
|
|
nextTime := slowNotifyInterval
|
|
switch step.(type) {
|
|
case operator.TransferLeader:
|
|
nextTime = fastNotifyInterval
|
|
}
|
|
return now.Add(nextTime)
|
|
}
|
|
|
|
// AddOperator adds operators to the running operators.
|
|
func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool {
|
|
oc.Lock()
|
|
defer oc.Unlock()
|
|
|
|
if !oc.checkAddOperator(ops...) {
|
|
for _, op := range ops {
|
|
oc.opRecords.Put(op, schedulerpb.OperatorStatus_CANCEL)
|
|
}
|
|
return false
|
|
}
|
|
for _, op := range ops {
|
|
oc.addOperatorLocked(op)
|
|
}
|
|
return true
|
|
}
|
|
|
|
// checkAddOperator checks if the operator can be added.
|
|
// There are several situations that cannot be added:
|
|
// - There is no such region in the cluster
|
|
// - The epoch of the operator and the epoch of the corresponding region are no longer consistent.
|
|
// - The region already has a higher priority or same priority operator.
|
|
func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
|
|
for _, op := range ops {
|
|
region := oc.cluster.GetRegion(op.RegionID())
|
|
if region == nil {
|
|
log.Debug("region not found, cancel add operator", zap.Uint64("region-id", op.RegionID()))
|
|
return false
|
|
}
|
|
if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
|
|
log.Debug("region epoch not match, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch()))
|
|
return false
|
|
}
|
|
if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
|
|
log.Debug("already have operator, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", old))
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func isHigherPriorityOperator(new, old *operator.Operator) bool {
|
|
return new.GetPriorityLevel() > old.GetPriorityLevel()
|
|
}
|
|
|
|
func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
|
|
regionID := op.RegionID()
|
|
|
|
log.Info("add operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", op))
|
|
|
|
// If there is an old operator, replace it. The priority should be checked
|
|
// already.
|
|
if old, ok := oc.operators[regionID]; ok {
|
|
_ = oc.removeOperatorLocked(old)
|
|
log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Duration("takes", old.RunningTime()), zap.Reflect("operator", old))
|
|
oc.opRecords.Put(old, schedulerpb.OperatorStatus_REPLACE)
|
|
}
|
|
|
|
oc.operators[regionID] = op
|
|
op.SetStartTime(time.Now())
|
|
oc.updateCounts(oc.operators)
|
|
|
|
var step operator.OpStep
|
|
if region := oc.cluster.GetRegion(op.RegionID()); region != nil {
|
|
if step = op.Check(region); step != nil {
|
|
oc.SendScheduleCommand(region, step, DispatchFromCreate)
|
|
}
|
|
}
|
|
|
|
heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op, time: oc.getNextPushOperatorTime(step, time.Now())})
|
|
return true
|
|
}
|
|
|
|
// RemoveOperator removes a operator from the running operators.
|
|
func (oc *OperatorController) RemoveOperator(op *operator.Operator) (found bool) {
|
|
oc.Lock()
|
|
defer oc.Unlock()
|
|
return oc.removeOperatorLocked(op)
|
|
}
|
|
|
|
// GetOperatorStatus gets the operator and its status with the specify id.
|
|
func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
|
|
oc.Lock()
|
|
defer oc.Unlock()
|
|
if op, ok := oc.operators[id]; ok {
|
|
return &OperatorWithStatus{
|
|
Op: op,
|
|
Status: schedulerpb.OperatorStatus_RUNNING,
|
|
}
|
|
}
|
|
return oc.opRecords.Get(id)
|
|
}
|
|
|
|
func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool {
|
|
regionID := op.RegionID()
|
|
if cur := oc.operators[regionID]; cur == op {
|
|
delete(oc.operators, regionID)
|
|
oc.updateCounts(oc.operators)
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// GetOperator gets a operator from the given region.
|
|
func (oc *OperatorController) GetOperator(regionID uint64) *operator.Operator {
|
|
oc.RLock()
|
|
defer oc.RUnlock()
|
|
return oc.operators[regionID]
|
|
}
|
|
|
|
// GetOperators gets operators from the running operators.
|
|
func (oc *OperatorController) GetOperators() []*operator.Operator {
|
|
oc.RLock()
|
|
defer oc.RUnlock()
|
|
|
|
operators := make([]*operator.Operator, 0, len(oc.operators))
|
|
for _, op := range oc.operators {
|
|
operators = append(operators, op)
|
|
}
|
|
|
|
return operators
|
|
}
|
|
|
|
// SendScheduleCommand sends a command to the region.
|
|
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step operator.OpStep, source string) {
|
|
log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source))
|
|
switch st := step.(type) {
|
|
case operator.TransferLeader:
|
|
cmd := &schedulerpb.RegionHeartbeatResponse{
|
|
TransferLeader: &schedulerpb.TransferLeader{
|
|
Peer: region.GetStorePeer(st.ToStore),
|
|
},
|
|
}
|
|
oc.hbStreams.SendMsg(region, cmd)
|
|
case operator.AddPeer:
|
|
if region.GetStorePeer(st.ToStore) != nil {
|
|
// The newly added peer is pending.
|
|
return
|
|
}
|
|
cmd := &schedulerpb.RegionHeartbeatResponse{
|
|
ChangePeer: &schedulerpb.ChangePeer{
|
|
ChangeType: eraftpb.ConfChangeType_AddNode,
|
|
Peer: &metapb.Peer{
|
|
Id: st.PeerID,
|
|
StoreId: st.ToStore,
|
|
},
|
|
},
|
|
}
|
|
oc.hbStreams.SendMsg(region, cmd)
|
|
case operator.RemovePeer:
|
|
cmd := &schedulerpb.RegionHeartbeatResponse{
|
|
ChangePeer: &schedulerpb.ChangePeer{
|
|
ChangeType: eraftpb.ConfChangeType_RemoveNode,
|
|
Peer: region.GetStorePeer(st.FromStore),
|
|
},
|
|
}
|
|
oc.hbStreams.SendMsg(region, cmd)
|
|
default:
|
|
log.Error("unknown operator step", zap.Reflect("step", step))
|
|
}
|
|
}
|
|
|
|
// updateCounts updates resource counts using current pending operators.
|
|
func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operator) {
|
|
for k := range oc.counts {
|
|
delete(oc.counts, k)
|
|
}
|
|
for _, op := range operators {
|
|
oc.counts[op.Kind()]++
|
|
}
|
|
}
|
|
|
|
// OperatorCount gets the count of operators filtered by mask.
|
|
func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64 {
|
|
oc.RLock()
|
|
defer oc.RUnlock()
|
|
var total uint64
|
|
for k, count := range oc.counts {
|
|
if k&mask != 0 {
|
|
total += count
|
|
}
|
|
}
|
|
return total
|
|
}
|
|
|
|
// SetOperator is only used for test.
|
|
func (oc *OperatorController) SetOperator(op *operator.Operator) {
|
|
oc.Lock()
|
|
defer oc.Unlock()
|
|
oc.operators[op.RegionID()] = op
|
|
}
|
|
|
|
// OperatorWithStatus records the operator and its status.
|
|
type OperatorWithStatus struct {
|
|
Op *operator.Operator
|
|
Status schedulerpb.OperatorStatus
|
|
}
|
|
|
|
// MarshalJSON returns the status of operator as a JSON string
|
|
func (o *OperatorWithStatus) MarshalJSON() ([]byte, error) {
|
|
return []byte(`"` + fmt.Sprintf("status: %s, operator: %s", o.Status.String(), o.Op.String()) + `"`), nil
|
|
}
|
|
|
|
// OperatorRecords remains the operator and its status for a while.
|
|
type OperatorRecords struct {
|
|
ttl *cache.TTL
|
|
}
|
|
|
|
// operatorStatusRemainTime is the duration handed to the TTL cache that backs
// OperatorRecords; presumably how long a recorded status is kept (see
// cache.NewTTL to confirm).
const operatorStatusRemainTime time.Duration = 10 * time.Minute
|
|
|
|
// NewOperatorRecords returns a OperatorRecords.
|
|
func NewOperatorRecords(ctx context.Context) *OperatorRecords {
|
|
return &OperatorRecords{
|
|
ttl: cache.NewTTL(ctx, time.Minute, operatorStatusRemainTime),
|
|
}
|
|
}
|
|
|
|
// Get gets the operator and its status.
|
|
func (o *OperatorRecords) Get(id uint64) *OperatorWithStatus {
|
|
v, exist := o.ttl.Get(id)
|
|
if !exist {
|
|
return nil
|
|
}
|
|
return v.(*OperatorWithStatus)
|
|
}
|
|
|
|
// Put puts the operator and its status.
|
|
func (o *OperatorRecords) Put(op *operator.Operator, status schedulerpb.OperatorStatus) {
|
|
id := op.RegionID()
|
|
record := &OperatorWithStatus{
|
|
Op: op,
|
|
Status: status,
|
|
}
|
|
o.ttl.Put(id, record)
|
|
}
|