executor: call Close after the exist of all workers in ShuffleExec (#64660)

close pingcap/tidb#52984
This commit is contained in:
xzhangxian1008
2025-11-27 19:56:26 +08:00
committed by GitHub
parent d670bf9d01
commit f2b702e07c
3 changed files with 33 additions and 5 deletions

View File

@ -17,6 +17,7 @@ package executor
import (
"context"
"sync"
"sync/atomic"
"time"
"github.com/pingcap/errors"
@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/util/channel"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/twmb/murmur3"
"go.uber.org/zap"
@ -97,6 +99,8 @@ type ShuffleExec struct {
finishCh chan struct{}
outputCh chan *shuffleOutput
allSourceAndWorkerExitForTest atomic.Bool
}
type shuffleOutput struct {
@ -168,6 +172,15 @@ func (e *ShuffleExec) Close() error {
if e.finishCh != nil {
close(e.finishCh)
}
if e.outputCh != nil {
channel.Clear(e.outputCh)
}
if intest.InTest && !e.allSourceAndWorkerExitForTest.Load() && e.prepared {
panic("there are still some running sources or workers")
}
for _, w := range e.workers {
for _, r := range w.receivers {
if r.inputCh != nil {
@ -179,9 +192,7 @@ func (e *ShuffleExec) Close() error {
firstErr = err
}
}
if e.outputCh != nil {
channel.Clear(e.outputCh)
}
e.executed = false
if e.RuntimeStats() != nil {
@ -205,7 +216,9 @@ func (e *ShuffleExec) Close() error {
func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) {
waitGroup := &sync.WaitGroup{}
waitGroup.Add(len(e.workers) + len(e.dataSources))
num := len(e.workers) + len(e.dataSources)
waitGroup.Add(num)
e.allSourceAndWorkerExitForTest.Store(false)
// create a goroutine for each dataSource to fetch and split data
for i := range e.dataSources {
go e.fetchDataAndSplit(ctx, i, waitGroup)
@ -220,6 +233,7 @@ func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) {
func (e *ShuffleExec) waitWorkerAndCloseOutput(waitGroup *sync.WaitGroup) {
waitGroup.Wait()
e.allSourceAndWorkerExitForTest.Store(true)
close(e.outputCh)
}

View File

@ -8,7 +8,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 49,
shard_count = 50,
deps = [
"//pkg/config",
"//pkg/config/kerneltype",

View File

@ -2650,3 +2650,17 @@ func TestIssue63329(t *testing.T) {
tk.MustQuery("select /*+ READ_FROM_STORAGE(tiflash[t1], tiflash[t2]) */ * from t1 join t2 on t1.a=t2.x;").Check(testkit.Rows("1 1 1 1 1 1", "2 2 2 2 2 2"))
}
}
func TestIssue52984(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(p int, o int, v int);")
tk.MustExec("insert into t values (0, 0, 0), (0, 786, 155), (1, 487, 577), (2, 787, 801), (3, 611, 179), (4, 298, 320), (0, 901, 802), (1, 69, 860), (2, 461, 279), (3, 885, 902), (4, 216, 997), (0, 291, 504), (1, 251, 289), (2, 194, 588), (3, 525, 491), (4, 371, 941), (0, 791, 663), (1, 333, 775), (2, 266, 924), (3, 157, 531), (4, 339, 933), (0, 972, 212), (1, 216, 585), (2, 844, 392), (3, 520, 788), (4, 716, 254), (0, 492, 370), (1, 597, 653), (2, 260, 241), (3, 708, 109), (4, 736, 943), (0, 434, 615), (1, 487, 777), (2, 378, 904), (3, 109, 0), (4, 466, 631), (0, 206, 406), (1, 768, 170), (2, 398, 448), (3, 722, 111), (4, 117, 812), (0, 386, 65), (1, 156, 540), (2, 536, 651), (3, 91, 836), (4, 53, 567), (0, 119, 897), (1, 457, 759), (2, 863, 236), (3, 932, 931), (4, 120, 249), (0, 520, 853), (1, 458, 446), (2, 311, 158), (3, 62, 408), (4, 423, 752), (0, 869, 941), (1, 999, 436), (2, 591, 662), (3, 686, 127), (4, 143, 82), (0, 36, 938), (1, 568, 443), (2, 485, 741), (3, 728, 116), (4, 462, 417), (0, 802, 733), (1, 834, 181), (2, 262, 481), (3, 637, 729), (4, 453, 18), (0, 232, 346), (1, 9, 327), (2, 249, 827), (3, 959, 679), (4, 333, 76), (0, 428, 216), (1, 449, 811), (2, 336, 338), (3, 951, 446), (4, 435, 860), (0, 406, 548), (1, 249, 114), (2, 785, 956), (3, 648, 978), (4, 141, 230), (0, 28, 209), (1, 577, 718), (2, 161, 386), (3, 439, 644), (4, 844, 401), (0, 746, 606), (1, 613, 441), (2, 907, 986), (3, 667, 323), (4, 715, 876), (0, 909, 152), (1, 294, 211), (2, 867, 516), (3, 372, 706), (4, 26, 907), (0, 870, 928);")
tk.MustExec("set tidb_max_chunk_size=32;")
for range 10 {
tk.MustQuery("select p, o, v, sum(v) over w as 'sum' from t window w as (partition by p order by o rows between 0 preceding and 0 following) limit 10;")
}
}