executor,planner/core,util/plancodec: extend executor.ShuffleExec and planner.core.PhysicalShuffle to support multiple data sources (#20942)
@@ -66,7 +66,7 @@ Projection_7 10000.00 root Column#6
explain select sum(a) over(partition by a order by b) from t;
id estRows task access object operator info
Projection_7 10000.00 root Column#6
└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
└─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b range between unbounded preceding and current row)
└─Sort_11 10000.00 root test.t.a, test.t.b
└─TableReader_10 10000.00 root data:TableFullScan_9
@@ -74,7 +74,7 @@ Projection_7 10000.00 root Column#6
explain select sum(a) over(partition by a order by b rows unbounded preceding) from t;
id estRows task access object operator info
Projection_7 10000.00 root Column#6
└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
└─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b rows between unbounded preceding and current row)
└─Sort_11 10000.00 root test.t.a, test.t.b
└─TableReader_10 10000.00 root data:TableFullScan_9
@@ -82,7 +82,7 @@ Projection_7 10000.00 root Column#6
explain select sum(a) over(partition by a order by b rows between 1 preceding and 1 following) from t;
id estRows task access object operator info
Projection_7 10000.00 root Column#6
└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
└─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b rows between 1 preceding and 1 following)
└─Sort_11 10000.00 root test.t.a, test.t.b
└─TableReader_10 10000.00 root data:TableFullScan_9
@@ -90,7 +90,7 @@ Projection_7 10000.00 root Column#6
explain select sum(a) over(partition by a order by b range between 1 preceding and 1 following) from t;
id estRows task access object operator info
Projection_7 10000.00 root Column#6
└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
└─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.b range between 1 preceding and 1 following)
└─Sort_11 10000.00 root test.t.a, test.t.b
└─TableReader_10 10000.00 root data:TableFullScan_9
@@ -98,7 +98,7 @@ Projection_7 10000.00 root Column#6
explain select sum(a) over(partition by a order by c range between interval '2:30' minute_second preceding and interval '2:30' minute_second following) from t;
id estRows task access object operator info
Projection_7 10000.00 root Column#6
└─Shuffle_12 10000.00 root execution info: concurrency:4, data source:TableReader_10
└─Shuffle_12 10000.00 root execution info: concurrency:4, data sources:[TableReader_10]
└─Window_8 10000.00 root sum(cast(test.t.a, decimal(32,0) BINARY))->Column#6 over(partition by test.t.a order by test.t.c range between interval "2:30" "MINUTE_SECOND" preceding and interval "2:30" "MINUTE_SECOND" following)
└─Sort_11 10000.00 root test.t.a, test.t.c
└─TableReader_10 10000.00 root data:TableFullScan_9
@@ -119,7 +119,7 @@ analyze table t1;
explain select sum(a) over(partition by b) from t1;
id estRows task access object operator info
Projection_7 3.00 root Column#4
└─Shuffle_12 3.00 root execution info: concurrency:2, data source:TableReader_10
└─Shuffle_12 3.00 root execution info: concurrency:2, data sources:[TableReader_10]
└─Window_8 3.00 root sum(cast(test.t1.a, decimal(32,0) BINARY))->Column#4 over(partition by test.t1.b)
└─Sort_11 3.00 root test.t1.b
└─TableReader_10 3.00 root data:TableFullScan_9
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/planner/core"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
@@ -497,11 +498,11 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
}

plan = core.PhysicalShuffle{
Concurrency: concurrency,
Tail: tail,
DataSource: src,
SplitterType: core.PartitionHashSplitterType,
HashByItems: byItems,
Concurrency: concurrency,
Tails: []plannercore.PhysicalPlan{tail},
DataSources: []plannercore.PhysicalPlan{src},
SplitterType: core.PartitionHashSplitterType,
HashByItemArrays: [][]expression.Expression{byItems},
}.Init(ctx, nil, 0)
plan.SetChildren(win)
} else {
@@ -216,8 +216,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildWindow(v)
case *plannercore.PhysicalShuffle:
return b.buildShuffle(v)
case *plannercore.PhysicalShuffleDataSourceStub:
return b.buildShuffleDataSourceStub(v)
case *plannercore.PhysicalShuffleReceiverStub:
return b.buildShuffleReceiverStub(v)
case *plannercore.SQLBindPlan:
return b.buildSQLBindExec(v)
case *plannercore.SplitRegion:
@@ -3670,40 +3670,57 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec

func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleExec {
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
shuffle := &ShuffleExec{baseExecutor: base,
concurrency: v.Concurrency,
shuffle := &ShuffleExec{
baseExecutor: base,
concurrency: v.Concurrency,
}

switch v.SplitterType {
case plannercore.PartitionHashSplitterType:
shuffle.splitter = &partitionHashSplitter{
byItems: v.HashByItems,
numWorkers: shuffle.concurrency,
splitters := make([]partitionSplitter, len(v.HashByItemArrays))
for i, hashByItemArray := range v.HashByItemArrays {
hashSplitter := &partitionHashSplitter{
byItems: hashByItemArray,
numWorkers: shuffle.concurrency,
}
copy(hashSplitter.byItems, hashByItemArray)
splitters[i] = hashSplitter
}
shuffle.splitters = splitters
default:
panic("Not implemented. Should not reach here.")
}

shuffle.dataSource = b.build(v.DataSource)
if b.err != nil {
return nil
shuffle.dataSources = make([]Executor, len(v.DataSources))
for i, dataSource := range v.DataSources {
shuffle.dataSources[i] = b.build(dataSource)
if b.err != nil {
return nil
}
}

// head & tail of physical plans' chain within "partition".
var head, tail = v.Children()[0], v.Tail

head := v.Children()[0]
shuffle.workers = make([]*shuffleWorker, shuffle.concurrency)
for i := range shuffle.workers {
w := &shuffleWorker{
baseExecutor: newBaseExecutor(b.ctx, v.DataSource.Schema(), v.DataSource.ID()),
receivers := make([]*shuffleReceiver, len(v.DataSources))
for j, dataSource := range v.DataSources {
receivers[j] = &shuffleReceiver{
baseExecutor: newBaseExecutor(b.ctx, dataSource.Schema(), dataSource.ID()),
}
}

stub := plannercore.PhysicalShuffleDataSourceStub{
Worker: (unsafe.Pointer)(w),
}.Init(b.ctx, v.DataSource.Stats(), v.DataSource.SelectBlockOffset(), nil)
stub.SetSchema(v.DataSource.Schema())
w := &shuffleWorker{
receivers: receivers,
}

for j, dataSource := range v.DataSources {
stub := plannercore.PhysicalShuffleReceiverStub{
Receiver: (unsafe.Pointer)(receivers[j]),
}.Init(b.ctx, dataSource.Stats(), dataSource.SelectBlockOffset(), nil)
stub.SetSchema(dataSource.Schema())
v.Tails[j].SetChildren(stub)
}

tail.SetChildren(stub)
w.childExec = b.build(head)
if b.err != nil {
return nil
@@ -3715,8 +3732,8 @@ func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleE
return shuffle
}

func (b *executorBuilder) buildShuffleDataSourceStub(v *plannercore.PhysicalShuffleDataSourceStub) *shuffleWorker {
return (*shuffleWorker)(v.Worker)
func (b *executorBuilder) buildShuffleReceiverStub(v *plannercore.PhysicalShuffleReceiverStub) *shuffleReceiver {
return (*shuffleReceiver)(v.Receiver)
}

func (b *executorBuilder) buildSQLBindExec(v *plannercore.SQLBindPlan) Executor {
@@ -29,10 +29,11 @@ import (
)

// ShuffleExec is the executor to run other executors in a parallel manner.
// 1. It fetches chunks from `DataSource`.
// 2. It splits tuples from `DataSource` into N partitions (Only "split by hash" is implemented so far).
// 3. It invokes N workers in parallel, assign each partition as input to each worker and execute child executors.
// 4. It collects outputs from each worker, then sends outputs to its parent.
// 1. It fetches chunks from M `DataSources` (the value of M depends on the actual executor, e.g. M = 1 for WindowExec, M = 2 for MergeJoinExec).
// 2. It splits tuples from each `DataSource` into N partitions (Only "split by hash" is implemented so far).
// 3. It invokes N workers in parallel, each one has M `receivers` to receive partitions from the `DataSources`.
// 4. It assigns the received partitions as input to each worker and executes child executors.
// 5. It collects outputs from each worker, then sends outputs to its parent.
//
// +-------------+
// +-------| Main Thread |
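The doc comment above describes the new fan-out: M data sources are each hash-split into N partitions, and every one of the N workers owns M receivers, one per source. Below is a minimal, self-contained sketch of that topology using plain Go channels; the names (`numSources`, `numWorkers`, routing by `row % numWorkers`) are illustrative stand-ins, not the executor's actual types, and the sketch omits the chunk-recycling holder channels and error propagation shown in the surrounding diff.

```go
// Illustrative sketch only (not TiDB executor code): M sources fan out to N
// workers, and each worker owns one receiver channel per source.
package main

import (
	"fmt"
	"sync"
)

const (
	numSources = 2 // M
	numWorkers = 3 // N
)

func main() {
	// inputs[w][s] carries rows of source s destined for worker w.
	inputs := make([][]chan int, numWorkers)
	for w := range inputs {
		inputs[w] = make([]chan int, numSources)
		for s := range inputs[w] {
			inputs[w][s] = make(chan int, 8)
		}
	}
	outputCh := make(chan string, 64)

	// One fetch-and-split goroutine per data source (stand-in for the hash splitter).
	for s := 0; s < numSources; s++ {
		go func(s int) {
			for row := 0; row < 10; row++ {
				inputs[row%numWorkers][s] <- row
			}
			for w := 0; w < numWorkers; w++ {
				close(inputs[w][s]) // source s is exhausted for every worker
			}
		}(s)
	}

	// N workers: each drains its M receivers and reports to the parent.
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(w int) {
			defer wg.Done()
			for s := 0; s < numSources; s++ {
				for row := range inputs[w][s] {
					outputCh <- fmt.Sprintf("worker %d: row %d from source %d", w, row, s)
				}
			}
		}(w)
	}

	go func() { wg.Wait(); close(outputCh) }()
	for line := range outputCh {
		fmt.Println(line)
	}
}
```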
@@ -80,8 +81,9 @@ type ShuffleExec struct {
prepared bool
executed bool

splitter partitionSplitter
dataSource Executor
// each dataSource has a corresponding splitter
splitters []partitionSplitter
dataSources []Executor

finishCh chan struct{}
outputCh chan *shuffleOutput
@@ -95,8 +97,11 @@ type shuffleOutput struct {

// Open implements the Executor Open interface.
func (e *ShuffleExec) Open(ctx context.Context) error {
if err := e.dataSource.Open(ctx); err != nil {
return err
for _, s := range e.dataSources {
if err := s.Open(ctx); err != nil {
return err
}
}
if err := e.baseExecutor.Open(ctx); err != nil {
return err
@@ -109,8 +114,11 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
for _, w := range e.workers {
w.finishCh = e.finishCh

w.inputCh = make(chan *chunk.Chunk, 1)
w.inputHolderCh = make(chan *chunk.Chunk, 1)
for _, r := range w.receivers {
r.inputCh = make(chan *chunk.Chunk, 1)
r.inputHolderCh = make(chan *chunk.Chunk, 1)
}

w.outputCh = e.outputCh
w.outputHolderCh = make(chan *chunk.Chunk, 1)

@@ -118,7 +126,9 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
return err
}

w.inputHolderCh <- newFirstChunk(e.dataSource)
for i, r := range w.receivers {
r.inputHolderCh <- newFirstChunk(e.dataSources[i])
}
w.outputHolderCh <- newFirstChunk(e)
}

@@ -129,15 +139,19 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
func (e *ShuffleExec) Close() error {
if !e.prepared {
for _, w := range e.workers {
close(w.inputHolderCh)
close(w.inputCh)
for _, r := range w.receivers {
close(r.inputHolderCh)
close(r.inputCh)
}
close(w.outputHolderCh)
}
close(e.outputCh)
}
close(e.finishCh)
for _, w := range e.workers {
for range w.inputCh {
for _, r := range w.receivers {
for range r.inputCh {
}
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
@@ -150,16 +164,27 @@ func (e *ShuffleExec) Close() error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}

err := e.dataSource.Close()
err1 := e.baseExecutor.Close()
if err != nil {
return errors.Trace(err)
// close dataSources
errArr := make([]error, len(e.dataSources))
for i, dataSource := range e.dataSources {
errArr[i] = dataSource.Close()
}
return errors.Trace(err1)
// close baseExecutor
baseCloseErr := e.baseExecutor.Close()
// check close error
for _, err := range errArr {
if err != nil {
return errors.Trace(err)
}
}
return errors.Trace(baseCloseErr)
}

func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) {
go e.fetchDataAndSplit(ctx)
// create a goroutine for each dataSource to fetch and split data
for i := range e.dataSources {
go e.fetchDataAndSplit(ctx, i)
}

waitGroup := &sync.WaitGroup{}
waitGroup.Add(len(e.workers))
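The Close and prepare4ParallelExec changes above close every data source before reporting a failure, and start one fetch-and-split goroutine per source. A small hedged sketch of the close-all-then-report pattern follows (generic `io.Closer`s and a made-up `closeAll` helper; the real method additionally closes the base executor and wraps errors with `errors.Trace`):

```go
// Sketch of the "close everything, then return the first error" pattern;
// names here are illustrative and not part of the commit.
package main

import (
	"errors"
	"fmt"
	"io"
)

func closeAll(closers []io.Closer) error {
	var firstErr error
	for _, c := range closers {
		// Keep closing the remaining resources even after a failure.
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

type fakeSource struct{ err error }

func (f fakeSource) Close() error { return f.err }

func main() {
	sources := []io.Closer{
		fakeSource{nil},
		fakeSource{errors.New("source 1 failed to close")},
		fakeSource{nil},
	}
	fmt.Println(closeAll(sources)) // prints the close error from source 1
}
```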
@@ -213,25 +238,25 @@ func recoveryShuffleExec(output chan *shuffleOutput, r interface{}) {
logutil.BgLogger().Error("shuffle panicked", zap.Error(err), zap.Stack("stack"))
}

func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context, dataSourceIndex int) {
var (
err error
workerIndices []int
)
results := make([]*chunk.Chunk, len(e.workers))
chk := newFirstChunk(e.dataSource)
chk := newFirstChunk(e.dataSources[dataSourceIndex])

defer func() {
if r := recover(); r != nil {
recoveryShuffleExec(e.outputCh, r)
}
for _, w := range e.workers {
close(w.inputCh)
close(w.receivers[dataSourceIndex].inputCh)
}
}()

for {
err = Next(ctx, e.dataSource, chk)
err = Next(ctx, e.dataSources[dataSourceIndex], chk)
if err != nil {
e.outputCh <- &shuffleOutput{err: err}
return
@@ -240,7 +265,7 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
break
}

workerIndices, err = e.splitter.split(e.ctx, chk, workerIndices)
workerIndices, err = e.splitters[dataSourceIndex].split(e.ctx, chk, workerIndices)
if err != nil {
e.outputCh <- &shuffleOutput{err: err}
return
@@ -254,47 +279,40 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
select {
case <-e.finishCh:
return
case results[workerIdx] = <-w.inputHolderCh:
case results[workerIdx] = <-w.receivers[dataSourceIndex].inputHolderCh:
break
}
}
results[workerIdx].AppendRow(chk.GetRow(i))
if results[workerIdx].IsFull() {
w.inputCh <- results[workerIdx]
w.receivers[dataSourceIndex].inputCh <- results[workerIdx]
results[workerIdx] = nil
}
}
}
for i, w := range e.workers {
if results[i] != nil {
w.inputCh <- results[i]
w.receivers[dataSourceIndex].inputCh <- results[i]
results[i] = nil
}
}
}

var _ Executor = &shuffleWorker{}
var _ Executor = &shuffleReceiver{}

// shuffleWorker is the multi-thread worker executing child executors within "partition".
type shuffleWorker struct {
// shuffleReceiver receives chunks from a dataSource through inputCh.
type shuffleReceiver struct {
baseExecutor
childExec Executor

finishCh <-chan struct{}
executed bool

// Workers get inputs from dataFetcherThread by `inputCh`,
// and output results to main thread by `outputCh`.
// `inputHolderCh` and `outputHolderCh` are "Chunk Holder" channels of `inputCh` and `outputCh` respectively,
// which give the `*Chunk` back, to implement the data transport in a streaming manner.
inputCh chan *chunk.Chunk
inputHolderCh chan *chunk.Chunk
outputCh chan *shuffleOutput
outputHolderCh chan *chunk.Chunk
inputCh chan *chunk.Chunk
inputHolderCh chan *chunk.Chunk
}

// Open implements the Executor Open interface.
func (e *shuffleWorker) Open(ctx context.Context) error {
func (e *shuffleReceiver) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
@@ -303,13 +321,13 @@ func (e *shuffleWorker) Open(ctx context.Context) error {
}

// Close implements the Executor Close interface.
func (e *shuffleWorker) Close() error {
func (e *shuffleReceiver) Close() error {
return errors.Trace(e.baseExecutor.Close())
}

// Next implements the Executor Next interface.
// It is called by `Tail` executor within "shuffle", to fetch data from `DataSource` by `inputCh`.
func (e *shuffleWorker) Next(ctx context.Context, req *chunk.Chunk) error {
func (e *shuffleReceiver) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.executed {
return nil
@@ -329,6 +347,19 @@ func (e *shuffleWorker) Next(ctx context.Context, req *chunk.Chunk) error {
}
}

// shuffleWorker is the multi-thread worker executing child executors within "partition".
type shuffleWorker struct {
childExec Executor

finishCh <-chan struct{}

// each receiver corresponds to a dataSource.
receivers []*shuffleReceiver

outputCh chan *shuffleOutput
outputHolderCh chan *chunk.Chunk
}

func (e *shuffleWorker) run(ctx context.Context, waitGroup *sync.WaitGroup) {
defer func() {
if r := recover(); r != nil {
@@ -788,8 +788,13 @@ func (p *PhysicalWindow) ExplainInfo() string {

// ExplainInfo implements Plan interface.
func (p *PhysicalShuffle) ExplainInfo() string {
explainIds := make([]fmt.Stringer, len(p.DataSources))
for i := range p.DataSources {
explainIds[i] = p.DataSources[i].ExplainID()
}

buffer := bytes.NewBufferString("")
fmt.Fprintf(buffer, "execution info: concurrency:%v, data source:%v", p.Concurrency, p.DataSource.ExplainID())
fmt.Fprintf(buffer, "execution info: concurrency:%v, data sources:%v", p.Concurrency, explainIds)
return buffer.String()
}
@@ -216,9 +216,9 @@ func (p PhysicalShuffle) Init(ctx sessionctx.Context, stats *property.StatsInfo,
return &p
}

// Init initializes PhysicalShuffleDataSourceStub.
func (p PhysicalShuffleDataSourceStub) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalShuffleDataSourceStub {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeShuffleDataSourceStub, &p, offset)
// Init initializes PhysicalShuffleReceiverStub.
func (p PhysicalShuffleReceiverStub) Init(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalShuffleReceiverStub {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeShuffleReceiver, &p, offset)
p.childrenReqProps = props
p.stats = stats
return &p
@@ -60,7 +60,7 @@ var (
_ PhysicalPlan = &PhysicalUnionScan{}
_ PhysicalPlan = &PhysicalWindow{}
_ PhysicalPlan = &PhysicalShuffle{}
_ PhysicalPlan = &PhysicalShuffleDataSourceStub{}
_ PhysicalPlan = &PhysicalShuffleReceiverStub{}
_ PhysicalPlan = &BatchPointGetPlan{}
)

@@ -1235,7 +1235,7 @@ func (p *PhysicalWindow) ExtractCorrelatedCols() []*expression.CorrelatedColumn
}

// PhysicalShuffle represents a shuffle plan.
// `Tail` and `DataSource` are the last plan within and the first plan following the "shuffle", respectively,
// `Tails` and `DataSources` are the last plans within and the first plans following the "shuffle", respectively,
// to build the child executors chain.
// Take `Window` operator for example:
// Shuffle -> Window -> Sort -> DataSource, will be separated into:
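The comment above splits a chain such as Shuffle -> Window -> Sort -> TableReader around the shuffle boundary; with the new slice fields there is one `Tails[i]`/`DataSources[i]` pair per data source. A hedged illustration of where each piece runs for the single-source window case (roles inferred from the executor changes earlier in this commit, not quoted from the truncated comment):

```go
// Illustration only; not code from the commit.
//
//   plan chain:       Shuffle -> Window -> Sort -> TableReader
//
//   Tails[0]       = Sort         (last plan executed inside each worker)
//   DataSources[0] = TableReader  (first plan below the shuffle boundary)
//
//   at run time:
//     fetch goroutine 0 : reads DataSources[0] and hash-splits rows to the workers
//     worker i (i < N)  : Window -> Sort -> shuffleReceiver (stub wired via Tails[0].SetChildren)
//     main thread       : ShuffleExec merges the workers' output
```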
@@ -1246,11 +1246,12 @@ type PhysicalShuffle struct {
basePhysicalPlan

Concurrency int
Tail PhysicalPlan
DataSource PhysicalPlan
Tails []PhysicalPlan
DataSources []PhysicalPlan

SplitterType PartitionSplitterType
HashByItems []expression.Expression
// each DataSource has an array of HashByItems
HashByItemArrays [][]expression.Expression
}

// PartitionSplitterType is the type of `Shuffle` executor splitter, which splits data source into partitions.
@@ -1261,13 +1262,13 @@ const (
PartitionHashSplitterType = iota
)

// PhysicalShuffleDataSourceStub represents a data source stub of `PhysicalShuffle`,
// PhysicalShuffleReceiverStub represents a receiver stub of `PhysicalShuffle`,
// and actually, is executed by `executor.shuffleWorker`.
type PhysicalShuffleDataSourceStub struct {
type PhysicalShuffleReceiverStub struct {
physicalSchemaProducer

// Worker points to `executor.shuffleWorker`.
Worker unsafe.Pointer
// Receiver points to `executor.shuffleReceiver`.
Receiver unsafe.Pointer
}

// CollectPlanStatsVersion uses to collect the statistics version of the plan.
@@ -126,11 +126,11 @@ func optimizeByShuffle4Window(pp *PhysicalWindow, ctx sessionctx.Context) *Physi
}
reqProp := &property.PhysicalProperty{ExpectedCnt: math.MaxFloat64}
shuffle := PhysicalShuffle{
Concurrency: concurrency,
Tail: tail,
DataSource: dataSource,
SplitterType: PartitionHashSplitterType,
HashByItems: byItems,
Concurrency: concurrency,
Tails: []PhysicalPlan{tail},
DataSources: []PhysicalPlan{dataSource},
SplitterType: PartitionHashSplitterType,
HashByItemArrays: [][]expression.Expression{byItems},
}.Init(ctx, pp.statsInfo(), pp.SelectBlockOffset(), reqProp)
return shuffle
}
@@ -498,11 +498,15 @@ func (p *PhysicalShuffle) ResolveIndices() (err error) {
if err != nil {
return err
}
for i := range p.HashByItems {
// "Shuffle" get value of items from `DataSource`, other than children[0].
p.HashByItems[i], err = p.HashByItems[i].ResolveIndices(p.DataSource.Schema())
if err != nil {
return err
// There may be one or more DataSources.
for i := range p.HashByItemArrays {
// Each DataSource has an array of HashByItems.
for j := range p.HashByItemArrays[i] {
// "Shuffle" gets the values of its items from `DataSources[i]`, rather than from children[0].
p.HashByItemArrays[i][j], err = p.HashByItemArrays[i][j].ResolveIndices(p.DataSources[i].Schema())
if err != nil {
return err
}
}
}
return err
@@ -266,8 +266,8 @@ func toString(in Plan, strs []string, idxs []int) ([]string, []int) {
str = fmt.Sprintf("Window(%s)", x.ExplainInfo())
case *PhysicalShuffle:
str = fmt.Sprintf("Partition(%s)", x.ExplainInfo())
case *PhysicalShuffleDataSourceStub:
str = fmt.Sprintf("PartitionDataSourceStub(%s)", x.ExplainInfo())
case *PhysicalShuffleReceiverStub:
str = fmt.Sprintf("PartitionReceiverStub(%s)", x.ExplainInfo())
case *PointGetPlan:
str = fmt.Sprintf("PointGet(")
if x.IndexInfo != nil {
planner/core/testdata/plan_suite_out.json
@@ -1882,7 +1882,7 @@
},
{
"SQL": "select lead(a, 1) over (partition by b) as c from t",
"Best": "TableReader(Table(t))->Sort->Window(lead(test.t.a, 1)->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection"
"Best": "TableReader(Table(t))->Sort->Window(lead(test.t.a, 1)->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection"
}
]
},

@@ -257,8 +257,8 @@
"Name": "TestWindowParallelFunction",
"Cases": [
"TableReader(Table(t))->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.a))->Projection",
"TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection",
"IndexReader(Index(t.f)[[NULL,+inf]])->Projection->Sort->Window(avg(cast(Column#16, decimal(24,4) BINARY))->Column#17 over(partition by Column#15))->Partition(execution info: concurrency:4, data source:Projection_8)->Projection",
"TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection",
"IndexReader(Index(t.f)[[NULL,+inf]])->Projection->Sort->Window(avg(cast(Column#16, decimal(24,4) BINARY))->Column#17 over(partition by Column#15))->Partition(execution info: concurrency:4, data sources:[Projection_8])->Projection",
"TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(order by test.t.a, test.t.b desc range between unbounded preceding and current row))->Projection",
"TableReader(Table(t))->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.a))->Projection",
"[planner:1054]Unknown column 'z' in 'field list'",
@@ -300,7 +300,7 @@
"[planner:1210]Incorrect arguments to nth_value",
"[planner:1210]Incorrect arguments to ntile",
"IndexReader(Index(t.f)[[NULL,+inf]])->Window(ntile(<nil>)->Column#14 over())->Projection",
"TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection",
"TableReader(Table(t))->Sort->Window(avg(cast(test.t.a, decimal(65,30) BINARY))->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection",
"TableReader(Table(t))->Window(nth_value(test.t.i_date, 1)->Column#14 over())->Projection",
"TableReader(Table(t))->Window(sum(cast(test.t.b, decimal(65,0) BINARY))->Column#15, sum(cast(test.t.c, decimal(65,0) BINARY))->Column#16 over(order by test.t.a range between unbounded preceding and current row))->Projection",
"[planner:3593]You cannot use the window function 'sum' in this context.'",
@@ -308,7 +308,7 @@
"[planner:3593]You cannot use the window function 'row_number' in this context.'",
"TableReader(Table(t))->Sort->Window(sum(cast(test.t.c, decimal(65,0) BINARY))->Column#17 over(partition by test.t.a order by test.t.c range between unbounded preceding and current row))->Sort->Window(sum(cast(test.t.b, decimal(65,0) BINARY))->Column#18 over(order by test.t.a, test.t.b, test.t.c range between unbounded preceding and current row))->Window(sum(cast(test.t.a, decimal(65,0) BINARY))->Column#19 over(partition by test.t.a order by test.t.b range between unbounded preceding and current row))->Window(sum(cast(test.t.d, decimal(65,0) BINARY))->Column#20 over())->Projection",
"[planner:3587]Window 'w1' with RANGE N PRECEDING/FOLLOWING frame requires exactly one ORDER BY expression, of numeric or temporal type",
"TableReader(Table(t))->Sort->Window(dense_rank()->Column#14 over(partition by test.t.b order by test.t.a desc, test.t.b desc))->Partition(execution info: concurrency:4, data source:TableReader_9)->Projection",
"TableReader(Table(t))->Sort->Window(dense_rank()->Column#14 over(partition by test.t.b order by test.t.a desc, test.t.b desc))->Partition(execution info: concurrency:4, data sources:[TableReader_9])->Projection",
"[planner:3587]Window 'w1' with RANGE N PRECEDING/FOLLOWING frame requires exactly one ORDER BY expression, of numeric or temporal type",
"[planner:3585]Window 'w1': frame end cannot be UNBOUNDED PRECEDING.",
"[planner:3584]Window 'w1': frame start cannot be UNBOUNDED FOLLOWING.",
@@ -322,7 +322,7 @@
"[planner:3586]Window 'w': frame start or end is negative, NULL or of non-integral type",
"[planner:3586]Window 'w': frame start or end is negative, NULL or of non-integral type",
"[planner:3586]Window 'w': frame start or end is negative, NULL or of non-integral type",
"TableReader(Table(t))->Sort->Window(row_number()->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data source:TableReader_10)->Projection"
"TableReader(Table(t))->Sort->Window(row_number()->Column#14 over(partition by test.t.b))->Partition(execution info: concurrency:4, data sources:[TableReader_10])->Projection"
]
},
{

@@ -92,8 +92,8 @@ const (
TypeWindow = "Window"
// TypeShuffle is the type of Shuffle.
TypeShuffle = "Shuffle"
// TypeShuffleDataSourceStub is the type of Shuffle.
TypeShuffleDataSourceStub = "ShuffleDataSourceStub"
// TypeShuffleReceiver is the type of ShuffleReceiver.
TypeShuffleReceiver = "ShuffleReceiver"
// TypeTiKVSingleGather is the type of TiKVSingleGather.
TypeTiKVSingleGather = "TiKVSingleGather"
// TypeIndexMerge is the type of IndexMergeReader