fix union all strict pw check logic
This commit is contained in:
@ -2142,21 +2142,48 @@ int ObSelectLogPlan::check_if_union_all_match_partition_wise(const ObIArray<ObLo
|
||||
bool &is_partition_wise)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
EqualSets equal_sets;
|
||||
EqualSets first_equal_sets;
|
||||
ObShardingInfo *first_sharding = NULL;
|
||||
ObShardingInfo *child_sharding = NULL;
|
||||
const ObSelectStmt *child_stmt = NULL;
|
||||
ObSEArray<ObRawExpr*, 4> first_select_exprs;
|
||||
ObSEArray<ObRawExpr*, 4> child_select_exprs;
|
||||
is_partition_wise = true;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && is_partition_wise && i < child_ops.count(); i++) {
|
||||
equal_sets.reset();
|
||||
child_select_exprs.reset();
|
||||
if (OB_ISNULL(child_ops.at(i)) ||
|
||||
OB_ISNULL(child_sharding = child_ops.at(i)->get_sharding())) {
|
||||
OB_ISNULL(child_sharding = child_ops.at(i)->get_sharding()) ||
|
||||
OB_ISNULL(child_ops.at(i)->get_plan()) ||
|
||||
OB_ISNULL(child_ops.at(i)->get_plan()->get_stmt()) ||
|
||||
!child_ops.at(i)->get_plan()->get_stmt()->is_select_stmt() ||
|
||||
OB_ISNULL(child_stmt = static_cast<const ObSelectStmt*>(child_ops.at(i)->get_plan()->get_stmt()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null", K(child_ops.at(i)), K(child_sharding), K(ret));
|
||||
} else if (child_ops.at(i)->is_exchange_allocated()) {
|
||||
is_partition_wise = false;
|
||||
} else if (i == 0) {
|
||||
first_sharding = child_sharding;
|
||||
} else if (OB_FAIL(ObShardingInfo::is_physically_equal_partitioned(*first_sharding,
|
||||
*child_sharding,
|
||||
is_partition_wise))) {
|
||||
first_equal_sets = child_ops.at(i)->get_output_equal_sets();
|
||||
if (OB_FAIL(child_stmt->get_select_exprs(first_select_exprs))) {
|
||||
LOG_WARN("failed to get select exprs", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(child_stmt->get_select_exprs(child_select_exprs))) {
|
||||
LOG_WARN("failed to get select exprs", K(ret));
|
||||
} else if (first_select_exprs.count() != child_select_exprs.count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("select exprs count doesn't match", K(first_select_exprs), K(child_select_exprs));
|
||||
} else if (OB_FAIL(append(equal_sets, first_equal_sets))) {
|
||||
LOG_WARN("failed to append equal sets", K(ret));
|
||||
} else if (OB_FAIL(append(equal_sets, child_ops.at(i)->get_output_equal_sets()))) {
|
||||
LOG_WARN("failed to append equal sets", K(ret));
|
||||
} else if (OB_FAIL(ObShardingInfo::check_if_match_partition_wise(equal_sets,
|
||||
first_select_exprs,
|
||||
child_select_exprs,
|
||||
first_sharding,
|
||||
child_sharding,
|
||||
is_partition_wise))) {
|
||||
LOG_WARN("failed to check if union all match strict partition-wise", K(ret));
|
||||
} else {
|
||||
LOG_TRACE("succ to check union all matching pw", K(is_partition_wise));
|
||||
|
||||
@ -725,31 +725,6 @@ int ObShardingInfo::check_if_match_repart_or_rehash(const EqualSets &equal_sets,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObShardingInfo::is_physically_equal_partitioned(const ObShardingInfo &left_sharding,
|
||||
const ObShardingInfo &right_sharding,
|
||||
bool &is_equal_partition)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_equal_partition = false;
|
||||
PwjTable l_table;
|
||||
PwjTable r_table;
|
||||
ObStrictPwjComparer pwj_comparer;
|
||||
if (OB_FAIL(l_table.init(left_sharding))) {
|
||||
LOG_WARN("failed to init pwj table with sharding info", K(ret));
|
||||
} else if (OB_FAIL(r_table.init(right_sharding))) {
|
||||
LOG_WARN("failed to init pwj table with sharding info", K(ret));
|
||||
} else if (OB_FAIL(pwj_comparer.add_table(l_table, is_equal_partition))) {
|
||||
LOG_WARN("failed to add table", K(ret));
|
||||
} else if (!is_equal_partition) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(pwj_comparer.add_table(r_table, is_equal_partition))) {
|
||||
LOG_WARN("failed to add table", K(ret));
|
||||
} else {
|
||||
// do nothing
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObShardingInfo::is_physically_equal_serverlist(ObIArray<ObAddr> &left_server_list,
|
||||
ObIArray<ObAddr> &right_server_list,
|
||||
bool &is_equal_serverlist)
|
||||
|
||||
@ -182,10 +182,6 @@ public:
|
||||
const common::ObIArray<ObRawExpr *> &target_part_keys,
|
||||
bool &is_match_join_keys);
|
||||
|
||||
static int is_physically_equal_partitioned(const ObShardingInfo &left_sharding,
|
||||
const ObShardingInfo &right_sharding,
|
||||
bool &is_equal_partition);
|
||||
|
||||
static int is_physically_both_shuffled_serverlist(ObIArray<ObAddr> &left_server_list,
|
||||
ObIArray<ObAddr> &right_server_list,
|
||||
bool &is_both_shuffled_serverlist);
|
||||
|
||||
@ -6568,11 +6568,13 @@ Query Plan
|
||||
|3 | EXCHANGE IN DISTR | |1 |26 |
|
||||
|4 | EXCHANGE OUT DISTR (HASH)|:EX10000|1 |26 |
|
||||
|5 | HASH DISTINCT | |1 |26 |
|
||||
|6 | PX PARTITION ITERATOR | |3 |26 |
|
||||
|7 | UNION ALL | |3 |26 |
|
||||
|6 | UNION ALL | |3 |26 |
|
||||
|7 | PX PARTITION ITERATOR | |1 |9 |
|
||||
|8 | TABLE SCAN |t1 |1 |9 |
|
||||
|9 | TABLE SCAN |t2 |1 |9 |
|
||||
|10| TABLE SCAN |t3 |1 |9 |
|
||||
|9 | PX PARTITION ITERATOR | |1 |9 |
|
||||
|10| TABLE SCAN |t2 |1 |9 |
|
||||
|11| PX PARTITION ITERATOR | |1 |9 |
|
||||
|12| TABLE SCAN |t3 |1 |9 |
|
||||
=================================================================
|
||||
Outputs & filters:
|
||||
-------------------------------------
|
||||
@ -6587,17 +6589,21 @@ Outputs & filters:
|
||||
5 - output([UNION([1])]), filter(nil), rowset=256
|
||||
distinct([UNION([1])])
|
||||
6 - output([UNION([1])]), filter(nil), rowset=256
|
||||
partition wise, force partition granule
|
||||
7 - output([UNION([1])]), filter(nil), rowset=256
|
||||
7 - output([t1.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
8 - output([t1.a]), filter(nil), rowset=256
|
||||
access([t1.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t1.__pk_increment]), range(MIN ; MAX)always true
|
||||
9 - output([t2.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
10 - output([t2.a]), filter(nil), rowset=256
|
||||
access([t2.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t2.__pk_increment]), range(MIN ; MAX)always true
|
||||
10 - output([t3.b]), filter(nil), rowset=256
|
||||
11 - output([t3.b]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
12 - output([t3.b]), filter(nil), rowset=256
|
||||
access([t3.b]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t3.__pk_increment]), range(MIN ; MAX)always true
|
||||
@ -6968,13 +6974,15 @@ Query Plan
|
||||
|7 | EXCHANGE IN DISTR | |1 |26 |
|
||||
|8 | EXCHANGE OUT DISTR (HASH)|:EX10000|1 |26 |
|
||||
|9 | HASH DISTINCT | |1 |26 |
|
||||
|10| PX PARTITION ITERATOR | |3 |26 |
|
||||
|11| UNION ALL | |3 |26 |
|
||||
|10| UNION ALL | |3 |26 |
|
||||
|11| PX PARTITION ITERATOR | |1 |9 |
|
||||
|12| TABLE SCAN |t1 |1 |9 |
|
||||
|13| TABLE SCAN |t2 |1 |9 |
|
||||
|14| TABLE SCAN |t3 |1 |9 |
|
||||
|15| PX PARTITION ITERATOR | |1 |9 |
|
||||
|16| TABLE SCAN |t5 |1 |9 |
|
||||
|13| PX PARTITION ITERATOR | |1 |9 |
|
||||
|14| TABLE SCAN |t2 |1 |9 |
|
||||
|15| PX PARTITION ITERATOR | |1 |9 |
|
||||
|16| TABLE SCAN |t3 |1 |9 |
|
||||
|17| PX PARTITION ITERATOR | |1 |9 |
|
||||
|18| TABLE SCAN |t5 |1 |9 |
|
||||
=====================================================================
|
||||
Outputs & filters:
|
||||
-------------------------------------
|
||||
@ -6996,23 +7004,27 @@ Outputs & filters:
|
||||
9 - output([UNION([1])]), filter(nil), rowset=256
|
||||
distinct([UNION([1])])
|
||||
10 - output([UNION([1])]), filter(nil), rowset=256
|
||||
partition wise, force partition granule
|
||||
11 - output([UNION([1])]), filter(nil), rowset=256
|
||||
11 - output([t1.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
12 - output([t1.a]), filter(nil), rowset=256
|
||||
access([t1.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t1.__pk_increment]), range(MIN ; MAX)always true
|
||||
13 - output([t2.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
14 - output([t2.a]), filter(nil), rowset=256
|
||||
access([t2.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t2.__pk_increment]), range(MIN ; MAX)always true
|
||||
14 - output([t3.b]), filter(nil), rowset=256
|
||||
15 - output([t3.b]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
16 - output([t3.b]), filter(nil), rowset=256
|
||||
access([t3.b]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t3.__pk_increment]), range(MIN ; MAX)always true
|
||||
15 - output([t5.a], [t5.b], [t5.c]), filter(nil), rowset=256
|
||||
17 - output([t5.a], [t5.b], [t5.c]), filter(nil), rowset=256
|
||||
affinitize, force partition granule
|
||||
16 - output([t5.a], [t5.b], [t5.c]), filter(nil), rowset=256
|
||||
18 - output([t5.a], [t5.b], [t5.c]), filter(nil), rowset=256
|
||||
access([t5.a], [t5.b], [t5.c]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t5.__pk_increment]), range(MIN ; MAX)always true
|
||||
@ -7367,34 +7379,40 @@ Outputs & filters:
|
||||
range_key([t2.__pk_increment]), range(MIN ; MAX)always true
|
||||
explain select a from t1 union all select a from t2 union all select b from t3;
|
||||
Query Plan
|
||||
===========================================================
|
||||
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
|
||||
-----------------------------------------------------------
|
||||
|0 |PX COORDINATOR | |3 |27 |
|
||||
|1 | EXCHANGE OUT DISTR |:EX10000|3 |27 |
|
||||
|2 | PX PARTITION ITERATOR| |3 |26 |
|
||||
|3 | UNION ALL | |3 |26 |
|
||||
|4 | TABLE SCAN |t1 |1 |9 |
|
||||
|5 | TABLE SCAN |t2 |1 |9 |
|
||||
|6 | TABLE SCAN |t3 |1 |9 |
|
||||
===========================================================
|
||||
============================================================
|
||||
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
|
||||
------------------------------------------------------------
|
||||
|0 |PX COORDINATOR | |3 |27 |
|
||||
|1 | EXCHANGE OUT DISTR |:EX10000|3 |27 |
|
||||
|2 | UNION ALL | |3 |26 |
|
||||
|3 | PX PARTITION ITERATOR| |1 |9 |
|
||||
|4 | TABLE SCAN |t1 |1 |9 |
|
||||
|5 | PX PARTITION ITERATOR| |1 |9 |
|
||||
|6 | TABLE SCAN |t2 |1 |9 |
|
||||
|7 | PX PARTITION ITERATOR| |1 |9 |
|
||||
|8 | TABLE SCAN |t3 |1 |9 |
|
||||
============================================================
|
||||
Outputs & filters:
|
||||
-------------------------------------
|
||||
0 - output([INTERNAL_FUNCTION(UNION([1]))]), filter(nil), rowset=256
|
||||
1 - output([INTERNAL_FUNCTION(UNION([1]))]), filter(nil), rowset=256
|
||||
dop=1
|
||||
2 - output([UNION([1])]), filter(nil), rowset=256
|
||||
partition wise, force partition granule
|
||||
3 - output([UNION([1])]), filter(nil), rowset=256
|
||||
3 - output([t1.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
4 - output([t1.a]), filter(nil), rowset=256
|
||||
access([t1.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t1.__pk_increment]), range(MIN ; MAX)always true
|
||||
5 - output([t2.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
6 - output([t2.a]), filter(nil), rowset=256
|
||||
access([t2.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t2.__pk_increment]), range(MIN ; MAX)always true
|
||||
6 - output([t3.b]), filter(nil), rowset=256
|
||||
7 - output([t3.b]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
8 - output([t3.b]), filter(nil), rowset=256
|
||||
access([t3.b]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t3.__pk_increment]), range(MIN ; MAX)always true
|
||||
@ -7742,11 +7760,13 @@ Query Plan
|
||||
|5 | EXCHANGE IN DISTR | |3 |27 |
|
||||
|6 | EXCHANGE OUT DISTR (PKEY)|:EX10000|3 |27 |
|
||||
|7 | SUBPLAN SCAN |t4 |3 |26 |
|
||||
|8 | PX PARTITION ITERATOR | |3 |26 |
|
||||
|9 | UNION ALL | |3 |26 |
|
||||
|8 | UNION ALL | |3 |26 |
|
||||
|9 | PX PARTITION ITERATOR | |1 |9 |
|
||||
|10| TABLE SCAN |t1 |1 |9 |
|
||||
|11| TABLE SCAN |t2 |1 |9 |
|
||||
|12| TABLE SCAN |t3 |1 |9 |
|
||||
|11| PX PARTITION ITERATOR | |1 |9 |
|
||||
|12| TABLE SCAN |t2 |1 |9 |
|
||||
|13| PX PARTITION ITERATOR | |1 |9 |
|
||||
|14| TABLE SCAN |t3 |1 |9 |
|
||||
=================================================================
|
||||
Outputs & filters:
|
||||
-------------------------------------
|
||||
@ -7767,17 +7787,21 @@ Outputs & filters:
|
||||
7 - output([t4.a]), filter(nil), rowset=256
|
||||
access([t4.a])
|
||||
8 - output([UNION([1])]), filter(nil), rowset=256
|
||||
partition wise, force partition granule
|
||||
9 - output([UNION([1])]), filter(nil), rowset=256
|
||||
9 - output([t1.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
10 - output([t1.a]), filter(nil), rowset=256
|
||||
access([t1.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t1.__pk_increment]), range(MIN ; MAX)always true
|
||||
11 - output([t2.a]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
12 - output([t2.a]), filter(nil), rowset=256
|
||||
access([t2.a]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t2.__pk_increment]), range(MIN ; MAX)always true
|
||||
12 - output([t3.b]), filter(nil), rowset=256
|
||||
13 - output([t3.b]), filter(nil), rowset=256
|
||||
affinitize, partition wise, force partition granule
|
||||
14 - output([t3.b]), filter(nil), rowset=256
|
||||
access([t3.b]), partitions(p[0-4])
|
||||
is_index_back=false, is_global_index=false,
|
||||
range_key([t3.__pk_increment]), range(MIN ; MAX)always true
|
||||
|
||||
Reference in New Issue
Block a user