From ceddc106f092c2e6dcf171cfbcbf5e4dfe96cd28 Mon Sep 17 00:00:00 2001
From: huang-b
Date: Tue, 24 Nov 2020 10:59:03 +0800
Subject: [PATCH] executor,planner/core,util/plancodec: extend executor.ShuffleExec and planner.core.PhysicalShuffle to support multiple data sources (#20942)

---
 cmd/explaintest/r/window_function.result     |  12 +-
 executor/benchmark_test.go                   |  11 +-
 executor/builder.go                          |  61 +++++----
 executor/shuffle.go                          | 117 +++++++++++-------
 planner/core/explain.go                      |   7 +-
 planner/core/initialize.go                   |   6 +-
 planner/core/physical_plans.go               |  19 +--
 planner/core/plan.go                         |  10 +-
 planner/core/resolve_indices.go              |  14 ++-
 planner/core/stringer.go                     |   4 +-
 planner/core/testdata/plan_suite_out.json    |   2 +-
 .../testdata/plan_suite_unexported_out.json  |  10 +-
 util/plancodec/id.go                         |   4 +-
 13 files changed, 168 insertions(+), 109 deletions(-)

diff --git a/cmd/explaintest/r/window_function.result b/cmd/explaintest/r/window_function.result
index 545b8ec831..9c9f39790b 100644
--- a/cmd/explaintest/r/window_function.result
+++ b/cmd/explaintest/r/window_function.result
@@ -66,7 +66,7 @@ Projection_7 10000.00 root Column#6
 explain select sum(a) over(partition by a order by b) from t;
 id estRows task access object operator info
 Projection_7 10000.00 root Column#6
-└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
+└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
   └─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b range between unbounded preceding and current row)
     └─Sort_11 10000.00 root test.t.a, test.t.b
       └─TableReader_10 10000.00 root data:TableFullScan_9
@@ -74,7 +74,7 @@ Projection_7 10000.00 root Column#6
 explain select sum(a) over(partition by a order by b rows unbounded preceding) from t;
 id estRows task access object operator info
 Projection_7 10000.00 root Column#6
-└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
+└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
   └─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b rows between unbounded preceding and current row)
     └─Sort_11 10000.00 root test.t.a, test.t.b
       └─TableReader_10 10000.00 root data:TableFullScan_9
@@ -82,7 +82,7 @@ Projection_7 10000.00 root Column#6
 explain select sum(a) over(partition by a order by b rows between 1 preceding and 1 following) from t;
 id estRows task access object operator info
 Projection_7 10000.00 root Column#6
-└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
+└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
   └─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b rows between 1 preceding and 1 following)
     └─Sort_11 10000.00 root test.t.a, test.t.b
       └─TableReader_10 10000.00 root data:TableFullScan_9
@@ -90,7 +90,7 @@ Projection_7 10000.00 root Column#6
 explain select sum(a) over(partition by a order by b range between 1 preceding and 1 following) from t;
 id estRows task access object operator info
 Projection_7 10000.00 root Column#6
-└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
+└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
   └─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b range between 1 preceding and 1 following)
     └─Sort_11 10000.00 root test.t.a, test.t.b
       └─TableReader_10 10000.00 root data:TableFullScan_9
@@ -98,7 +98,7 @@ Projection_7 10000.00 root Column#6
 explain select sum(a) over(partition by a order by c range between interval '2:30' minute_second preceding and interval '2:30' minute_second following) from t;
 id estRows task access object operator info
 Projection_7 10000.00 root Column#6
-└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
+└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
   └─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.c range between interval "2:30" "MINUTE_SECOND" preceding and interval "2:30" "MINUTE_SECOND" following)
     └─Sort_11 10000.00 root test.t.a, test.t.c
       └─TableReader_10 10000.00 root data:TableFullScan_9
@@ -119,7 +119,7 @@ analyze table t1;
 explain select sum(a) over(partition by b) from t1;
 id estRows task access object operator info
 Projection_7 3.00 root Column#4
-└─Shuffle_12 3.00 root execution info: concurrency:2, data source:TableReader_10
+└─Shuffle_12 3.00 root execution info: concurrency:2, data sources:[TableReader_10]
   └─Window_8 3.00 root sum(cast(test.t1.a, decimal(32,0) BINARY))->Column#4 over(partition by test.t1.b)
     └─Sort_11 3.00 root test.t1.b
       └─TableReader_10 3.00 root data:TableFullScan_9
diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go
index d710deb59b..06b0089728 100644
--- a/executor/benchmark_test.go
+++ b/executor/benchmark_test.go
@@ -30,6 +30,7 @@ import (
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/expression/aggregation"
 	"github.com/pingcap/tidb/planner/core"
+	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/planner/property"
 	"github.com/pingcap/tidb/planner/util"
 	"github.com/pingcap/tidb/sessionctx"
@@ -497,11 +498,11 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
 	}
 
 	plan = core.PhysicalShuffle{
-		Concurrency:  concurrency,
-		Tail:         tail,
-		DataSource:   src,
-		SplitterType: core.PartitionHashSplitterType,
-		HashByItems:  byItems,
+		Concurrency:      concurrency,
+		Tails:            []plannercore.PhysicalPlan{tail},
+		DataSources:      []plannercore.PhysicalPlan{src},
+		SplitterType:     core.PartitionHashSplitterType,
+		HashByItemArrays: [][]expression.Expression{byItems},
 	}.Init(ctx, nil, 0)
 	plan.SetChildren(win)
 } else {
diff --git a/executor/builder.go b/executor/builder.go
index ad128c559b..50f17eec7d 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -216,8 +216,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
 		return b.buildWindow(v)
 	case *plannercore.PhysicalShuffle:
 		return b.buildShuffle(v)
-	case *plannercore.PhysicalShuffleDataSourceStub:
-		return b.buildShuffleDataSourceStub(v)
+	case *plannercore.PhysicalShuffleReceiverStub:
+		return b.buildShuffleReceiverStub(v)
 	case *plannercore.SQLBindPlan:
 		return b.buildSQLBindExec(v)
 	case *plannercore.SplitRegion:
@@ -3670,40 +3670,57 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec
 
 func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleExec {
 	base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
-	shuffle := &ShuffleExec{baseExecutor: base,
-		concurrency: v.Concurrency,
+	shuffle := &ShuffleExec{
+		baseExecutor: base,
+		concurrency:  v.Concurrency,
 	}
 
 	switch v.SplitterType {
 	case plannercore.PartitionHashSplitterType:
-		shuffle.splitter = &partitionHashSplitter{
-			byItems:    v.HashByItems,
-			numWorkers: shuffle.concurrency,
+		splitters := make([]partitionSplitter, len(v.HashByItemArrays))
+		for i, hashByItemArray := range v.HashByItemArrays {
+			hashSplitter := &partitionHashSplitter{
+				byItems:    hashByItemArray,
+				numWorkers: shuffle.concurrency,
+			}
+			copy(hashSplitter.byItems, hashByItemArray)
+			splitters[i] = hashSplitter
 		}
+		shuffle.splitters = splitters
 	default:
 		panic("Not implemented. Should not reach here.")
 	}
 
-	shuffle.dataSource = b.build(v.DataSource)
-	if b.err != nil {
-		return nil
+	shuffle.dataSources = make([]Executor, len(v.DataSources))
+	for i, dataSource := range v.DataSources {
+		shuffle.dataSources[i] = b.build(dataSource)
+		if b.err != nil {
+			return nil
+		}
 	}
 
-	// head & tail of physical plans' chain within "partition".
-	var head, tail = v.Children()[0], v.Tail
-
+	head := v.Children()[0]
 	shuffle.workers = make([]*shuffleWorker, shuffle.concurrency)
 	for i := range shuffle.workers {
-		w := &shuffleWorker{
-			baseExecutor: newBaseExecutor(b.ctx, v.DataSource.Schema(), v.DataSource.ID()),
+		receivers := make([]*shuffleReceiver, len(v.DataSources))
+		for j, dataSource := range v.DataSources {
+			receivers[j] = &shuffleReceiver{
+				baseExecutor: newBaseExecutor(b.ctx, dataSource.Schema(), dataSource.ID()),
+			}
 		}
 
-		stub := plannercore.PhysicalShuffleDataSourceStub{
-			Worker: (unsafe.Pointer)(w),
-		}.Init(b.ctx, v.DataSource.Stats(), v.DataSource.SelectBlockOffset(), nil)
-		stub.SetSchema(v.DataSource.Schema())
+		w := &shuffleWorker{
+			receivers: receivers,
+		}
+
+		for j, dataSource := range v.DataSources {
+			stub := plannercore.PhysicalShuffleReceiverStub{
+				Receiver: (unsafe.Pointer)(receivers[j]),
+			}.Init(b.ctx, dataSource.Stats(), dataSource.SelectBlockOffset(), nil)
+			stub.SetSchema(dataSource.Schema())
+			v.Tails[j].SetChildren(stub)
+		}
 
-		tail.SetChildren(stub)
 		w.childExec = b.build(head)
 		if b.err != nil {
 			return nil
@@ -3715,8 +3732,8 @@ func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleE
 	return shuffle
 }
 
-func (b *executorBuilder) buildShuffleDataSourceStub(v *plannercore.PhysicalShuffleDataSourceStub) *shuffleWorker {
-	return (*shuffleWorker)(v.Worker)
+func (b *executorBuilder) buildShuffleReceiverStub(v *plannercore.PhysicalShuffleReceiverStub) *shuffleReceiver {
+	return (*shuffleReceiver)(v.Receiver)
 }
 
 func (b *executorBuilder) buildSQLBindExec(v *plannercore.SQLBindPlan) Executor {
diff --git a/executor/shuffle.go b/executor/shuffle.go
index fd4ab99130..bb611e6fa0 100644
--- a/executor/shuffle.go
+++ b/executor/shuffle.go
@@ -29,10 +29,11 @@ import (
 )
 
 // ShuffleExec is the executor to run other executors in a parallel manner.
-// 1. It fetches chunks from `DataSource`.
-// 2. It splits tuples from `DataSource` into N partitions (Only "split by hash" is implemented so far).
-// 3. It invokes N workers in parallel, assign each partition as input to each worker and execute child executors.
-// 4. It collects outputs from each worker, then sends outputs to its parent.
+// 1. It fetches chunks from M `DataSources` (the value of M depends on the actual executor, e.g. M = 1 for WindowExec, M = 2 for MergeJoinExec).
+// 2. It splits tuples from each `DataSource` into N partitions (only "split by hash" is implemented so far).
+// 3. It invokes N workers in parallel, each one having M `receiver`s to receive partitions from the `DataSources`.
+// 4. It assigns the received partitions as input to each worker and executes the child executors.
+// 5. It collects outputs from each worker, then sends outputs to its parent.
 //
 //                                +-------------+
 //                        +-------| Main Thread |
@@ -80,8 +81,9 @@ type ShuffleExec struct {
 	prepared bool
 	executed bool
 
-	splitter   partitionSplitter
-	dataSource Executor
+	// each dataSource has a corresponding splitter
+	splitters   []partitionSplitter
+	dataSources []Executor
 
 	finishCh chan struct{}
 	outputCh chan *shuffleOutput
@@ -95,8 +97,11 @@ type shuffleOutput struct {
 
 // Open implements the Executor Open interface.
 func (e *ShuffleExec) Open(ctx context.Context) error {
-	if err := e.dataSource.Open(ctx); err != nil {
-		return err
+	for _, s := range e.dataSources {
+		if err := s.Open(ctx); err != nil {
+			return err
+		}
 	}
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return err
@@ -109,8 +114,11 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
 
 	for _, w := range e.workers {
 		w.finishCh = e.finishCh
 
-		w.inputCh = make(chan *chunk.Chunk, 1)
-		w.inputHolderCh = make(chan *chunk.Chunk, 1)
+		for _, r := range w.receivers {
+			r.inputCh = make(chan *chunk.Chunk, 1)
+			r.inputHolderCh = make(chan *chunk.Chunk, 1)
+		}
+
 		w.outputCh = e.outputCh
 		w.outputHolderCh = make(chan *chunk.Chunk, 1)
@@ -118,7 +126,9 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
 			return err
 		}
 
-		w.inputHolderCh <- newFirstChunk(e.dataSource)
+		for i, r := range w.receivers {
+			r.inputHolderCh <- newFirstChunk(e.dataSources[i])
+		}
 		w.outputHolderCh <- newFirstChunk(e)
 	}
 
@@ -129,15 +139,19 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
 func (e *ShuffleExec) Close() error {
 	if !e.prepared {
 		for _, w := range e.workers {
-			close(w.inputHolderCh)
-			close(w.inputCh)
+			for _, r := range w.receivers {
+				close(r.inputHolderCh)
+				close(r.inputCh)
+			}
 			close(w.outputHolderCh)
 		}
 		close(e.outputCh)
 	}
 	close(e.finishCh)
 	for _, w := range e.workers {
-		for range w.inputCh {
+		for _, r := range w.receivers {
+			for range r.inputCh {
+			}
 		}
 	}
 	for range e.outputCh { // workers exit before `e.outputCh` is closed.
@@ -150,16 +164,27 @@ func (e *ShuffleExec) Close() error {
 		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
 	}
 
-	err := e.dataSource.Close()
-	err1 := e.baseExecutor.Close()
-	if err != nil {
-		return errors.Trace(err)
+	// close dataSources
+	errArr := make([]error, len(e.dataSources))
+	for i, dataSource := range e.dataSources {
+		errArr[i] = dataSource.Close()
 	}
-	return errors.Trace(err1)
+	// close baseExecutor
+	baseCloseErr := e.baseExecutor.Close()
+	// check close error
+	for _, err := range errArr {
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
+	return errors.Trace(baseCloseErr)
 }
 
 func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) {
-	go e.fetchDataAndSplit(ctx)
+	// create a goroutine for each dataSource to fetch and split data
+	for i := range e.dataSources {
+		go e.fetchDataAndSplit(ctx, i)
+	}
 
 	waitGroup := &sync.WaitGroup{}
 	waitGroup.Add(len(e.workers))
@@ -213,25 +238,25 @@ func recoveryShuffleExec(output chan *shuffleOutput, r interface{}) {
 	logutil.BgLogger().Error("shuffle panicked", zap.Error(err), zap.Stack("stack"))
 }
 
-func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
+func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context, dataSourceIndex int) {
 	var (
 		err           error
 		workerIndices []int
 	)
 	results := make([]*chunk.Chunk, len(e.workers))
-	chk := newFirstChunk(e.dataSource)
+	chk := newFirstChunk(e.dataSources[dataSourceIndex])
 
 	defer func() {
 		if r := recover(); r != nil {
 			recoveryShuffleExec(e.outputCh, r)
 		}
 		for _, w := range e.workers {
-			close(w.inputCh)
+			close(w.receivers[dataSourceIndex].inputCh)
 		}
 	}()
 
 	for {
-		err = Next(ctx, e.dataSource, chk)
+		err = Next(ctx, e.dataSources[dataSourceIndex], chk)
 		if err != nil {
 			e.outputCh <- &shuffleOutput{err: err}
 			return
@@ -240,7 +265,7 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
 			break
 		}
 
-		workerIndices, err = e.splitter.split(e.ctx, chk, workerIndices)
+		workerIndices, err = e.splitters[dataSourceIndex].split(e.ctx, chk, workerIndices)
 		if err != nil {
 			e.outputCh <- &shuffleOutput{err: err}
 			return
@@ -254,47 +279,40 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
 			select {
 			case <-e.finishCh:
 				return
-			case results[workerIdx] = <-w.inputHolderCh:
+			case results[workerIdx] = <-w.receivers[dataSourceIndex].inputHolderCh:
 				break
 			}
 		}
 		results[workerIdx].AppendRow(chk.GetRow(i))
 		if results[workerIdx].IsFull() {
-			w.inputCh <- results[workerIdx]
+			w.receivers[dataSourceIndex].inputCh <- results[workerIdx]
 			results[workerIdx] = nil
 		}
 	}
 	for i, w := range e.workers {
 		if results[i] != nil {
-			w.inputCh <- results[i]
+			w.receivers[dataSourceIndex].inputCh <- results[i]
 			results[i] = nil
 		}
 	}
 }
 
-var _ Executor = &shuffleWorker{}
+var _ Executor = &shuffleReceiver{}
 
-// shuffleWorker is the multi-thread worker executing child executors within "partition".
-type shuffleWorker struct {
+// shuffleReceiver receives chunks from its dataSource through inputCh.
+type shuffleReceiver struct {
 	baseExecutor
-	childExec Executor
 
 	finishCh <-chan struct{}
 	executed bool
 
-	// Workers get inputs from dataFetcherThread by `inputCh`,
-	// and output results to main thread by `outputCh`.
-	// `inputHolderCh` and `outputHolderCh` are "Chunk Holder" channels of `inputCh` and `outputCh` respectively,
-	// which give the `*Chunk` back, to implement the data transport in a streaming manner.
-	inputCh        chan *chunk.Chunk
-	inputHolderCh  chan *chunk.Chunk
-	outputCh       chan *shuffleOutput
-	outputHolderCh chan *chunk.Chunk
+	inputCh       chan *chunk.Chunk
+	inputHolderCh chan *chunk.Chunk
 }
 
 // Open implements the Executor Open interface.
-func (e *shuffleWorker) Open(ctx context.Context) error {
+func (e *shuffleReceiver) Open(ctx context.Context) error {
 	if err := e.baseExecutor.Open(ctx); err != nil {
 		return err
 	}
@@ -303,13 +321,13 @@ func (e *shuffleWorker) Open(ctx context.Context) error {
 }
 
 // Close implements the Executor Close interface.
-func (e *shuffleWorker) Close() error {
+func (e *shuffleReceiver) Close() error {
 	return errors.Trace(e.baseExecutor.Close())
 }
 
 // Next implements the Executor Next interface.
 // It is called by `Tail` executor within "shuffle", to fetch data from `DataSource` by `inputCh`.
-func (e *shuffleWorker) Next(ctx context.Context, req *chunk.Chunk) error {
+func (e *shuffleReceiver) Next(ctx context.Context, req *chunk.Chunk) error {
 	req.Reset()
 	if e.executed {
 		return nil
@@ -329,6 +347,19 @@ func (e *shuffleWorker) Next(ctx context.Context, req *chunk.Chunk) error {
 	}
 }
 
+// shuffleWorker is the multi-thread worker executing child executors within "partition".
+type shuffleWorker struct {
+	childExec Executor
+
+	finishCh <-chan struct{}
+
+	// each receiver corresponds to a dataSource
+	receivers []*shuffleReceiver
+
+	outputCh       chan *shuffleOutput
+	outputHolderCh chan *chunk.Chunk
+}
+
 func (e *shuffleWorker) run(ctx context.Context, waitGroup *sync.WaitGroup) {
 	defer func() {
 		if r := recover(); r != nil {
diff --git a/planner/core/explain.go b/planner/core/explain.go
index 0df1abd644..0940e6b428 100644
--- a/planner/core/explain.go
+++ b/planner/core/explain.go
@@ -788,8 +788,13 @@ func (p *PhysicalWindow) ExplainInfo() string {
 
 // ExplainInfo implements Plan interface.
 func (p *PhysicalShuffle) ExplainInfo() string {
+	explainIds := make([]fmt.Stringer, len(p.DataSources))
+	for i := range p.DataSources {
+		explainIds[i] = p.DataSources[i].ExplainID()
+	}
+
 	buffer := bytes.NewBufferString("")
-	fmt.Fprintf(buffer, "execution info: concurrency:%v, data source:%v", p.Concurrency, p.DataSource.ExplainID())
+	fmt.Fprintf(buffer, "execution info: concurrency:%v, data sources:%v", p.Concurrency, explainIds)
 	return buffer.String()
 }
 
diff --git a/planner/core/initialize.go b/planner/core/initialize.go
index 2926268f1e..e156365249 100644
--- a/planner/core/initialize.go
+++ b/planner/core/initialize.go
@@ -216,9 +216,9 @@ func (p PhysicalShuffle) Init(ctx sessionctx.Context, stats *property.StatsInfo,
 	return &p
 }
 
-// Init initializes PhysicalShuffleDataSourceStub.
-func (p PhysicalShuffleDataSourceStub) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalShuffleDataSourceStub {
-	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeShuffleDataSourceStub, &p, offset)
+// Init initializes PhysicalShuffleReceiverStub.
+func (p PhysicalShuffleReceiverStub) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalShuffleReceiverStub {
+	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeShuffleReceiver, &p, offset)
 	p.childrenReqProps = props
 	p.stats = stats
 	return &p
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 1e68984b6e..a94249e59d 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -60,7 +60,7 @@ var (
 	_ PhysicalPlan = &PhysicalUnionScan{}
 	_ PhysicalPlan = &PhysicalWindow{}
 	_ PhysicalPlan = &PhysicalShuffle{}
-	_ PhysicalPlan = &PhysicalShuffleDataSourceStub{}
+	_ PhysicalPlan = &PhysicalShuffleReceiverStub{}
 	_ PhysicalPlan = &BatchPointGetPlan{}
 )
 
@@ -1235,7 +1235,7 @@ func (p *PhysicalWindow) ExtractCorrelatedCols() []*expression.CorrelatedColumn
 }
 
 // PhysicalShuffle represents a shuffle plan.
-// `Tail` and `DataSource` are the last plan within and the first plan following the "shuffle", respectively,
+// `Tails` and `DataSources` are the last plans within and the first plans following the "shuffle", respectively,
 // to build the child executors chain.
 // Take `Window` operator for example:
 //  Shuffle -> Window -> Sort -> DataSource, will be separated into:
@@ -1246,11 +1246,12 @@ type PhysicalShuffle struct {
 	basePhysicalPlan
 
 	Concurrency int
-	Tail        PhysicalPlan
-	DataSource  PhysicalPlan
+	Tails       []PhysicalPlan
+	DataSources []PhysicalPlan
 
 	SplitterType PartitionSplitterType
-	HashByItems  []expression.Expression
+	// each DataSource has an array of HashByItems
+	HashByItemArrays [][]expression.Expression
 }
 
 // PartitionSplitterType is the type of `Shuffle` executor splitter, which splits data source into partitions.
@@ -1261,13 +1262,13 @@ const (
 	PartitionHashSplitterType = iota
 )
 
-// PhysicalShuffleDataSourceStub represents a data source stub of `PhysicalShuffle`,
+// PhysicalShuffleReceiverStub represents a receiver stub of `PhysicalShuffle`,
 // and actually, is executed by `executor.shuffleWorker`.
-type PhysicalShuffleDataSourceStub struct {
+type PhysicalShuffleReceiverStub struct {
 	physicalSchemaProducer
 
-	// Worker points to `executor.shuffleWorker`.
-	Worker unsafe.Pointer
+	// Receiver points to `executor.shuffleReceiver`.
+	Receiver unsafe.Pointer
 }
 
 // CollectPlanStatsVersion uses to collect the statistics version of the plan.
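
Note on the new plan shape: the three plural fields are parallel slices that must stay aligned — Tails[i] is the last operator of partition chain i, DataSources[i] is the plan feeding that chain, and HashByItemArrays[i] holds the expressions used to hash-split rows of DataSources[i]. As a minimal sketch of how a planner rule could assemble a shuffle over several sources: the helper below is hypothetical (this patch only ever wires a single source, in optimizeByShuffle4Window in the next file), but it uses only APIs visible in this diff.

    package core // sketch: would sit alongside planner/core/plan.go

    import (
        "math"

        "github.com/pingcap/tidb/expression"
        "github.com/pingcap/tidb/planner/property"
        "github.com/pingcap/tidb/sessionctx"
    )

    // buildMultiSourceShuffle is an illustrative helper, not part of this
    // patch. It mirrors optimizeByShuffle4Window but accepts one
    // tail/source/byItems triple per shuffled input.
    func buildMultiSourceShuffle(ctx sessionctx.Context, pp PhysicalPlan,
        tails, sources []PhysicalPlan, byItems [][]expression.Expression,
        concurrency int) *PhysicalShuffle {
        // Entry i of each slice describes the same partitioned input;
        // the executor builder indexes them together (see buildShuffle).
        reqProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
        return PhysicalShuffle{
            Concurrency:      concurrency,
            Tails:            tails,
            DataSources:      sources,
            SplitterType:     PartitionHashSplitterType,
            HashByItemArrays: byItems,
        }.Init(ctx, pp.statsInfo(), pp.SelectBlockOffset(), reqProp)
    }
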
diff --git a/planner/core/plan.go b/planner/core/plan.go
index a603815524..7cc495247c 100644
--- a/planner/core/plan.go
+++ b/planner/core/plan.go
@@ -126,11 +126,11 @@ func optimizeByShuffle4Window(pp *PhysicalWindow, ctx sessionctx.Context) *Physi
 	}
 	reqProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
 	shuffle := PhysicalShuffle{
-		Concurrency:  concurrency,
-		Tail:         tail,
-		DataSource:   dataSource,
-		SplitterType: PartitionHashSplitterType,
-		HashByItems:  byItems,
+		Concurrency:      concurrency,
+		Tails:            []PhysicalPlan{tail},
+		DataSources:      []PhysicalPlan{dataSource},
+		SplitterType:     PartitionHashSplitterType,
+		HashByItemArrays: [][]expression.Expression{byItems},
 	}.Init(ctx, pp.statsInfo(), pp.SelectBlockOffset(), reqProp)
 	return shuffle
 }
diff --git a/planner/core/resolve_indices.go b/planner/core/resolve_indices.go
index 8f4a35f221..f87e7c79e8 100644
--- a/planner/core/resolve_indices.go
+++ b/planner/core/resolve_indices.go
@@ -498,11 +498,15 @@ func (p *PhysicalShuffle) ResolveIndices() (err error) {
 	if err != nil {
 		return err
 	}
-	for i := range p.HashByItems {
-		// "Shuffle" get value of items from `DataSource`, other than children[0].
-		p.HashByItems[i], err = p.HashByItems[i].ResolveIndices(p.DataSource.Schema())
-		if err != nil {
-			return err
+	// There may be one or more DataSources
+	for i := range p.HashByItemArrays {
+		// Each DataSource has an array of HashByItems
+		for j := range p.HashByItemArrays[i] {
+			// "Shuffle" gets the values of items from `DataSource`, rather than from children[0].
+			p.HashByItemArrays[i][j], err = p.HashByItemArrays[i][j].ResolveIndices(p.DataSources[i].Schema())
+			if err != nil {
+				return err
+			}
 		}
 	}
 	return err
diff --git a/planner/core/stringer.go b/planner/core/stringer.go
index b005d097eb..a7a5e02e95 100644
--- a/planner/core/stringer.go
+++ b/planner/core/stringer.go
@@ -266,8 +266,8 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) {
 		str = fmt.Sprintf("Window(%s)", x.ExplainInfo())
 	case *PhysicalShuffle:
 		str = fmt.Sprintf("Partition(%s)", x.ExplainInfo())
-	case *PhysicalShuffleDataSourceStub:
-		str = fmt.Sprintf("PartitionDataSourceStub(%s)", x.ExplainInfo())
+	case *PhysicalShuffleReceiverStub:
+		str = fmt.Sprintf("PartitionReceiverStub(%s)", x.ExplainInfo())
 	case *PointGetPlan:
 		str = fmt.Sprintf("PointGet(")
 		if x.IndexInfo != nil {
diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json
index d2c771a448..30f825afe3 100644
--- a/planner/core/testdata/plan_suite_out.json
+++ b/planner/core/testdata/plan_suite_out.json
@@ -1882,7 +1882,7 @@
     },
     {
       "SQL": "select lead(a, 1) over (partition by b) as c from t",
-      "Best": "TableReader(Table(t))->Sort->Window(lead(test.t.a, 1)->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection"
+      "Best": "TableReader(Table(t))->Sort->Window(lead(test.t.a, 1)->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection"
     }
   ]
 },
diff --git a/planner/core/testdata/plan_suite_unexported_out.json b/planner/core/testdata/plan_suite_unexported_out.json
index 776ef1757e..d0e73580b0 100644
--- a/planner/core/testdata/plan_suite_unexported_out.json
+++ b/planner/core/testdata/plan_suite_unexported_out.json
@@ -257,8 +257,8 @@
     "Name": "TestWindowParallelFunction",
     "Cases": [
       "TableReader(Table(t))->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.a))->Projection",
-      "TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection",
-      "IndexReader(Index(t.f)[[NULL,+inf]])->Projection->Sort->Window(avg(cast(Column#16, decimal(24,4) BINARY))->Column#17 over(partition by Column#15))->Partition(execution info: concurrency:4, data source:Projection_8)->Projection",
+      "TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection",
+      "IndexReader(Index(t.f)[[NULL,+inf]])->Projection->Sort->Window(avg(cast(Column#16, decimal(24,4) BINARY))->Column#17 over(partition by Column#15))->Partition(execution info: concurrency:4, data sources:[Projection_8])->Projection",
       "TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(order by test.t.a, test.t.b desc range between unbounded preceding and current row))->Projection",
       "TableReader(Table(t))->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.a))->Projection",
       "[planner:1054]Unknown column 'z' in 'field list'",
@@ -300,7 +300,7 @@
       "[planner:1210]Incorrect arguments to nth_value",
       "[planner:1210]Incorrect arguments to ntile",
       "IndexReader(Index(t.f)[[NULL,+inf]])->Window(ntile()->Column#14 over())->Projection",
-      "TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection",
+      "TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection",
       "TableReader(Table(t))->Window(nth_value(test.t.i_date, 1)->Column#14 over())->Projection",
       "TableReader(Table(t))->Window(sum(cast(test.t.b, decimal(65,0) BINARY))->Column#15, sum(cast(test.t.c, decimal(65,0) BINARY))->Column#16 over(order by test.t.a range between unbounded preceding and current row))->Projection",
       "[planner:3593]You cannot use the window function 'sum' in this context.'",
@@ -308,7 +308,7 @@
       "[planner:3593]You cannot use the window function 'row_number' in this context.'",
      "TableReader(Table(t))->Sort->Window(sum(cast(test.t.c, decimal(65,0) BINARY))->Column#17 over(partition by test.t.a order by test.t.c range between unbounded preceding and current row))->Sort->Window(sum(cast(test.t.b, decimal(65,0) BINARY))->Column#18 over(order by test.t.a, test.t.b, test.t.c range between unbounded preceding and current row))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#19 over(partition by test.t.a order by test.t.b range between unbounded preceding and current row))->Window(sum(cast(test.t.d, decimal(65,0) BINARY))->Column#20 over())->Projection",
       "[planner:3587]Window 'w1' with RANGE N PRECEDING/FOLLOWING frame requires exactly one ORDER BY expression, of numeric or temporal type",
-      "TableReader(Table(t))->Sort->Window(dense_rank()->Column#14 over(partition by test.t.b order by test.t.a desc, test.t.b desc))->Partition(execution info: concurrency:4, data source:TableReader_9)->Projection",
+      "TableReader(Table(t))->Sort->Window(dense_rank()->Column#14 over(partition by test.t.b order by test.t.a desc, test.t.b desc))->Partition(execution info: concurrency:4, data sources:[TableReader_9])->Projection",
       "[planner:3587]Window 'w1' with RANGE N PRECEDING/FOLLOWING frame requires exactly one ORDER BY expression, of numeric or temporal type",
       "[planner:3585]Window 'w1': frame end cannot be UNBOUNDED PRECEDING.",
       "[planner:3584]Window 'w1': frame start cannot be UNBOUNDED FOLLOWING.",
@@ -322,7 +322,7 @@
       "[planner:3586]Window 'w': frame start or end is negative, NULL or of non-integral type",
       "[planner:3586]Window 'w': frame start or end is negative, NULL or of non-integral type",
       "[planner:3586]Window 'w': frame start or end is negative, NULL or of non-integral type",
-      "TableReader(Table(t))->Sort->Window(row_number()->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection"
+      "TableReader(Table(t))->Sort->Window(row_number()->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection"
     ]
   },
   {
diff --git a/util/plancodec/id.go b/util/plancodec/id.go
index ec27e00da4..ea52b48250 100644
--- a/util/plancodec/id.go
+++ b/util/plancodec/id.go
@@ -92,8 +92,8 @@ const (
 	TypeWindow = "Window"
 	// TypeShuffle is the type of Shuffle.
 	TypeShuffle = "Shuffle"
-	// TypeShuffleDataSourceStub is the type of Shuffle.
-	TypeShuffleDataSourceStub = "ShuffleDataSourceStub"
+	// TypeShuffleReceiver is the type of ShuffleReceiver.
+	TypeShuffleReceiver = "ShuffleReceiver"
 	// TypeTiKVSingleGather is the type of TiKVSingleGather.
 	TypeTiKVSingleGather = "TiKVSingleGather"
 	// TypeIndexMerge is the type of IndexMergeReader
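
A closing note on the executor wiring: each (worker, data source) pair now owns a private inputCh/inputHolderCh channel pair, so the M fetchDataAndSplit goroutines never contend with one another — full chunks travel fetcher -> receiver on inputCh, and drained chunks travel back on inputHolderCh for reuse. A self-contained toy sketch of that recycling handshake (illustrative names only, independent of TiDB types):

    package main

    import "fmt"

    // Toy version of the inputCh/inputHolderCh handshake used by
    // shuffleReceiver: full buffers flow fetcher -> receiver on inputCh,
    // drained buffers flow back on inputHolderCh, so the pipeline runs
    // with a fixed number of buffer allocations.
    func main() {
        inputCh := make(chan []int, 1)       // full buffers
        inputHolderCh := make(chan []int, 1) // recycled empty buffers
        inputHolderCh <- make([]int, 0, 2)   // prime the loop with one buffer

        go func() { // fetcher side, like fetchDataAndSplit in this patch
            for i := 0; i < 3; i++ {
                buf := <-inputHolderCh         // wait for a free buffer
                buf = append(buf[:0], i, i*10) // refill it in place
                inputCh <- buf                 // hand it to the receiver
            }
            close(inputCh) // like close(w.receivers[dataSourceIndex].inputCh)
        }()

        for buf := range inputCh { // receiver side, like shuffleReceiver.Next
            fmt.Println(buf)
            inputHolderCh <- buf // return the buffer for refilling
        }
    }

Because each data source owns its channels, ShuffleExec.Close can drain every receiver's inputCh independently, which is why the patch loops over w.receivers both when closing and when draining.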