diff --git a/pkg/executor/shuffle.go b/pkg/executor/shuffle.go index fdc8af813c..cb89325a1f 100644 --- a/pkg/executor/shuffle.go +++ b/pkg/executor/shuffle.go @@ -17,6 +17,7 @@ package executor import ( "context" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/util/channel" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/execdetails" + "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/twmb/murmur3" "go.uber.org/zap" @@ -97,6 +99,8 @@ type ShuffleExec struct { finishCh chan struct{} outputCh chan *shuffleOutput + + allSourceAndWorkerExitForTest atomic.Bool } type shuffleOutput struct { @@ -168,6 +172,15 @@ func (e *ShuffleExec) Close() error { if e.finishCh != nil { close(e.finishCh) } + + if e.outputCh != nil { + channel.Clear(e.outputCh) + } + + if intest.InTest && !e.allSourceAndWorkerExitForTest.Load() && e.prepared { + panic("there are still some running sources or workers") + } + for _, w := range e.workers { for _, r := range w.receivers { if r.inputCh != nil { @@ -179,9 +192,7 @@ func (e *ShuffleExec) Close() error { firstErr = err } } - if e.outputCh != nil { - channel.Clear(e.outputCh) - } + e.executed = false if e.RuntimeStats() != nil { @@ -205,7 +216,9 @@ func (e *ShuffleExec) Close() error { func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) { waitGroup := &sync.WaitGroup{} - waitGroup.Add(len(e.workers) + len(e.dataSources)) + num := len(e.workers) + len(e.dataSources) + waitGroup.Add(num) + e.allSourceAndWorkerExitForTest.Store(false) // create a goroutine for each dataSource to fetch and split data for i := range e.dataSources { go e.fetchDataAndSplit(ctx, i, waitGroup) @@ -220,6 +233,7 @@ func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) { func (e *ShuffleExec) waitWorkerAndCloseOutput(waitGroup *sync.WaitGroup) { waitGroup.Wait() + e.allSourceAndWorkerExitForTest.Store(true) close(e.outputCh) } diff --git a/pkg/executor/test/executor/BUILD.bazel b/pkg/executor/test/executor/BUILD.bazel index 30d4479a80..366993fafc 100644 --- a/pkg/executor/test/executor/BUILD.bazel +++ b/pkg/executor/test/executor/BUILD.bazel @@ -8,7 +8,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 49, + shard_count = 50, deps = [ "//pkg/config", "//pkg/config/kerneltype", diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index 672d49f124..0ab155a4f0 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -2650,3 +2650,17 @@ func TestIssue63329(t *testing.T) { tk.MustQuery("select /*+ READ_FROM_STORAGE(tiflash[t1], tiflash[t2]) */ * from t1 join t2 on t1.a=t2.x;").Check(testkit.Rows("1 1 1 1 1 1", "2 2 2 2 2 2")) } } + +func TestIssue52984(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(p int, o int, v int);") + tk.MustExec("insert into t values (0, 0, 0), (0, 786, 155), (1, 487, 577), (2, 787, 801), (3, 611, 179), (4, 298, 320), (0, 901, 802), (1, 69, 860), (2, 461, 279), (3, 885, 902), (4, 216, 997), (0, 291, 504), (1, 251, 289), (2, 194, 588), (3, 525, 491), (4, 371, 941), (0, 791, 663), (1, 333, 775), (2, 266, 924), (3, 157, 531), (4, 339, 933), (0, 972, 212), (1, 216, 585), (2, 844, 392), (3, 520, 788), (4, 716, 254), (0, 492, 370), (1, 597, 653), (2, 260, 241), (3, 708, 109), (4, 736, 943), (0, 434, 615), (1, 487, 777), (2, 378, 904), (3, 109, 0), (4, 466, 631), (0, 206, 406), (1, 768, 170), (2, 398, 448), (3, 722, 111), (4, 117, 812), (0, 386, 65), (1, 156, 540), (2, 536, 651), (3, 91, 836), (4, 53, 567), (0, 119, 897), (1, 457, 759), (2, 863, 236), (3, 932, 931), (4, 120, 249), (0, 520, 853), (1, 458, 446), (2, 311, 158), (3, 62, 408), (4, 423, 752), (0, 869, 941), (1, 999, 436), (2, 591, 662), (3, 686, 127), (4, 143, 82), (0, 36, 938), (1, 568, 443), (2, 485, 741), (3, 728, 116), (4, 462, 417), (0, 802, 733), (1, 834, 181), (2, 262, 481), (3, 637, 729), (4, 453, 18), (0, 232, 346), (1, 9, 327), (2, 249, 827), (3, 959, 679), (4, 333, 76), (0, 428, 216), (1, 449, 811), (2, 336, 338), (3, 951, 446), (4, 435, 860), (0, 406, 548), (1, 249, 114), (2, 785, 956), (3, 648, 978), (4, 141, 230), (0, 28, 209), (1, 577, 718), (2, 161, 386), (3, 439, 644), (4, 844, 401), (0, 746, 606), (1, 613, 441), (2, 907, 986), (3, 667, 323), (4, 715, 876), (0, 909, 152), (1, 294, 211), (2, 867, 516), (3, 372, 706), (4, 26, 907), (0, 870, 928);") + tk.MustExec("set tidb_max_chunk_size=32;") + + for range 10 { + tk.MustQuery("select p, o, v, sum(v) over w as 'sum' from t window w as (partition by p order by o rows between 0 preceding and 0 following) limit 10;") + } +}