talent-plan-tinykv/kv/coprocessor/closure_exec.go

740 lines
18 KiB
Go
Raw Normal View History

package coprocessor
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"sort"
"github.com/juju/errors"
"github.com/pingcap-incubator/tinykv/kv/coprocessor/rowcodec"
"github.com/pingcap-incubator/tinykv/kv/storage"
"github.com/pingcap-incubator/tinykv/kv/transaction/mvcc"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
mockpkg "github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tipb/go-tipb"
)
const chunkMaxRows = 1024
const (
pkColNotExists = iota
pkColIsSigned
pkColIsUnsigned
)
// buildClosureExecutor build a closureExecutor for the DAGRequest.
// Currently the composition of executors are:
// tableScan|indexScan [selection] [topN | limit | agg]
func (svr *CopHandler) buildClosureExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (*closureExecutor, error) {
ce, err := svr.newClosureExecutor(dagCtx, dagReq)
if err != nil {
return nil, errors.Trace(err)
}
executors := dagReq.Executors
scanExec := executors[0]
if scanExec.Tp == tipb.ExecType_TypeTableScan {
ce.processor = &tableScanProcessor{closureExecutor: ce}
} else {
ce.processor = &indexScanProcessor{closureExecutor: ce}
}
if len(executors) == 1 {
return ce, nil
}
if secondExec := executors[1]; secondExec.Tp == tipb.ExecType_TypeSelection {
ce.selectionCtx.conditions, err = convertToExprs(ce.sc, ce.fieldTps, secondExec.Selection.Conditions)
if err != nil {
return nil, errors.Trace(err)
}
ce.processor = &selectionProcessor{closureExecutor: ce}
}
lastExecutor := executors[len(executors)-1]
switch lastExecutor.Tp {
case tipb.ExecType_TypeLimit:
ce.limit = int(lastExecutor.Limit.Limit)
case tipb.ExecType_TypeTopN:
err = svr.buildTopNProcessor(ce, lastExecutor.TopN)
case tipb.ExecType_TypeAggregation:
err = svr.buildHashAggProcessor(ce, dagCtx, lastExecutor.Aggregation)
case tipb.ExecType_TypeStreamAgg:
err = svr.buildStreamAggProcessor(ce, dagCtx, executors)
case tipb.ExecType_TypeSelection:
ce.processor = &selectionProcessor{closureExecutor: ce}
default:
panic("unknown executor type " + lastExecutor.Tp.String())
}
if err != nil {
return nil, err
}
return ce, nil
}
func convertToExprs(sc *stmtctx.StatementContext, fieldTps []*types.FieldType, pbExprs []*tipb.Expr) ([]expression.Expression, error) {
exprs := make([]expression.Expression, 0, len(pbExprs))
for _, expr := range pbExprs {
e, err := expression.PBToExpr(expr, fieldTps, sc)
if err != nil {
return nil, errors.Trace(err)
}
exprs = append(exprs, e)
}
return exprs, nil
}
func (svr *CopHandler) newClosureExecutor(dagCtx *dagContext, dagReq *tipb.DAGRequest) (*closureExecutor, error) {
e := &closureExecutor{
evalContext: dagCtx.evalCtx,
reader: dagCtx.reader,
outputOff: dagReq.OutputOffsets,
startTS: dagCtx.startTS,
limit: math.MaxInt64,
}
seCtx := mockpkg.NewContext()
seCtx.GetSessionVars().StmtCtx = e.sc
e.seCtx = seCtx
executors := dagReq.Executors
scanExec := executors[0]
switch scanExec.Tp {
case tipb.ExecType_TypeTableScan:
tblScan := executors[0].TblScan
e.unique = true
e.scanCtx.desc = tblScan.Desc
case tipb.ExecType_TypeIndexScan:
idxScan := executors[0].IdxScan
e.unique = idxScan.GetUnique()
e.scanCtx.desc = idxScan.Desc
e.initIdxScanCtx()
default:
panic(fmt.Sprintf("unknown first executor type %s", executors[0].Tp))
}
ranges, err := svr.extractKVRanges(dagCtx.reader, dagCtx.keyRanges, e.scanCtx.desc)
if err != nil {
return nil, errors.Trace(err)
}
e.kvRanges = ranges
e.scanCtx.chk = chunk.NewChunkWithCapacity(e.fieldTps, 32)
if e.idxScanCtx == nil {
e.scanCtx.decoder, err = e.evalContext.newRowDecoder()
if err != nil {
return nil, errors.Trace(err)
}
}
return e, nil
}
func (e *closureExecutor) initIdxScanCtx() {
e.idxScanCtx = new(idxScanCtx)
e.idxScanCtx.columnLen = len(e.columnInfos)
e.idxScanCtx.pkStatus = pkColNotExists
lastColumn := e.columnInfos[len(e.columnInfos)-1]
// The PKHandle column info has been collected in ctx.
if lastColumn.GetPkHandle() {
if mysql.HasUnsignedFlag(uint(lastColumn.GetFlag())) {
e.idxScanCtx.pkStatus = pkColIsUnsigned
} else {
e.idxScanCtx.pkStatus = pkColIsSigned
}
e.idxScanCtx.columnLen--
} else if lastColumn.ColumnId == model.ExtraHandleID {
e.idxScanCtx.pkStatus = pkColIsSigned
e.idxScanCtx.columnLen--
}
}
func (svr *CopHandler) isCountAgg(pbAgg *tipb.Aggregation) bool {
if len(pbAgg.AggFunc) == 1 && len(pbAgg.GroupBy) == 0 {
aggFunc := pbAgg.AggFunc[0]
if aggFunc.Tp == tipb.ExprType_Count && len(aggFunc.Children) == 1 {
return true
}
}
return false
}
func (svr *CopHandler) tryBuildCountProcessor(e *closureExecutor, executors []*tipb.Executor) (bool, error) {
if len(executors) > 2 {
return false, nil
}
agg := executors[1].Aggregation
if !svr.isCountAgg(agg) {
return false, nil
}
child := agg.AggFunc[0].Children[0]
switch child.Tp {
case tipb.ExprType_ColumnRef:
_, idx, err := codec.DecodeInt(child.Val)
if err != nil {
return false, errors.Trace(err)
}
e.aggCtx.col = e.columnInfos[idx]
if e.aggCtx.col.PkHandle {
e.processor = &countStarProcessor{skipVal: skipVal(true), closureExecutor: e}
} else {
e.processor = &countColumnProcessor{closureExecutor: e}
}
case tipb.ExprType_Null, tipb.ExprType_ScalarFunc:
return false, nil
default:
e.processor = &countStarProcessor{skipVal: skipVal(true), closureExecutor: e}
}
return true, nil
}
func (svr *CopHandler) buildTopNProcessor(e *closureExecutor, topN *tipb.TopN) error {
heap, conds, err := svr.getTopNInfo(e.evalContext, topN)
if err != nil {
return errors.Trace(err)
}
ctx := &topNCtx{
heap: heap,
orderByExprs: conds,
sortRow: e.newTopNSortRow(),
}
e.topNCtx = ctx
e.processor = &topNProcessor{closureExecutor: e}
return nil
}
func (svr *CopHandler) buildHashAggProcessor(e *closureExecutor, ctx *dagContext, agg *tipb.Aggregation) error {
aggs, groupBys, err := svr.getAggInfo(ctx, agg)
if err != nil {
return err
}
e.processor = &hashAggProcessor{
closureExecutor: e,
aggExprs: aggs,
groupByExprs: groupBys,
groups: map[string]struct{}{},
groupKeys: nil,
aggCtxsMap: map[string][]*aggregation.AggEvaluateContext{},
}
return nil
}
func (svr *CopHandler) buildStreamAggProcessor(e *closureExecutor, ctx *dagContext, executors []*tipb.Executor) error {
ok, err := svr.tryBuildCountProcessor(e, executors)
if err != nil || ok {
return err
}
return svr.buildHashAggProcessor(e, ctx, executors[len(executors)-1].Aggregation)
}
// closureExecutor is an execution engine that flatten the DAGRequest.Executors to a single closure `processor` that
// process key/value pairs. We can define many closures for different kinds of requests, try to use the specially
// optimized one for some frequently used query.
type closureExecutor struct {
*evalContext
reader storage.StorageReader
outputOff []uint32
seCtx sessionctx.Context
kvRanges []kv.KeyRange
startTS uint64
ignoreLock bool
lockChecked bool
scanCtx scanCtx
idxScanCtx *idxScanCtx
selectionCtx selectionCtx
aggCtx aggCtx
topNCtx *topNCtx
rowCount int
unique bool
limit int
oldChunks []tipb.Chunk
oldRowBuf []byte
processor closureProcessor
}
type closureProcessor interface {
// Process accepts key and value, should not keep reference to them.
// Returns ScanBreak will break the scan loop.
Process(key, value []byte) error
// SkipValue returns if we can skip the value.
SkipValue() bool
Finish() error
}
type scanCtx struct {
count int
limit int
chk *chunk.Chunk
desc bool
decoder *rowcodec.Decoder
}
type idxScanCtx struct {
pkStatus int
columnLen int
}
type aggCtx struct {
col *tipb.ColumnInfo
}
type selectionCtx struct {
conditions []expression.Expression
}
type topNCtx struct {
heap *topNHeap
orderByExprs []expression.Expression
sortRow *sortRow
}
func (e *closureExecutor) execute() ([]tipb.Chunk, error) {
txn := mvcc.MvccTxn{Reader: e.reader, StartTS: e.startTS}
for _, ran := range e.kvRanges {
if e.unique && ran.IsPoint() {
val, err := txn.GetValue(ran.StartKey)
if err != nil {
return nil, errors.Trace(err)
}
if len(val) == 0 {
continue
}
err = e.processor.Process(ran.StartKey, val)
if err != nil {
return nil, errors.Trace(err)
}
} else {
if e.scanCtx.desc {
panic("do not support desc scan")
} else {
scanner := mvcc.NewScanner(ran.StartKey, &txn)
for {
key, val, err := scanner.Next()
if err != nil {
scanner.Close()
return nil, err
}
if key == nil && val == nil {
break
}
if bytes.Compare(key, ran.EndKey) >= 0 {
break
}
err = e.processor.Process(key, val)
if err != nil {
if err == ScanBreak {
break
}
scanner.Close()
return nil, err
}
}
scanner.Close()
}
}
if e.rowCount == e.limit {
break
}
}
err := e.processor.Finish()
return e.oldChunks, err
}
type countStarProcessor struct {
skipVal
*closureExecutor
}
// countStarProcess is used for `count(*)`.
func (e *countStarProcessor) Process(key, value []byte) error {
e.rowCount++
return nil
}
func (e *countStarProcessor) Finish() error {
return e.countFinish()
}
// countFinish is used for `count(*)`.
func (e *closureExecutor) countFinish() error {
d := types.NewIntDatum(int64(e.rowCount))
rowData, err := codec.EncodeValue(e.sc, nil, d)
if err != nil {
return errors.Trace(err)
}
e.oldChunks = appendRow(e.oldChunks, rowData, 0)
return nil
}
type countColumnProcessor struct {
skipVal
*closureExecutor
}
func (e *countColumnProcessor) Process(key, value []byte) error {
if e.idxScanCtx != nil {
values, _, err := tablecodec.CutIndexKeyNew(key, e.idxScanCtx.columnLen)
if err != nil {
return errors.Trace(err)
}
if values[0][0] != codec.NilFlag {
e.rowCount++
}
} else {
// Since the handle value doesn't affect the count result, we don't need to decode the handle.
isNull, err := e.scanCtx.decoder.ColumnIsNull(value, e.aggCtx.col.ColumnId, e.aggCtx.col.DefaultVal)
if err != nil {
return errors.Trace(err)
}
if !isNull {
e.rowCount++
}
}
return nil
}
func (e *countColumnProcessor) Finish() error {
return e.countFinish()
}
type skipVal bool
func (s skipVal) SkipValue() bool {
return bool(s)
}
// ScanBreak is returnd by ScanFunc to break the scan loop.
var ScanBreak = errors.New("scan break")
type tableScanProcessor struct {
skipVal
*closureExecutor
}
func (e *tableScanProcessor) Process(key, value []byte) error {
if e.rowCount == e.limit {
return ScanBreak
}
e.rowCount++
err := e.tableScanProcessCore(key, value)
if e.scanCtx.chk.NumRows() == chunkMaxRows {
err = e.chunkToOldChunk(e.scanCtx.chk)
}
return err
}
func (e *tableScanProcessor) Finish() error {
return e.scanFinish()
}
func (e *closureExecutor) processCore(key, value []byte) error {
if e.idxScanCtx != nil {
return e.indexScanProcessCore(key, value)
} else {
return e.tableScanProcessCore(key, value)
}
}
func (e *closureExecutor) hasSelection() bool {
return len(e.selectionCtx.conditions) > 0
}
func (e *closureExecutor) processSelection() (gotRow bool, err error) {
chk := e.scanCtx.chk
row := chk.GetRow(chk.NumRows() - 1)
gotRow = true
for _, expr := range e.selectionCtx.conditions {
d, err := expr.Eval(row)
if err != nil {
return false, errors.Trace(err)
}
if d.IsNull() {
gotRow = false
} else {
i, err := d.ToBool(e.sc)
if err != nil {
return false, errors.Trace(err)
}
gotRow = i != 0
}
if !gotRow {
chk.TruncateTo(chk.NumRows() - 1)
break
}
}
return
}
func (e *closureExecutor) tableScanProcessCore(key, value []byte) error {
handle, err := tablecodec.DecodeRowKey(key)
if err != nil {
return errors.Trace(err)
}
err = e.scanCtx.decoder.Decode(value, handle, e.scanCtx.chk)
if err != nil {
return errors.Trace(err)
}
return nil
}
func (e *closureExecutor) scanFinish() error {
return e.chunkToOldChunk(e.scanCtx.chk)
}
type indexScanProcessor struct {
skipVal
*closureExecutor
}
func (e *indexScanProcessor) Process(key, value []byte) error {
if e.rowCount == e.limit {
return ScanBreak
}
e.rowCount++
err := e.indexScanProcessCore(key, value)
if e.scanCtx.chk.NumRows() == chunkMaxRows {
err = e.chunkToOldChunk(e.scanCtx.chk)
}
return err
}
func (e *indexScanProcessor) Finish() error {
return e.scanFinish()
}
func (e *closureExecutor) indexScanProcessCore(key, value []byte) error {
colLen := e.idxScanCtx.columnLen
pkStatus := e.idxScanCtx.pkStatus
chk := e.scanCtx.chk
values, b, err := tablecodec.CutIndexKeyNew(key, colLen)
if err != nil {
return errors.Trace(err)
}
decoder := codec.NewDecoder(chk, e.sc.TimeZone)
for i, colVal := range values {
_, err = decoder.DecodeOne(colVal, i, e.fieldTps[i])
if err != nil {
return errors.Trace(err)
}
}
if len(b) > 0 {
if pkStatus != pkColNotExists {
_, err = decoder.DecodeOne(b, colLen, e.fieldTps[colLen])
if err != nil {
return errors.Trace(err)
}
}
} else if pkStatus != pkColNotExists {
chk.AppendInt64(colLen, int64(binary.BigEndian.Uint64(value)))
}
return nil
}
func (e *closureExecutor) chunkToOldChunk(chk *chunk.Chunk) error {
var oldRow []types.Datum
for i := 0; i < chk.NumRows(); i++ {
oldRow = oldRow[:0]
for _, outputOff := range e.outputOff {
d := chk.GetRow(i).GetDatum(int(outputOff), e.fieldTps[outputOff])
oldRow = append(oldRow, d)
}
var err error
e.oldRowBuf, err = codec.EncodeValue(e.sc, e.oldRowBuf[:0], oldRow...)
if err != nil {
return errors.Trace(err)
}
e.oldChunks = appendRow(e.oldChunks, e.oldRowBuf, i)
}
chk.Reset()
return nil
}
type selectionProcessor struct {
skipVal
*closureExecutor
}
func (e *selectionProcessor) Process(key, value []byte) error {
if e.rowCount == e.limit {
return ScanBreak
}
err := e.processCore(key, value)
if err != nil {
return errors.Trace(err)
}
gotRow, err := e.processSelection()
if err != nil {
return err
}
if gotRow {
e.rowCount++
if e.scanCtx.chk.NumRows() == chunkMaxRows {
err = e.chunkToOldChunk(e.scanCtx.chk)
}
}
return err
}
func (e *selectionProcessor) Finish() error {
return e.scanFinish()
}
type topNProcessor struct {
skipVal
*closureExecutor
}
func (e *topNProcessor) Process(key, value []byte) (err error) {
if err = e.processCore(key, value); err != nil {
return err
}
if e.hasSelection() {
gotRow, err1 := e.processSelection()
if err1 != nil || !gotRow {
return err1
}
}
ctx := e.topNCtx
row := e.scanCtx.chk.GetRow(0)
for i, expr := range ctx.orderByExprs {
ctx.sortRow.key[i], err = expr.Eval(row)
if err != nil {
return errors.Trace(err)
}
}
e.scanCtx.chk.Reset()
if ctx.heap.tryToAddRow(ctx.sortRow) {
ctx.sortRow.data[0] = append([]byte{}, key...)
ctx.sortRow.data[1] = append([]byte{}, value...)
ctx.sortRow = e.newTopNSortRow()
}
return errors.Trace(ctx.heap.err)
}
func (e *closureExecutor) newTopNSortRow() *sortRow {
return &sortRow{
key: make([]types.Datum, len(e.evalContext.columnInfos)),
data: make([][]byte, 2),
}
}
func (e *topNProcessor) Finish() error {
ctx := e.topNCtx
sort.Sort(&ctx.heap.topNSorter)
chk := e.scanCtx.chk
for _, row := range ctx.heap.rows {
err := e.processCore(row.data[0], row.data[1])
if err != nil {
return err
}
if chk.NumRows() == chunkMaxRows {
if err = e.chunkToOldChunk(chk); err != nil {
return errors.Trace(err)
}
}
}
return e.chunkToOldChunk(chk)
}
type hashAggProcessor struct {
skipVal
*closureExecutor
aggExprs []aggregation.Aggregation
groupByExprs []expression.Expression
groups map[string]struct{}
groupKeys [][]byte
aggCtxsMap map[string][]*aggregation.AggEvaluateContext
}
func (e *hashAggProcessor) Process(key, value []byte) (err error) {
err = e.processCore(key, value)
if err != nil {
return err
}
if e.hasSelection() {
gotRow, err1 := e.processSelection()
if err1 != nil || !gotRow {
return err1
}
}
row := e.scanCtx.chk.GetRow(e.scanCtx.chk.NumRows() - 1)
gk, err := e.getGroupKey(row)
if _, ok := e.groups[string(gk)]; !ok {
e.groups[string(gk)] = struct{}{}
e.groupKeys = append(e.groupKeys, gk)
}
// Update aggregate expressions.
aggCtxs := e.getContexts(gk)
for i, agg := range e.aggExprs {
err = agg.Update(aggCtxs[i], e.sc, row)
if err != nil {
return errors.Trace(err)
}
}
e.scanCtx.chk.Reset()
return nil
}
func (e *hashAggProcessor) getGroupKey(row chunk.Row) ([]byte, error) {
length := len(e.groupByExprs)
if length == 0 {
return nil, nil
}
key := make([]byte, 0, 32)
for _, item := range e.groupByExprs {
v, err := item.Eval(row)
if err != nil {
return nil, errors.Trace(err)
}
b, err := codec.EncodeValue(e.sc, nil, v)
if err != nil {
return nil, errors.Trace(err)
}
key = append(key, b...)
}
return key, nil
}
func (e *hashAggProcessor) getContexts(groupKey []byte) []*aggregation.AggEvaluateContext {
aggCtxs, ok := e.aggCtxsMap[string(groupKey)]
if !ok {
aggCtxs = make([]*aggregation.AggEvaluateContext, 0, len(e.aggExprs))
for _, agg := range e.aggExprs {
aggCtxs = append(aggCtxs, agg.CreateContext(e.sc))
}
e.aggCtxsMap[string(groupKey)] = aggCtxs
}
return aggCtxs
}
func (e *hashAggProcessor) Finish() error {
for i, gk := range e.groupKeys {
aggCtxs := e.getContexts(gk)
e.oldRowBuf = e.oldRowBuf[:0]
for i, agg := range e.aggExprs {
partialResults := agg.GetPartialResult(aggCtxs[i])
var err error
e.oldRowBuf, err = codec.EncodeValue(e.sc, e.oldRowBuf, partialResults...)
if err != nil {
return err
}
}
e.oldRowBuf = append(e.oldRowBuf, gk...)
e.oldChunks = appendRow(e.oldChunks, e.oldRowBuf, i)
}
return nil
}