Fix RF performance degradation.
This commit is contained in:
@ -16,6 +16,7 @@
|
||||
#include "sql/optimizer/ob_log_exchange.h"
|
||||
#include "sql/optimizer/ob_log_table_scan.h"
|
||||
#include "sql/optimizer/ob_log_del_upd.h"
|
||||
#include "sql/optimizer/ob_log_join_filter.h"
|
||||
#include "common/ob_smart_call.h"
|
||||
|
||||
using namespace oceanbase::common;
|
||||
@ -254,6 +255,16 @@ int ObPxResourceAnalyzer::do_split(
|
||||
LOG_WARN("fail create qc for rescan op", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_SUCC(ret)) {
|
||||
if (log_op_def::LOG_JOIN_FILTER == root_op.get_type()) {
|
||||
if (OB_NOT_NULL(parent_dfo)) {
|
||||
ObLogJoinFilter &log_join_filter = static_cast<ObLogJoinFilter &>(root_op);
|
||||
if (log_join_filter.is_create_filter() && !parent_dfo->force_bushy()) {
|
||||
parent_dfo->force_bushy_ = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (log_op_def::LOG_EXCHANGE == root_op.get_type() &&
|
||||
static_cast<const ObLogExchange&>(root_op).is_px_producer()) {
|
||||
DfoInfo *dfo = nullptr;
|
||||
@ -297,7 +308,7 @@ int ObPxResourceAnalyzer::do_split(
|
||||
if (OB_FAIL(parent_dfo->get_child(0, child_dfo))) {
|
||||
LOG_WARN("get child dfo failed", K(ret));
|
||||
} else {
|
||||
for (ObHashSet<ObAddr>::const_iterator it = child_dfo->location_addr_.begin();
|
||||
for (ObHashSet<ObAddr>::const_iterator it = child_dfo->location_addr_.begin();
|
||||
OB_SUCC(ret) && it != child_dfo->location_addr_.end(); ++it) {
|
||||
if (OB_FAIL(parent_dfo->location_addr_.set_refactored(it->first))){
|
||||
LOG_WARN("set refactored failed", K(ret), K(it->first));
|
||||
@ -332,7 +343,7 @@ int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, int64_t dop)
|
||||
int ObPxResourceAnalyzer::get_dfo_addr_set(const ObLogicalOperator &root_op, ObHashSet<ObAddr> &addr_set)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if ((root_op.is_table_scan() && !root_op.get_contains_fake_cte()) ||
|
||||
if ((root_op.is_table_scan() && !root_op.get_contains_fake_cte()) ||
|
||||
(root_op.is_dml_operator() && (static_cast<const ObLogDelUpd&>(root_op)).is_pdml())) {
|
||||
const ObTablePartitionInfo *tbl_part_info = nullptr;
|
||||
if (root_op.is_table_scan()) {
|
||||
@ -398,11 +409,11 @@ int ObPxResourceAnalyzer::walk_through_px_trees(
|
||||
if (max_parallel_thread_map.created()) {
|
||||
max_parallel_thread_map.clear();
|
||||
max_parallel_group_map.clear();
|
||||
} else if (OB_FAIL(max_parallel_thread_map.create(bucket_size,
|
||||
} else if (OB_FAIL(max_parallel_thread_map.create(bucket_size,
|
||||
ObModIds::OB_SQL_PX,
|
||||
ObModIds::OB_SQL_PX))){
|
||||
LOG_WARN("create hash map failed", K(ret));
|
||||
} else if (OB_FAIL(max_parallel_group_map.create(bucket_size,
|
||||
} else if (OB_FAIL(max_parallel_group_map.create(bucket_size,
|
||||
ObModIds::OB_SQL_PX,
|
||||
ObModIds::OB_SQL_PX))){
|
||||
LOG_WARN("create hash map failed", K(ret));
|
||||
@ -475,7 +486,7 @@ int ObPxResourceAnalyzer::walk_through_dfo_tree(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("NULL ptr", K(ret));
|
||||
} else if (OB_FAIL(DfoTreeNormalizer<DfoInfo>::normalize(*px_root.root_dfo_))) {
|
||||
LOG_WARN("fail normalize px tree", K(ret));
|
||||
LOG_WARN("fail normalize px tree", K(ret));
|
||||
} else if (OB_FAIL(sched_order_gen.generate(*px_root.root_dfo_, edges))) {
|
||||
LOG_WARN("fail generate sched order", K(ret));
|
||||
} else if (OB_FAIL(current_thread_map.create(bucket_size,
|
||||
@ -557,7 +568,7 @@ int ObPxResourceAnalyzer::px_tree_append(ObHashMap<ObAddr, int64_t> &max_paralle
|
||||
ObHashMap<ObAddr, int64_t> ¶llel_count)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
for (ObHashMap<ObAddr, int64_t>::const_iterator it = parallel_count.begin();
|
||||
for (ObHashMap<ObAddr, int64_t>::const_iterator it = parallel_count.begin();
|
||||
OB_SUCC(ret) && it != parallel_count.end(); ++it) {
|
||||
bool is_exist = true;
|
||||
int64_t dop = 0;
|
||||
|
||||
Reference in New Issue
Block a user