From f2b702e07c1f97136efdfac87ec9489f09962c35 Mon Sep 17 00:00:00 2001 From: xzhangxian1008 Date: Thu, 27 Nov 2025 19:56:26 +0800 Subject: [PATCH] executor: call `Close` after the exit of all workers in `ShuffleExec` (#64660) close pingcap/tidb#52984 --- pkg/executor/shuffle.go | 22 +++++++++++++++++---- pkg/executor/test/executor/BUILD.bazel | 2 +- pkg/executor/test/executor/executor_test.go | 14 +++++++++++++ 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/pkg/executor/shuffle.go b/pkg/executor/shuffle.go index fdc8af813c..cb89325a1f 100644 --- a/pkg/executor/shuffle.go +++ b/pkg/executor/shuffle.go @@ -17,6 +17,7 @@ package executor import ( "context" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/util/channel" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/execdetails" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/twmb/murmur3" "go.uber.org/zap" @@ -97,6 +99,8 @@ type ShuffleExec struct { finishCh chan struct{} outputCh chan *shuffleOutput + + allSourceAndWorkerExitForTest atomic.Bool } type shuffleOutput struct { @@ -168,6 +172,15 @@ func (e *ShuffleExec) Close() error { if e.finishCh != nil { close(e.finishCh) } + + if e.outputCh != nil { + channel.Clear(e.outputCh) + } + + if intest.InTest && !e.allSourceAndWorkerExitForTest.Load() && e.prepared { + panic("there are still some running sources or workers") + } + for _, w := range e.workers { for _, r := range w.receivers { if r.inputCh != nil { @@ -179,9 +192,7 @@ func (e *ShuffleExec) Close() error { firstErr = err } } - if e.outputCh != nil { - channel.Clear(e.outputCh) - } + e.executed = false if e.RuntimeStats() != nil { @@ -205,7 +216,9 @@ func (e *ShuffleExec) Close() error { func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) { waitGroup := &sync.WaitGroup{} - waitGroup.Add(len(e.workers) + len(e.dataSources)) + num := 
len(e.workers) + len(e.dataSources) + waitGroup.Add(num) + e.allSourceAndWorkerExitForTest.Store(false) // create a goroutine for each dataSource to fetch and split data for i := range e.dataSources { go e.fetchDataAndSplit(ctx, i, waitGroup) @@ -220,6 +233,7 @@ func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) { func (e *ShuffleExec) waitWorkerAndCloseOutput(waitGroup *sync.WaitGroup) { waitGroup.Wait() + e.allSourceAndWorkerExitForTest.Store(true) close(e.outputCh) } diff --git a/pkg/executor/test/executor/BUILD.bazel b/pkg/executor/test/executor/BUILD.bazel index 30d4479a80..366993fafc 100644 --- a/pkg/executor/test/executor/BUILD.bazel +++ b/pkg/executor/test/executor/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 49, + shard_count = 50, deps = [ "//pkg/config", "//pkg/config/kerneltype", diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index 672d49f124..0ab155a4f0 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -2650,3 +2650,17 @@ func TestIssue63329(t *testing.T) { tk.MustQuery("select /*+ READ_FROM_STORAGE(tiflash[t1], tiflash[t2]) */ * from t1 join t2 on t1.a=t2.x;").Check(testkit.Rows("1 1 1 1 1 1", "2 2 2 2 2 2")) } } + +func TestIssue52984(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(p int, o int, v int);") + tk.MustExec("insert into t values (0, 0, 0), (0, 786, 155), (1, 487, 577), (2, 787, 801), (3, 611, 179), (4, 298, 320), (0, 901, 802), (1, 69, 860), (2, 461, 279), (3, 885, 902), (4, 216, 997), (0, 291, 504), (1, 251, 289), (2, 194, 588), (3, 525, 491), (4, 371, 941), (0, 791, 663), (1, 333, 775), (2, 266, 924), (3, 157, 531), (4, 339, 933), (0, 972, 212), (1, 216, 585), (2, 844, 392), (3, 520, 788), (4, 716, 254), (0, 492, 370), (1, 597, 
653), (2, 260, 241), (3, 708, 109), (4, 736, 943), (0, 434, 615), (1, 487, 777), (2, 378, 904), (3, 109, 0), (4, 466, 631), (0, 206, 406), (1, 768, 170), (2, 398, 448), (3, 722, 111), (4, 117, 812), (0, 386, 65), (1, 156, 540), (2, 536, 651), (3, 91, 836), (4, 53, 567), (0, 119, 897), (1, 457, 759), (2, 863, 236), (3, 932, 931), (4, 120, 249), (0, 520, 853), (1, 458, 446), (2, 311, 158), (3, 62, 408), (4, 423, 752), (0, 869, 941), (1, 999, 436), (2, 591, 662), (3, 686, 127), (4, 143, 82), (0, 36, 938), (1, 568, 443), (2, 485, 741), (3, 728, 116), (4, 462, 417), (0, 802, 733), (1, 834, 181), (2, 262, 481), (3, 637, 729), (4, 453, 18), (0, 232, 346), (1, 9, 327), (2, 249, 827), (3, 959, 679), (4, 333, 76), (0, 428, 216), (1, 449, 811), (2, 336, 338), (3, 951, 446), (4, 435, 860), (0, 406, 548), (1, 249, 114), (2, 785, 956), (3, 648, 978), (4, 141, 230), (0, 28, 209), (1, 577, 718), (2, 161, 386), (3, 439, 644), (4, 844, 401), (0, 746, 606), (1, 613, 441), (2, 907, 986), (3, 667, 323), (4, 715, 876), (0, 909, 152), (1, 294, 211), (2, 867, 516), (3, 372, 706), (4, 26, 907), (0, 870, 928);") + tk.MustExec("set tidb_max_chunk_size=32;") + + for range 10 { + tk.MustQuery("select p, o, v, sum(v) over w as 'sum' from t window w as (partition by p order by o rows between 0 preceding and 0 following) limit 10;") + } +}