From 80ea2389d8db32ce448f02ae2d632ae3f7cf22cd Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Thu, 9 Dec 2021 14:17:57 +0800 Subject: [PATCH] executor: HashJoinExec checks the buildError even if the probeSide is empty (#30471) --- executor/executor_test.go | 14 ++++++++++++++ executor/join.go | 13 ++++++++++++- executor/shuffle.go | 28 +++++++++++++++++++++------- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index c821901d5a..f503504d73 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -9487,3 +9487,17 @@ func (s *testSerialSuite) TestIssue28650(c *C) { }() } } + +func (s *testSerialSuite) TestIssue30289(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + fpName := "github.com/pingcap/tidb/executor/issue30289" + c.Assert(failpoint.Enable(fpName, `return(true)`), IsNil) + defer func() { + c.Assert(failpoint.Disable(fpName), IsNil) + }() + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + err := tk.QueryToErr("select /*+ hash_join(t1) */ * from t t1 join t t2 on t1.a=t2.a") + c.Assert(err.Error(), Matches, "issue30289 build return error") +} diff --git a/executor/join.go b/executor/join.go index 2b97c0b1f9..c28d1e1079 100644 --- a/executor/join.go +++ b/executor/join.go @@ -214,9 +214,13 @@ func (e *HashJoinExec) fetchProbeSideChunks(ctx context.Context) { return } if !hasWaitedForBuild { + failpoint.Inject("issue30289", func(val failpoint.Value) { + if val.(bool) { + probeSideResult.Reset() + } + }) if probeSideResult.NumRows() == 0 && !e.useOuterToBuild { e.finished.Store(true) - return } emptyBuild, buildErr := e.wait4BuildSide() if buildErr != nil { @@ -258,6 +262,13 @@ func (e *HashJoinExec) wait4BuildSide() (emptyBuild bool, err error) { func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chunk.Chunk, doneCh <-chan struct{}) { defer close(chkCh) var err error + failpoint.Inject("issue30289", func(val failpoint.Value) { + if val.(bool) { + err = errors.Errorf("issue30289 build return error") + e.buildFinished <- errors.Trace(err) + return + } + }) for { if e.finished.Load().(bool) { return diff --git a/executor/shuffle.go b/executor/shuffle.go index a71e388c02..9143afd032 100644 --- a/executor/shuffle.go +++ b/executor/shuffle.go @@ -142,17 +142,29 @@ func (e *ShuffleExec) Close() error { if !e.prepared { for _, w := range e.workers { for _, r := range w.receivers { - close(r.inputHolderCh) - close(r.inputCh) + if r.inputHolderCh != nil { + close(r.inputHolderCh) + } + if r.inputCh != nil { + close(r.inputCh) + } + } + if w.outputHolderCh != nil { + close(w.outputHolderCh) } - close(w.outputHolderCh) } - close(e.outputCh) + if e.outputCh != nil { + close(e.outputCh) + } + } + if e.finishCh != nil { + close(e.finishCh) } - close(e.finishCh) for _, w := range e.workers { for _, r := range w.receivers { - for range r.inputCh { + if r.inputCh != nil { + for range r.inputCh { + } } } // close child executor of each worker @@ -160,7 +172,9 @@ func (e *ShuffleExec) Close() error { firstErr = err } } - for range e.outputCh { // workers exit before `e.outputCh` is closed. + if e.outputCh != nil { + for range e.outputCh { // workers exit before `e.outputCh` is closed. + } } e.executed = false