diff --git a/src/gausskernel/runtime/executor/nodeHash.cpp b/src/gausskernel/runtime/executor/nodeHash.cpp index 48455de24..faf5c3449 100755 --- a/src/gausskernel/runtime/executor/nodeHash.cpp +++ b/src/gausskernel/runtime/executor/nodeHash.cpp @@ -1579,7 +1579,7 @@ static void ExecParallelHashIncreaseNumBuckets(HashJoinTable hashtable) Assert(batchno == 0); /* add the tuple to the proper bucket */ - ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], hashTuple, hashTuple->next.shared); + ExecParallelHashPushTuple(&hashtable->buckets.shared[bucketno], hashTuple, hashTuple); /* advance index past the tuple */ idx += MAXALIGN(HJTUPLE_OVERHEAD + HJTUPLE_MINTUPLE(hashTuple)->t_len); @@ -2031,11 +2031,19 @@ bool ExecParallelScanHashBucket(HashJoinState* hjstate, ExprContext* econtext) /* reset temp memory each time to avoid leaks from qual expr */ ResetExprContext(econtext); - if (ExecQual(hjclauses, econtext, false)) { + if (ExecQual(hjclauses, econtext, false) || + (hjstate->js.nulleqqual != NIL && ExecQual(hjstate->js.nulleqqual, econtext, false))) { hjstate->hj_CurTuple = hashTuple; return true; } } + /* + * For right Semi/Anti join, we delete matched tuples in HashTable to make next matching faster, + * so pointer hj_PreTuple is designed to follow the hj_CurTuple and to help us to clear the HashTable. 
+ */ + if (hjstate->js.jointype == JOIN_RIGHT_SEMI || hjstate->js.jointype == JOIN_RIGHT_ANTI) { + hjstate->hj_PreTuple = hashTuple; + } hashTuple = ExecParallelHashNextTuple(hashtable, hashTuple); } @@ -3132,11 +3140,8 @@ void ExecHashTableDetach(HashJoinTable hashtable) */ static inline HashJoinTuple ExecParallelHashFirstTuple(HashJoinTable hashtable, int bucketno) { - HashJoinTuple tuple; - ParallelHashJoinState* parallelState = hashtable->parallel_state; - Assert(parallelState); - tuple = (HashJoinTuple)dsa_get_address(hashtable->area, hashtable->buckets.shared[bucketno]); - return tuple; + Assert(hashtable->parallel_state); + return (HashJoinTuple)dsa_get_address(hashtable->area, hashtable->buckets.shared[bucketno]); } /* diff --git a/src/test/regress/expected/parallel_hashjoin.out b/src/test/regress/expected/parallel_hashjoin.out index 163aa7531..157b1906b 100644 --- a/src/test/regress/expected/parallel_hashjoin.out +++ b/src/test/regress/expected/parallel_hashjoin.out @@ -94,6 +94,63 @@ select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on par 10 | 10 (10 rows) +-- parallel increase hash buckets +DROP TABLE IF EXISTS par_hash_incr_bucket_a; +NOTICE: table "par_hash_incr_bucket_a" does not exist, skipping +DROP TABLE IF EXISTS par_hash_incr_bucket_b; +NOTICE: table "par_hash_incr_bucket_b" does not exist, skipping +DROP TABLE IF EXISTS par_hash_incr_bucket_c; +NOTICE: table "par_hash_incr_bucket_c" does not exist, skipping +DROP TABLE IF EXISTS par_hash_incr_bucket_d; +NOTICE: table "par_hash_incr_bucket_d" does not exist, skipping +create table par_hash_incr_bucket_a(a int,b int,c int,d int,e int); +create table par_hash_incr_bucket_b(a int,b int,c int,d int,e int); +create table par_hash_incr_bucket_c(a int,b int,c int,d int,e int); +create table par_hash_incr_bucket_d(a int,b int,c int,d int,e int); +insert into par_hash_incr_bucket_a select n, n , n , n , n from generate_series(1,100000) n; +insert into par_hash_incr_bucket_b select 
n, n , n , n , n from generate_series(1,100000) n; +insert into par_hash_incr_bucket_c select n, n , n , n , n from generate_series(1,100000) n; +insert into par_hash_incr_bucket_d select n, n , n , n , n from generate_series(1,100000) n; +explain (costs off) select count(*) from par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d +where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b + and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d + and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a + and par_hash_incr_bucket_b.e %2 =0; + QUERY PLAN +-------------------------------------------------------------------------------------------- + Aggregate + -> Hash Join + Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_d.b) + -> Gather + Number of Workers: 2 + -> Parallel Hash Join + Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_c.a) + -> Parallel Hash Join + Hash Cond: (par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d) + -> Parallel Seq Scan on par_hash_incr_bucket_a + -> Parallel Hash + -> Parallel Seq Scan on par_hash_incr_bucket_b + Filter: ((e % 2) = 0) + -> Parallel Hash + -> Parallel Seq Scan on par_hash_incr_bucket_c + -> Hash + -> Seq Scan on par_hash_incr_bucket_d +(17 rows) + +select count(*) from par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d +where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b + and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d + and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a + and par_hash_incr_bucket_b.e %2 =0; + count +------- + 50000 +(1 row) + +DROP TABLE IF EXISTS par_hash_incr_bucket_a; +DROP TABLE IF EXISTS par_hash_incr_bucket_b; +DROP TABLE IF EXISTS par_hash_incr_bucket_c; +DROP TABLE IF EXISTS par_hash_incr_bucket_d; drop table parallel_hashjoin_test_a; drop table parallel_hashjoin_test_b; reset parallel_setup_cost; diff 
--git a/src/test/regress/sql/parallel_hashjoin.sql b/src/test/regress/sql/parallel_hashjoin.sql index 10210218b..cd3cd041c 100644 --- a/src/test/regress/sql/parallel_hashjoin.sql +++ b/src/test/regress/sql/parallel_hashjoin.sql @@ -19,6 +19,34 @@ select * from parallel_hashjoin_test_a left outer join parallel_hashjoin_test_b explain (costs off)select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10; select * from parallel_hashjoin_test_a full join parallel_hashjoin_test_b on parallel_hashjoin_test_a.id = parallel_hashjoin_test_b.id order by parallel_hashjoin_test_a.id limit 10; +-- parallel increase hash buckets +DROP TABLE IF EXISTS par_hash_incr_bucket_a; +DROP TABLE IF EXISTS par_hash_incr_bucket_b; +DROP TABLE IF EXISTS par_hash_incr_bucket_c; +DROP TABLE IF EXISTS par_hash_incr_bucket_d; +create table par_hash_incr_bucket_a(a int,b int,c int,d int,e int); +create table par_hash_incr_bucket_b(a int,b int,c int,d int,e int); +create table par_hash_incr_bucket_c(a int,b int,c int,d int,e int); +create table par_hash_incr_bucket_d(a int,b int,c int,d int,e int); +insert into par_hash_incr_bucket_a select n, n , n , n , n from generate_series(1,100000) n; +insert into par_hash_incr_bucket_b select n, n , n , n , n from generate_series(1,100000) n; +insert into par_hash_incr_bucket_c select n, n , n , n , n from generate_series(1,100000) n; +insert into par_hash_incr_bucket_d select n, n , n , n , n from generate_series(1,100000) n; +explain (costs off) select count(*) from par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d +where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b + and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d + and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a + and par_hash_incr_bucket_b.e %2 =0; +select count(*) from 
par_hash_incr_bucket_a cross join par_hash_incr_bucket_b cross join par_hash_incr_bucket_c cross join par_hash_incr_bucket_d +where par_hash_incr_bucket_c.a = par_hash_incr_bucket_d.b + and par_hash_incr_bucket_a.c = par_hash_incr_bucket_b.d + and par_hash_incr_bucket_b.d = par_hash_incr_bucket_c.a + and par_hash_incr_bucket_b.e %2 =0; +DROP TABLE IF EXISTS par_hash_incr_bucket_a; +DROP TABLE IF EXISTS par_hash_incr_bucket_b; +DROP TABLE IF EXISTS par_hash_incr_bucket_c; +DROP TABLE IF EXISTS par_hash_incr_bucket_d; + drop table parallel_hashjoin_test_a; drop table parallel_hashjoin_test_b; reset parallel_setup_cost; @@ -26,3 +54,5 @@ reset min_parallel_table_scan_size; reset parallel_tuple_cost; reset enable_nestloop; reset force_parallel_mode; + +