// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schedule

import (
	"container/heap"
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/pingcap-incubator/tinykv/proto/pkg/eraftpb"
	"github.com/pingcap-incubator/tinykv/proto/pkg/metapb"
	"github.com/pingcap-incubator/tinykv/proto/pkg/schedulerpb"
	"github.com/pingcap-incubator/tinykv/scheduler/pkg/cache"
	"github.com/pingcap-incubator/tinykv/scheduler/server/core"
	"github.com/pingcap-incubator/tinykv/scheduler/server/schedule/operator"
	"github.com/pingcap-incubator/tinykv/scheduler/server/schedule/opt"
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// The source of dispatched region.
const (
	DispatchFromHeartBeat     = "heartbeat"
	DispatchFromNotifierQueue = "active push"
	DispatchFromCreate        = "create"
)

// Intervals used by getNextPushOperatorTime to decide when an operator's
// current step should next be actively pushed to the region: TransferLeader
// steps use the fast interval, everything else the slow one.
var (
	slowNotifyInterval = 5 * time.Second
	fastNotifyInterval = 2 * time.Second
)

// HeartbeatStreams is an interface of async region heartbeat.
type HeartbeatStreams interface {
	SendMsg(region *core.RegionInfo, msg *schedulerpb.RegionHeartbeatResponse)
}

// OperatorController is used to limit the speed of scheduling.
type OperatorController struct {
	sync.RWMutex // protects the maps and the notifier queue below
	ctx       context.Context
	cluster   opt.Cluster
	operators map[uint64]*operator.Operator // running operators, keyed by region ID
	hbStreams HeartbeatStreams
	counts    map[operator.OpKind]uint64 // number of running operators per OpKind
	opRecords *OperatorRecords           // TTL-cached statuses of completed operators
	opNotifierQueue operatorQueue        // heap of operators ordered by next push time
}

// NewOperatorController creates an OperatorController.
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams HeartbeatStreams) *OperatorController {
	return &OperatorController{
		ctx:             ctx,
		cluster:         cluster,
		operators:       make(map[uint64]*operator.Operator),
		hbStreams:       hbStreams,
		counts:          make(map[operator.OpKind]uint64),
		opRecords:       NewOperatorRecords(ctx),
		opNotifierQueue: make(operatorQueue, 0),
	}
}

// Ctx returns a context which will be canceled once RaftCluster is stopped.
// For now, it is only used to control the lifetime of TTL cache in schedulers.
func (oc *OperatorController) Ctx() context.Context {
	return oc.ctx
}

// Dispatch is used to dispatch the operator of a region.
// source describes where the call comes from (one of the DispatchFrom*
// constants); the stale-operator cancellation below only applies when the
// source is a region heartbeat.
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
	// Check existed operator.
	if op := oc.GetOperator(region.GetID()); op != nil {
		timeout := op.IsTimeout()
		if step := op.Check(region); step != nil && !timeout {
			// When the "source" is heartbeat, the region may have a newer
			// confver than the region that the operator holds. In this case,
			// the operator is stale, and will not be executed even we would
			// have sent it to TiKV servers. Here, we just cancel it.
			origin := op.RegionEpoch()
			latest := region.GetRegionEpoch()
			changes := latest.GetConfVer() - origin.GetConfVer()
			if source == DispatchFromHeartBeat && changes > uint64(op.ConfVerChanged(region)) {
				if oc.RemoveOperator(op) {
					log.Info("stale operator", zap.Uint64("region-id", region.GetID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op), zap.Uint64("diff", changes))
					oc.opRecords.Put(op, schedulerpb.OperatorStatus_CANCEL)
				}
				return
			}
			// The operator is still valid: push its current step to the region.
			oc.SendScheduleCommand(region, step, source)
			return
		}
		// No runnable step, or the operator timed out: record the final status.
		if op.IsFinish() && oc.RemoveOperator(op) {
			log.Info("operator finish", zap.Uint64("region-id", region.GetID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op))
			oc.opRecords.Put(op, schedulerpb.OperatorStatus_SUCCESS)
		} else if timeout && oc.RemoveOperator(op) {
			log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Duration("takes", op.RunningTime()), zap.Reflect("operator", op))
			oc.opRecords.Put(op, schedulerpb.OperatorStatus_TIMEOUT)
		}
	}
}

