diff --git a/src/sql/engine/px/ob_dfo_mgr.cpp b/src/sql/engine/px/ob_dfo_mgr.cpp index f67d7c3638..f5d0228f09 100644 --- a/src/sql/engine/px/ob_dfo_mgr.cpp +++ b/src/sql/engine/px/ob_dfo_mgr.cpp @@ -518,6 +518,8 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx, gi_spec->bf_info_.p2p_dh_id_, node))) { LOG_WARN("fail to set p2p dh id to map", K(ret)); + } else if (OB_FAIL(px_coord_info.rf_dpd_info_.rf_use_ops_.push_back(phy_op))) { + LOG_WARN("failed to push back parition filter gi op"); } else { parent_dfo->set_need_p2p_info(true); } diff --git a/src/sql/engine/px/ob_px_bloom_filter.cpp b/src/sql/engine/px/ob_px_bloom_filter.cpp index e097486928..39c1910744 100644 --- a/src/sql/engine/px/ob_px_bloom_filter.cpp +++ b/src/sql/engine/px/ob_px_bloom_filter.cpp @@ -482,7 +482,7 @@ OB_DEF_SERIALIZE_SIZE(ObPxBloomFilter) int ObPxBFStaticInfo::init(int64_t tenant_id, int64_t filter_id, int64_t server_id, bool is_shared, bool skip_subpart, int64_t p2p_dh_id, - bool is_shuffle) + bool is_shuffle, ObLogJoinFilter *log_join_filter_create_op) { int ret = OB_SUCCESS; if (is_inited_){ @@ -496,6 +496,7 @@ int ObPxBFStaticInfo::init(int64_t tenant_id, int64_t filter_id, skip_subpart_ = skip_subpart; p2p_dh_id_ = p2p_dh_id; is_shuffle_ = is_shuffle; + log_join_filter_create_op_ = log_join_filter_create_op; is_inited_ = true; } return ret; diff --git a/src/sql/engine/px/ob_px_bloom_filter.h b/src/sql/engine/px/ob_px_bloom_filter.h index f1a4bbfc80..c52592b18c 100644 --- a/src/sql/engine/px/ob_px_bloom_filter.h +++ b/src/sql/engine/px/ob_px_bloom_filter.h @@ -130,6 +130,8 @@ public: DISALLOW_COPY_AND_ASSIGN(ObPxBloomFilter); }; +class ObLogJoinFilter; + struct ObPxBFStaticInfo { OB_UNIS_VERSION(1); @@ -138,12 +140,12 @@ public: : is_inited_(false), tenant_id_(common::OB_INVALID_TENANT_ID), filter_id_(common::OB_INVALID_ID), server_id_(common::OB_INVALID_ID), is_shared_(false), skip_subpart_(false), - p2p_dh_id_(OB_INVALID_ID), is_shuffle_(false) + p2p_dh_id_(OB_INVALID_ID), 
is_shuffle_(false), log_join_filter_create_op_(nullptr) {} int init(int64_t tenant_id, int64_t filter_id, int64_t server_id, bool is_shared, bool skip_subpart, int64_t p2p_dh_id, - bool is_shuffle); + bool is_shuffle, ObLogJoinFilter *log_join_filter_create_op); bool is_inited_; int64_t tenant_id_; int64_t filter_id_; @@ -155,6 +157,7 @@ public: TO_STRING_KV(K(is_inited_), K(tenant_id_), K(filter_id_), K(server_id_), K(is_shared_), K(skip_subpart_), K(is_shuffle_), K(p2p_dh_id_)); + ObLogJoinFilter *log_join_filter_create_op_; // no need to serialize, only used in the optimizer }; class ObPXBloomFilterHashWrapper diff --git a/src/sql/engine/px/ob_px_scheduler.cpp b/src/sql/engine/px/ob_px_scheduler.cpp index 40530b87ca..0009a5067f 100644 --- a/src/sql/engine/px/ob_px_scheduler.cpp +++ b/src/sql/engine/px/ob_px_scheduler.cpp @@ -727,11 +727,20 @@ int RuntimeFilterDependencyInfo::describe_dependency(ObDfo *root_dfo) for (int64_t i = 0; i < rf_create_ops_.count() && OB_SUCC(ret); ++i) { const ObJoinFilterSpec *create_op = static_cast(rf_create_ops_.at(i)); for (int64_t j = 0; j < rf_use_ops_.count() && OB_SUCC(ret); ++j) { - const ObJoinFilterSpec *use_op = static_cast(rf_use_ops_.at(j)); - if (create_op->get_filter_id() == use_op->get_filter_id()) { + int64_t use_filter_id = common::OB_INVALID_ID; + if (IS_PX_GI(rf_use_ops_.at(j)->get_type())) { + const ObGranuleIteratorSpec *use_op = + static_cast(rf_use_ops_.at(j)); + use_filter_id = use_op->bf_info_.filter_id_; + } else { + const ObJoinFilterSpec *use_op = static_cast(rf_use_ops_.at(j)); + use_filter_id = use_op->get_filter_id(); + } + if (create_op->get_filter_id() == use_filter_id) { const ObOpSpec *ancestor_op = nullptr; ObDfo *op_dfo = nullptr;; - if (OB_FAIL(LowestCommonAncestorFinder::find_op_common_ancestor(create_op, use_op, ancestor_op))) { + if (OB_FAIL(LowestCommonAncestorFinder::find_op_common_ancestor( + create_op, rf_use_ops_.at(j), ancestor_op))) { LOG_WARN("failed to find op common ancestor"); } else 
if (OB_ISNULL(ancestor_op)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/optimizer/ob_log_join_filter.h b/src/sql/optimizer/ob_log_join_filter.h index 36711ff5c5..b16199d7dd 100644 --- a/src/sql/optimizer/ob_log_join_filter.h +++ b/src/sql/optimizer/ob_log_join_filter.h @@ -30,8 +30,9 @@ public: ObLogJoinFilter(ObLogPlan &plan) : ObLogicalOperator(plan), is_create_(false), filter_id_(common::OB_INVALID_ID), - filter_len_(0), join_exprs_(), - is_use_filter_shuffle_(false), + filter_len_(0), paired_join_filter_(nullptr), + join_exprs_(), is_use_filter_shuffle_(false), + join_filter_cmp_funcs_(), join_filter_exprs_(), join_filter_types_(), p2p_sequence_ids_(), @@ -60,6 +61,8 @@ public: } } inline int64_t get_filter_length() const { return filter_len_; } + inline void set_paired_join_filter(ObLogicalOperator *paired_join_filter) { paired_join_filter_ = paired_join_filter; } + inline ObLogicalOperator *get_paired_join_filter() const { return paired_join_filter_; } inline void set_is_use_filter_shuffle(bool flag) { is_use_filter_shuffle_ = flag; } inline bool is_use_filter_shuffle() { return is_use_filter_shuffle_; } inline bool is_partition_filter() const @@ -109,6 +112,9 @@ private: bool is_create_; //判断是否是create算子 int64_t filter_id_; //设置filter_id int64_t filter_len_; //设置filter长度 + // if this is a join filter create op, the paired_join_filter_ is join filter use op, vice versa + // if this is a partition join filter create op, the paired_join_filter_ is partition filter gi + ObLogicalOperator *paired_join_filter_; //equal join condition expr common::ObSEArray join_exprs_; bool is_use_filter_shuffle_; // 标记use端filter是否有shuffle diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index c4bba8deac..4c2f569e71 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -4034,6 +4034,8 @@ int ObLogicalOperator::allocate_granule_nodes_above(AllocGIContext &ctx) } else { 
gi_op->set_tablet_id_expr(tablet_id_expr); gi_op->set_join_filter_info(table_scan->get_join_filter_info()); + ObLogJoinFilter *jf_create_op = gi_op->get_join_filter_info().log_join_filter_create_op_; + jf_create_op->set_paired_join_filter(gi_op); gi_op->add_flag(GI_USE_PARTITION_FILTER); } } else if (LOG_GROUP_BY == get_type()) { @@ -4768,7 +4770,7 @@ int ObLogicalOperator::allocate_partition_join_filter(const ObIArrayis_shared_join_filter(), info.skip_subpart_, join_filter_create->get_p2p_sequence_ids().at(0), - right_has_exchange)); + right_has_exchange, join_filter_create)); scan_op->set_join_filter_info(bf_info); scan_op->set_part_join_filter_created(true); filter_id++; @@ -4821,6 +4823,8 @@ int ObLogicalOperator::allocate_normal_join_filter(const ObIArray(filter_create); join_filter_use = static_cast(filter_use); + join_filter_create->set_paired_join_filter(join_filter_use); + join_filter_use->set_paired_join_filter(join_filter_create); join_filter_create->set_is_create_filter(true); join_filter_use->set_is_create_filter(false); join_filter_create->set_filter_id(filter_id); diff --git a/src/sql/optimizer/ob_logical_operator.h b/src/sql/optimizer/ob_logical_operator.h index a1365bcb01..ff879995c3 100644 --- a/src/sql/optimizer/ob_logical_operator.h +++ b/src/sql/optimizer/ob_logical_operator.h @@ -829,6 +829,11 @@ public: return parent_; } + inline const ObLogicalOperator *get_parent() const + { + return parent_; + } + int get_parent(ObLogicalOperator *root, ObLogicalOperator *&parent); /** * 目前优化器使用两阶段来生成计划: @@ -1022,7 +1027,7 @@ public: id_ = id; } - inline uint64_t get_op_id() { return op_id_; } + inline uint64_t get_op_id() const { return op_id_; } inline void set_op_id(uint64_t op_id) { op_id_ = op_id; } inline bool is_partition_wise() const { return is_partition_wise_; } inline void set_is_partition_wise(bool is_partition_wise) diff --git a/src/sql/optimizer/ob_px_resource_analyzer.cpp b/src/sql/optimizer/ob_px_resource_analyzer.cpp index 
6498e054b3..0a50575c66 100644 --- a/src/sql/optimizer/ob_px_resource_analyzer.cpp +++ b/src/sql/optimizer/ob_px_resource_analyzer.cpp @@ -72,6 +72,42 @@ int SchedOrderGenerator::generate( // =================================================================================== +int LogRuntimeFilterDependencyInfo::describe_dependency(DfoInfo *root_dfo) +{ + int ret = OB_SUCCESS; + // For each rf create op, find its paired rf use op, then get their lowest common ancestor and + // mark force_bushy on the dfo which the ancestor belongs to; fails with OB_ERR_UNEXPECTED if any of them is missing. + for (int64_t i = 0; i < rf_create_ops_.count() && OB_SUCC(ret); ++i) { + const ObLogJoinFilter *create_op = static_cast(rf_create_ops_.at(i)); + const ObLogicalOperator *use_op = create_op->get_paired_join_filter(); + if (OB_ISNULL(use_op)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("use_op is null", K(ret)); + } else { + const ObLogicalOperator *ancestor_op = nullptr; + DfoInfo *op_dfo = nullptr; + if (OB_FAIL(LogLowestCommonAncestorFinder::find_op_common_ancestor(create_op, use_op, ancestor_op))) { + LOG_WARN("failed to find op common ancestor", K(ret)); + } else if (OB_ISNULL(ancestor_op)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("op common ancestor not found", K(ret)); + } else if (OB_FAIL(LogLowestCommonAncestorFinder::get_op_dfo(ancestor_op, root_dfo, op_dfo))) { + LOG_WARN("failed to get the dfo of ancestor op", K(ret)); + } else if (OB_ISNULL(op_dfo)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("the dfo of ancestor_op not found", K(ret)); + } else { + // Once the DFO which the ancestor belongs to has set the flag "force_bushy", + // the DfoTreeNormalizer will not attempt to transform a right-deep DFO tree + // into a left-deep DFO tree. Consequently, the "join filter create" operator + // can be scheduled earlier than the "join filter use" operator. 
+ op_dfo->set_force_bushy(true); + } + } + } + return ret; +} + int DfoInfo::add_child(DfoInfo *child) { @@ -255,20 +291,17 @@ int ObPxResourceAnalyzer::do_split( LOG_WARN("fail create qc for rescan op", K(ret)); } } else { - if (OB_SUCC(ret)) { - if (log_op_def::LOG_JOIN_FILTER == root_op.get_type()) { - if (OB_NOT_NULL(parent_dfo)) { - ObLogJoinFilter &log_join_filter = static_cast(root_op); - if (log_join_filter.is_create_filter() && !parent_dfo->force_bushy()) { - parent_dfo->force_bushy_ = true; - } - } + if (OB_FAIL(ret)) { + } else if (log_op_def::LOG_JOIN_FILTER == root_op.get_type()) { + ObLogJoinFilter &log_join_filter = static_cast(root_op); + if (log_join_filter.is_create_filter() + && OB_FAIL(px_info.rf_dpd_info_.rf_create_ops_.push_back(&root_op))) { + LOG_WARN("failed to push_back log join filter create", K(ret)); } - } - if (log_op_def::LOG_EXCHANGE == root_op.get_type() && - static_cast(root_op).is_px_producer()) { + } else if (log_op_def::LOG_EXCHANGE == root_op.get_type() + && static_cast(root_op).is_px_producer()) { DfoInfo *dfo = nullptr; - if (OB_FAIL(create_dfo(dfo, static_cast(root_op).get_parallel()))) { + if (OB_FAIL(create_dfo(dfo, root_op))) { LOG_WARN("fail create dfo", K(ret)); } else { if (OB_FAIL(dfo->location_addr_.create(hash::cal_next_prime(10), "PxResourceBucket", "PxResourceNode"))) { @@ -324,9 +357,10 @@ int ObPxResourceAnalyzer::do_split( return ret; } -int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, int64_t dop) +int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, ObLogicalOperator &root_op) { int ret = OB_SUCCESS; + int64_t dop = static_cast(root_op).get_parallel(); void *mem_ptr = dfo_allocator_.alloc(sizeof(DfoInfo)); if (OB_ISNULL(mem_ptr)) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -335,6 +369,7 @@ int ObPxResourceAnalyzer::create_dfo(DfoInfo *&dfo, int64_t dop) ret = OB_ERR_UNEXPECTED; LOG_WARN("Null ptr unexpected", KP(mem_ptr), K(ret)); } else { + dfo->set_root_op(&root_op); dfo->set_dop(dop); } return ret; @@ -428,7 
+463,9 @@ int ObPxResourceAnalyzer::walk_through_px_trees( int64_t group_count = 0; thread_map.clear(); group_map.clear(); - if (OB_FAIL(walk_through_dfo_tree(px_info, thread_count, group_count, thread_map, group_map))) { + if (OB_FAIL(px_info.rf_dpd_info_.describe_dependency(px_info.root_dfo_))) { + LOG_WARN("failed to describe dependency"); + } else if (OB_FAIL(walk_through_dfo_tree(px_info, thread_count, group_count, thread_map, group_map))) { LOG_WARN("fail calc px thread group count", K(i), "total", px_trees.count(), K(ret)); } else if (OB_ISNULL(px_info.root_op_)) { ret = OB_ERR_UNEXPECTED; @@ -715,3 +752,89 @@ int ObPxResourceAnalyzer::update_max_thead_group_info( } return ret; } + +// Collects `left` and all of its parents, then walks up from `right` and returns through +// `ancestor` the first operator that is also in that chain. `ancestor` is left untouched +// (and OB_SUCCESS is still returned) when the two chains never meet. +int LogLowestCommonAncestorFinder::find_op_common_ancestor( + const ObLogicalOperator *left, const ObLogicalOperator *right, const ObLogicalOperator *&ancestor) +{ + int ret = OB_SUCCESS; + ObSEArray ancestors; + + const ObLogicalOperator *parent = left; + while (OB_NOT_NULL(parent) && OB_SUCC(ret)) { + if (OB_FAIL(ancestors.push_back(parent))) { + LOG_WARN("failed to push back", K(ret)); + } else { + parent = parent->get_parent(); + } + } + + parent = right; + bool find = false; + while (OB_NOT_NULL(parent) && OB_SUCC(ret) && !find) { + for (int64_t i = 0; i < ancestors.count() && OB_SUCC(ret); ++i) { + if (parent == ancestors.at(i)) { + find = true; + ancestor = parent; + break; + } + } + parent = parent->get_parent(); + } + return ret; +} +// Walks up from `op` (inclusive) to the nearest px-producer exchange, then searches the dfo tree +// under `root_dfo` breadth-first for the dfo whose root op is that exchange (or is null when no +// such exchange exists). `op_dfo` is left untouched when no dfo matches. +int LogLowestCommonAncestorFinder::get_op_dfo(const ObLogicalOperator *op, DfoInfo *root_dfo, DfoInfo *&op_dfo) +{ + int ret = OB_SUCCESS; + const ObLogicalOperator *parent = op; + const ObLogicalOperator *dfo_root_op = nullptr; + while (OB_NOT_NULL(parent) && OB_SUCC(ret)) { + if (log_op_def::LOG_EXCHANGE == parent->get_type() && + static_cast(*parent).is_px_producer()) { + dfo_root_op = parent; + break; + } else { + parent = parent->get_parent(); + } + } + DfoInfo *dfo = nullptr; + bool find = false; + + ObSEArray dfo_queue; + int64_t 
cur_que_front = 0; + if (OB_FAIL(dfo_queue.push_back(root_dfo))) { + LOG_WARN("failed to push back", K(ret)); + } + + while (cur_que_front < dfo_queue.count() && !find && OB_SUCC(ret)) { + int64_t cur_que_size = dfo_queue.count() - cur_que_front; + for (int64_t i = 0; i < cur_que_size && OB_SUCC(ret); ++i) { + dfo = dfo_queue.at(cur_que_front); + if (OB_ISNULL(dfo)) { + // a null root_dfo or child dfo would otherwise be dereferenced below + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected null dfo in dfo tree", K(ret)); + } else if (dfo->get_root_op() == dfo_root_op) { + op_dfo = dfo; + find = true; + break; + } else { + // push child into the queue + for (int64_t child_idx = 0; OB_SUCC(ret) && child_idx < dfo->get_child_count(); ++child_idx) { + if (OB_FAIL(dfo_queue.push_back(dfo->child_dfos_.at(child_idx)))) { + LOG_WARN("failed to push back child dfo", K(ret)); + } + } + } + if (OB_SUCC(ret)) { + cur_que_front++; + } + } + } + return ret; +} diff --git a/src/sql/optimizer/ob_px_resource_analyzer.h b/src/sql/optimizer/ob_px_resource_analyzer.h index 044ec1d8d6..a43f21908c 100644 --- a/src/sql/optimizer/ob_px_resource_analyzer.h +++ b/src/sql/optimizer/ob_px_resource_analyzer.h @@ -43,7 +43,8 @@ struct DfoInfo { status_(DfoStatus::INIT), dop_(0), location_addr_(), - force_bushy_(false) + force_bushy_(false), + root_op_(nullptr) {} DfoInfo *parent_; DfoInfo *depend_sibling_; @@ -52,6 +53,7 @@ struct DfoInfo { int64_t dop_; ObHashSet location_addr_; bool force_bushy_; + ObLogicalOperator *root_op_; void reset() { @@ -61,7 +63,11 @@ struct DfoInfo { child_dfos_.reset(); location_addr_.destroy(); } - bool force_bushy() { return force_bushy_; } + + inline void set_root_op(ObLogicalOperator *root_op) { root_op_ = root_op;} + inline ObLogicalOperator *get_root_op() { return root_op_;} + inline void set_force_bushy(bool flag) { force_bushy_ = flag; } + inline bool force_bushy() { return force_bushy_; } bool has_sibling() const { return nullptr != depend_sibling_; } void set_depend_sibling(DfoInfo *sibling) { depend_sibling_ = sibling; } inline bool has_child() const { return child_dfos_.count() > 0; } @@ -96,12 +102,30 @@ struct DfoInfo { TO_STRING_KV(K_(status), 
K_(dop)); }; +struct LogRuntimeFilterDependencyInfo +{ +public: + LogRuntimeFilterDependencyInfo() : rf_create_ops_() {} + ~LogRuntimeFilterDependencyInfo() = default; + void destroy() + { + rf_create_ops_.reset(); + } + inline bool is_empty() const { + return rf_create_ops_.empty(); + } + int describe_dependency(DfoInfo *root_dfo); +public: + ObTMArray rf_create_ops_; +}; + class ObLogExchange; struct PxInfo { PxInfo() : root_op_(nullptr), root_dfo_(nullptr), threads_(0), - acc_threads_(0) {} + acc_threads_(0), rf_dpd_info_() {} PxInfo(ObLogExchange *root_op, DfoInfo *root_dfo) - : root_op_(root_op), root_dfo_(root_dfo), threads_(0), acc_threads_(0) {} + : root_op_(root_op), root_dfo_(root_dfo), threads_(0), acc_threads_(0), rf_dpd_info_() + {} void reset_dfo() { if (OB_NOT_NULL(root_dfo_)) { @@ -113,6 +137,7 @@ struct PxInfo { DfoInfo *root_dfo_; int64_t threads_; // 记录当前 PX 需要的线程组数 int64_t acc_threads_; // 记录当前 PX 计划以及它下面的嵌套 PX 计划线程组数之和 + LogRuntimeFilterDependencyInfo rf_dpd_info_; TO_STRING_KV(K_(threads), K_(acc_threads)); }; @@ -159,6 +184,7 @@ private: ObHashMap &max_parallel_thread_map, ObHashMap &max_parallel_group_map); int create_dfo(DfoInfo *&dfo, int64_t dop); + int create_dfo(DfoInfo *&dfo, ObLogicalOperator &root_op); int get_dfo_addr_set(const ObLogicalOperator &root_op, ObHashSet &addr_set); int px_tree_append(ObHashMap &max_parallel_count, ObHashMap ¶llel_count); @@ -280,6 +306,15 @@ int DfoTreeNormalizer::normalize(T &root) return ret; } +class LogLowestCommonAncestorFinder +{ +public: + // for optimizer + static int find_op_common_ancestor( + const ObLogicalOperator *left, const ObLogicalOperator *right, const ObLogicalOperator *&ancestor); + static int get_op_dfo(const ObLogicalOperator *op, DfoInfo *root_dfo, DfoInfo *&op_dfo); +}; + }/* ns sql */ }/* ns oceanbase */