From 78166329e2708e8a44e578f597a2f4884a563e8c Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Sat, 5 Sep 2020 17:24:57 +0800 Subject: [PATCH] executor: set the correct resultCh for task when indexHashJoin keep order (#18840) * executor: set the correct resultCh for task when indexHashJoin keep order (#18757) * update test Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com> --- executor/index_lookup_hash_join.go | 10 +++--- executor/seqtest/seq_executor_test.go | 47 +++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/executor/index_lookup_hash_join.go b/executor/index_lookup_hash_join.go index d0ca46488c..49eb1bb2a9 100644 --- a/executor/index_lookup_hash_join.go +++ b/executor/index_lookup_hash_join.go @@ -320,10 +320,11 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) { }) if err != nil { task = &indexHashJoinTask{err: err} - ow.pushToChan(ctx, task, ow.innerCh) if ow.keepOuterOrder { + task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 1) ow.pushToChan(ctx, task, ow.taskCh) } + ow.pushToChan(ctx, task, ow.innerCh) return } if task == nil { @@ -452,13 +453,14 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context. if !ok { break } + // We need to init resultCh before the err is returned. + if task.keepOuterOrder { + resultCh = task.resultCh + } if task.err != nil { joinResult.err = task.err break } - if task.keepOuterOrder { - resultCh = task.resultCh - } err := iw.handleTask(ctx, task, joinResult, h, resultCh) if err != nil { joinResult.err = err diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 7e61308f7c..2249c2ce53 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1422,6 +1422,53 @@ func (s *seqTestSuite) TestOOMPanicInHashJoinWhenFetchBuildRows(c *C) { c.Assert(err.Error(), Equals, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]") } +func (s *seqTestSuite) TestIssue18744(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec(`use test;`) + tk.MustExec(`drop table if exists t, t1;`) + tk.MustExec(`CREATE TABLE t ( + id int(11) NOT NULL, + a bigint(20) DEFAULT NULL, + b char(20) DEFAULT NULL, + c datetime DEFAULT NULL, + d double DEFAULT NULL, + e json DEFAULT NULL, + f decimal(40,6) DEFAULT NULL, + PRIMARY KEY (id), + KEY a (a), + KEY b (b), + KEY c (c), + KEY d (d), + KEY f (f) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`) + tk.MustExec(`CREATE TABLE t1 ( + id int(11) NOT NULL, + a bigint(20) DEFAULT NULL, + b char(20) DEFAULT NULL, + c datetime DEFAULT NULL, + d double DEFAULT NULL, + e json DEFAULT NULL, + f decimal(40,6) DEFAULT NULL, + PRIMARY KEY (id), + KEY a (a), + KEY b (b), + KEY c (c), + KEY d (d), + KEY f (f) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`) + tk.MustExec(`insert into t1(id) values(0),(1),(2);`) + tk.MustExec(`insert into t values(0, 2010, "2010-01-01 01:01:00" , "2010-01-01 01:01:00" , 2010 , 2010 , 2010.000000);`) + tk.MustExec(`insert into t values(1 , NULL , NULL , NULL , NULL , NULL , NULL);`) + tk.MustExec(`insert into t values(2 , 2012 , "2012-01-01 01:01:00" , "2012-01-01 01:01:00" , 2012 , 2012 , 2012.000000);`) + tk.MustExec(`set tidb_index_lookup_join_concurrency=1`) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr", "return"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil) + }() + err := tk.QueryToErr(`select /*+ inl_hash_join(t2) */ t1.id, t2.id from t1 join t t2 on t1.a = t2.a order by t1.a ASC limit 1;`) + c.Assert(err.Error(), Equals, "mockIndexHashJoinOuterWorkerErr") +} + func (s *seqTestSuite) TestIssue19410(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test")