// getNextPushOperatorTime returns when the given step should next be actively
// pushed: TransferLeader steps use fastNotifyInterval, everything else
// (including a nil step) uses slowNotifyInterval.
func (oc *OperatorController) getNextPushOperatorTime(step operator.OpStep, now time.Time) time.Time {
	nextTime := slowNotifyInterval
	switch step.(type) {
	case operator.TransferLeader:
		nextTime = fastNotifyInterval
	}
	return now.Add(nextTime)
}

// AddOperator adds operators to the running operators.
// The ops are accepted all-or-nothing: if any of them fails checkAddOperator,
// every one of them is recorded as canceled and false is returned.
func (oc *OperatorController) AddOperator(ops ...*operator.Operator) bool {
	oc.Lock()
	defer oc.Unlock()

	if !oc.checkAddOperator(ops...) {
		for _, op := range ops {
			oc.opRecords.Put(op, schedulerpb.OperatorStatus_CANCEL)
		}
		return false
	}
	for _, op := range ops {
		oc.addOperatorLocked(op)
	}

	return true
}

// checkAddOperator checks if the operator can be added.
// There are several situations that cannot be added:
// - There is no such region in the cluster
// - The epoch of the operator and the epoch of the corresponding region are no longer consistent.
// - The region already has a higher priority or same priority operator.
func (oc *OperatorController) checkAddOperator(ops ...*operator.Operator) bool {
	for _, op := range ops {
		region := oc.cluster.GetRegion(op.RegionID())
		if region == nil {
			log.Debug("region not found, cancel add operator", zap.Uint64("region-id", op.RegionID()))
			return false
		}
		// Both version and conf version must match exactly; a mismatch means
		// the region changed after the operator was created.
		if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
			log.Debug("region epoch not match, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch()))
			return false
		}
		if old := oc.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
			log.Debug("already have operator, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", old))
			return false
		}
	}
	return true
}

// isHigherPriorityOperator reports whether new has a strictly higher priority
// level than old.
func isHigherPriorityOperator(new, old *operator.Operator) bool {
	return new.GetPriorityLevel() > old.GetPriorityLevel()
}

// addOperatorLocked registers op as the running operator of its region,
// sends the first step immediately when possible, and schedules the next
// active push. The caller must hold oc.Lock.
func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
	regionID := op.RegionID()

	log.Info("add operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", op))

	// If there is an old operator, replace it. The priority should be checked
	// already.
	if old, ok := oc.operators[regionID]; ok {
		_ = oc.removeOperatorLocked(old)
		log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Duration("takes", old.RunningTime()), zap.Reflect("operator", old))
		oc.opRecords.Put(old, schedulerpb.OperatorStatus_REPLACE)
	}

	oc.operators[regionID] = op
	op.SetStartTime(time.Now())
	oc.updateCounts(oc.operators)

	var step operator.OpStep
	if region := oc.cluster.GetRegion(op.RegionID()); region != nil {
		if step = op.Check(region); step != nil {
			// Kick off the first step without waiting for the next heartbeat.
			oc.SendScheduleCommand(region, step, DispatchFromCreate)
		}
	}

	heap.Push(&oc.opNotifierQueue, &operatorWithTime{op: op, time: oc.getNextPushOperatorTime(step, time.Now())})
	return true
}

// RemoveOperator removes an operator from the running operators.
func (oc *OperatorController) RemoveOperator(op *operator.Operator) (found bool) {
	oc.Lock()
	defer oc.Unlock()
	return oc.removeOperatorLocked(op)
}

// GetOperatorStatus gets the operator and its status with the specify id.
// A currently running operator is reported as RUNNING; otherwise the
// TTL-cached record of a recently completed operator is returned
// (nil if none exists).
func (oc *OperatorController) GetOperatorStatus(id uint64) *OperatorWithStatus {
	oc.Lock()
	defer oc.Unlock()
	if op, ok := oc.operators[id]; ok {
		return &OperatorWithStatus{
			Op:     op,
			Status: schedulerpb.OperatorStatus_RUNNING,
		}
	}
	return oc.opRecords.Get(id)
}

