executor: set the correct resultCh for task when indexHashJoin keep order (#18840)

* executor: set the correct resultCh for task when indexHashJoin keep order (#18757)

* update test

Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
This commit is contained in:
HuaiyuXu
2020-09-05 17:24:57 +08:00
committed by GitHub
parent 7f56b302d5
commit 78166329e2
2 changed files with 53 additions and 4 deletions

View File

@ -320,10 +320,11 @@ func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
})
if err != nil {
task = &indexHashJoinTask{err: err}
ow.pushToChan(ctx, task, ow.innerCh)
if ow.keepOuterOrder {
task.keepOuterOrder, task.resultCh = true, make(chan *indexHashJoinResult, 1)
ow.pushToChan(ctx, task, ow.taskCh)
}
ow.pushToChan(ctx, task, ow.innerCh)
return
}
if task == nil {
@ -452,13 +453,14 @@ func (iw *indexHashJoinInnerWorker) run(ctx context.Context, cancelFunc context.
if !ok {
break
}
// We need to init resultCh before the err is returned.
if task.keepOuterOrder {
resultCh = task.resultCh
}
if task.err != nil {
joinResult.err = task.err
break
}
if task.keepOuterOrder {
resultCh = task.resultCh
}
err := iw.handleTask(ctx, task, joinResult, h, resultCh)
if err != nil {
joinResult.err = err

View File

@ -1422,6 +1422,53 @@ func (s *seqTestSuite) TestOOMPanicInHashJoinWhenFetchBuildRows(c *C) {
c.Assert(err.Error(), Equals, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
}
func (s *seqTestSuite) TestIssue18744(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec(`use test;`)
tk.MustExec(`drop table if exists t, t1;`)
tk.MustExec(`CREATE TABLE t (
id int(11) NOT NULL,
a bigint(20) DEFAULT NULL,
b char(20) DEFAULT NULL,
c datetime DEFAULT NULL,
d double DEFAULT NULL,
e json DEFAULT NULL,
f decimal(40,6) DEFAULT NULL,
PRIMARY KEY (id),
KEY a (a),
KEY b (b),
KEY c (c),
KEY d (d),
KEY f (f)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`)
tk.MustExec(`CREATE TABLE t1 (
id int(11) NOT NULL,
a bigint(20) DEFAULT NULL,
b char(20) DEFAULT NULL,
c datetime DEFAULT NULL,
d double DEFAULT NULL,
e json DEFAULT NULL,
f decimal(40,6) DEFAULT NULL,
PRIMARY KEY (id),
KEY a (a),
KEY b (b),
KEY c (c),
KEY d (d),
KEY f (f)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`)
tk.MustExec(`insert into t1(id) values(0),(1),(2);`)
tk.MustExec(`insert into t values(0, 2010, "2010-01-01 01:01:00" , "2010-01-01 01:01:00" , 2010 , 2010 , 2010.000000);`)
tk.MustExec(`insert into t values(1 , NULL , NULL , NULL , NULL , NULL , NULL);`)
tk.MustExec(`insert into t values(2 , 2012 , "2012-01-01 01:01:00" , "2012-01-01 01:01:00" , 2012 , 2012 , 2012.000000);`)
tk.MustExec(`set tidb_index_lookup_join_concurrency=1`)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr", "return"), IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/testIndexHashJoinOuterWorkerErr"), IsNil)
}()
err := tk.QueryToErr(`select /*+ inl_hash_join(t2) */ t1.id, t2.id from t1 join t t2 on t1.a = t2.a order by t1.a ASC limit 1;`)
c.Assert(err.Error(), Equals, "mockIndexHashJoinOuterWorkerErr")
}
func (s *seqTestSuite) TestIssue19410(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")