// removeOperatorLocked deletes op only if it is still the current operator of
// its region; it returns false when op has already been replaced or removed.
// The caller must hold oc.Lock.
func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool {
	regionID := op.RegionID()
	if cur := oc.operators[regionID]; cur == op {
		delete(oc.operators, regionID)
		oc.updateCounts(oc.operators)
		return true
	}
	return false
}

// GetOperator gets an operator from the given region.
func (oc *OperatorController) GetOperator(regionID uint64) *operator.Operator {
	oc.RLock()
	defer oc.RUnlock()
	return oc.operators[regionID]
}

// GetOperators gets operators from the running operators.
func (oc *OperatorController) GetOperators() []*operator.Operator { oc.RLock() defer oc.RUnlock() operators := make([]*operator.Operator, 0, len(oc.operators)) for _, op := range oc.operators { operators = append(operators, op) } return operators } // SendScheduleCommand sends a command to the region. func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step operator.OpStep, source string) { log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step), zap.String("source", source)) switch st := step.(type) { case operator.TransferLeader: cmd := &schedulerpb.RegionHeartbeatResponse{ TransferLeader: &schedulerpb.TransferLeader{ Peer: region.GetStorePeer(st.ToStore), }, } oc.hbStreams.SendMsg(region, cmd) case operator.AddPeer: if region.GetStorePeer(st.ToStore) != nil { // The newly added peer is pending. return } cmd := &schedulerpb.RegionHeartbeatResponse{ ChangePeer: &schedulerpb.ChangePeer{ ChangeType: eraftpb.ConfChangeType_AddNode, Peer: &metapb.Peer{ Id: st.PeerID, StoreId: st.ToStore, }, }, } oc.hbStreams.SendMsg(region, cmd) case operator.RemovePeer: cmd := &schedulerpb.RegionHeartbeatResponse{ ChangePeer: &schedulerpb.ChangePeer{ ChangeType: eraftpb.ConfChangeType_RemoveNode, Peer: region.GetStorePeer(st.FromStore), }, } oc.hbStreams.SendMsg(region, cmd) default: log.Error("unknown operator step", zap.Reflect("step", step)) } } // updateCounts updates resource counts using current pending operators. func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operator) { for k := range oc.counts { delete(oc.counts, k) } for _, op := range operators { oc.counts[op.Kind()]++ } } // OperatorCount gets the count of operators filtered by mask. func (oc *OperatorController) OperatorCount(mask operator.OpKind) uint64 { oc.RLock() defer oc.RUnlock() var total uint64 for k, count := range oc.counts { if k&mask != 0 { total += count } } return total } // SetOperator is only used for test. 
func (oc *OperatorController) SetOperator(op *operator.Operator) { oc.Lock() defer oc.Unlock() oc.operators[op.RegionID()] = op } // OperatorWithStatus records the operator and its status. type OperatorWithStatus struct { Op *operator.Operator Status schedulerpb.OperatorStatus } // MarshalJSON returns the status of operator as a JSON string func (o *OperatorWithStatus) MarshalJSON() ([]byte, error) { return []byte(`"` + fmt.Sprintf("status: %s, operator: %s", o.Status.String(), o.Op.String()) + `"`), nil } // OperatorRecords remains the operator and its status for a while. type OperatorRecords struct { ttl *cache.TTL } const operatorStatusRemainTime = 10 * time.Minute // NewOperatorRecords returns a OperatorRecords. func NewOperatorRecords(ctx context.Context) *OperatorRecords { return &OperatorRecords{ ttl: cache.NewTTL(ctx, time.Minute, operatorStatusRemainTime), } } // Get gets the operator and its status. func (o *OperatorRecords) Get(id uint64) *OperatorWithStatus { v, exist := o.ttl.Get(id) if !exist { return nil } return v.(*OperatorWithStatus) } // Put puts the operator and its status. func (o *OperatorRecords) Put(op *operator.Operator, status schedulerpb.OperatorStatus) { id := op.RegionID() record := &OperatorWithStatus{ Op: op, Status: status, } o.ttl.Put(id, record) }