diff --git a/src/sql/engine/expr/ob_expr_join_filter.cpp b/src/sql/engine/expr/ob_expr_join_filter.cpp
index 3634df7866..87dcea1f41 100644
--- a/src/sql/engine/expr/ob_expr_join_filter.cpp
+++ b/src/sql/engine/expr/ob_expr_join_filter.cpp
@@ -55,6 +55,7 @@ ObExprJoinFilter::ObExprJoinFilterContext::~ObExprJoinFilterContext()
   }
   hash_funcs_.reset();
   cmp_funcs_.reset();
+  cur_row_.reset();
 }
 
 void ObExprJoinFilter::ObExprJoinFilterContext::reset_monitor_info()
@@ -227,6 +228,51 @@ void ObExprJoinFilter::check_need_dynamic_diable_bf(
   }
 }
 
+void ObExprJoinFilter::collect_sample_info_batch(
+    ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx,
+    int64_t filter_count, int64_t total_count)
+{
+  if (!join_filter_ctx.dynamic_disable()) {
+    join_filter_ctx.partial_filter_count_ += filter_count;
+    join_filter_ctx.partial_total_count_ += total_count;
+  }
+  check_need_dynamic_diable_bf_batch(join_filter_ctx);
+}
+
+void ObExprJoinFilter::check_need_dynamic_diable_bf_batch(
+    ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx)
+{
+  if (join_filter_ctx.cur_pos_ >= join_filter_ctx.next_check_start_pos_
+      && join_filter_ctx.need_reset_sample_info_) {
+    join_filter_ctx.partial_total_count_ = 0;
+    join_filter_ctx.partial_filter_count_ = 0;
+    join_filter_ctx.need_reset_sample_info_ = false;
+    if (join_filter_ctx.dynamic_disable()) {
+      join_filter_ctx.dynamic_disable_ = false;
+    }
+  } else if (join_filter_ctx.cur_pos_ >=
+      join_filter_ctx.next_check_start_pos_ + join_filter_ctx.window_size_) {
+    if (join_filter_ctx.partial_total_count_ -
+        join_filter_ctx.partial_filter_count_ <
+        join_filter_ctx.partial_filter_count_) {
+      // partial_filter_count_ / partial_total_count_ > 0.5
+      // The optimizer chooses the bloom filter when the filter threshold is larger than 0.6,
+      // so 0.5 is an acceptable value here.
+      // While the filter stays enabled, the sliding window does not need to expand.
+      join_filter_ctx.window_cnt_ = 0;
+      join_filter_ctx.next_check_start_pos_ = join_filter_ctx.cur_pos_;
+    } else {
+      join_filter_ctx.window_cnt_++;
+      join_filter_ctx.next_check_start_pos_ = join_filter_ctx.cur_pos_ +
+          (join_filter_ctx.window_size_ * join_filter_ctx.window_cnt_);
+      join_filter_ctx.dynamic_disable_ = true;
+    }
+    join_filter_ctx.partial_total_count_ = 0;
+    join_filter_ctx.partial_filter_count_ = 0;
+    join_filter_ctx.need_reset_sample_info_ = true;
+  }
+}
+
 int ObExprJoinFilter::eval_bloom_filter(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
 {
   return eval_filter_internal(expr, ctx, res);
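Annotation: the decision rule added above can be hard to see through the member accesses. Below is a minimal standalone C++ sketch of the same sliding-window heuristic, with hypothetical names and a plain struct in place of ObExprJoinFilterContext: keep the bloom filter while a window filters more than half of its rows, otherwise disable it for a stretch that grows linearly with the number of consecutive ineffective windows.

#include <cstdint>

// Hypothetical, simplified mirror of ObExprJoinFilterContext's sampling state.
struct SampleState {
  int64_t cur_pos = 0;               // rows seen so far
  int64_t next_check_start_pos = 0;  // where the current window starts
  int64_t window_size = 1024;        // rows per sampling window
  int64_t window_cnt = 0;            // consecutive ineffective windows
  int64_t partial_filter_count = 0;  // rows filtered inside the window
  int64_t partial_total_count = 0;   // rows checked inside the window
  bool dynamic_disable = false;
  bool need_reset_sample_info = false;
};

// Same shape as check_need_dynamic_diable_bf_batch: called after each batch.
void check_window(SampleState &s) {
  if (s.cur_pos >= s.next_check_start_pos && s.need_reset_sample_info) {
    // A disabled stretch (or a stale window) ended: restart sampling.
    s.partial_total_count = 0;
    s.partial_filter_count = 0;
    s.need_reset_sample_info = false;
    s.dynamic_disable = false;
  } else if (s.cur_pos >= s.next_check_start_pos + s.window_size) {
    // Window full. Filter rate > 0.5 <=> total - filtered < filtered.
    if (s.partial_total_count - s.partial_filter_count < s.partial_filter_count) {
      s.window_cnt = 0;                  // filter is effective, keep it on
      s.next_check_start_pos = s.cur_pos;
    } else {
      s.window_cnt++;                    // filter is weak: back off longer each time
      s.next_check_start_pos = s.cur_pos + s.window_size * s.window_cnt;
      s.dynamic_disable = true;
    }
    s.partial_total_count = 0;
    s.partial_filter_count = 0;
    s.need_reset_sample_info = true;
  }
}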
diff --git a/src/sql/engine/expr/ob_expr_join_filter.h b/src/sql/engine/expr/ob_expr_join_filter.h
index 09857b73a1..48ae9f78aa 100644
--- a/src/sql/engine/expr/ob_expr_join_filter.h
+++ b/src/sql/engine/expr/ob_expr_join_filter.h
@@ -40,8 +40,10 @@ public:
         n_times_(0), ready_ts_(0), next_check_start_pos_(0),
         window_cnt_(0), window_size_(0),
         partial_filter_count_(0), partial_total_count_(0),
-        cur_pos_(total_count_), flag_(0)
+        cur_pos_(total_count_), need_reset_sample_info_(false), flag_(0),
+        cur_row_()
     {
+      cur_row_.set_attr(ObMemAttr(MTL_ID(), "RfCurRow"));
       need_wait_rf_ = true;
       is_first_ = true;
     }
@@ -70,6 +72,7 @@ public:
     int64_t partial_filter_count_;
     int64_t partial_total_count_;
     int64_t &cur_pos_;
+    bool need_reset_sample_info_; // used by check_need_dynamic_diable_bf_batch
     union {
       uint64_t flag_;
       struct {
@@ -81,6 +84,7 @@ public:
         int32_t reserved_:28;
       };
     };
+    ObTMArray<ObDatum> cur_row_;
   };
   ObExprJoinFilter();
   explicit ObExprJoinFilter(common::ObIAllocator& alloc);
@@ -118,6 +122,9 @@ public:
   static void collect_sample_info(
       ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx,
       bool is_match);
+  static void collect_sample_info_batch(
+      ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx,
+      int64_t filter_count, int64_t total_count);
 private:
   static int check_rf_ready(
       ObExecContext &exec_ctx,
@@ -125,6 +132,8 @@ private:
   static void check_need_dynamic_diable_bf(
       ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx);
+  static void check_need_dynamic_diable_bf_batch(
+      ObExprJoinFilter::ObExprJoinFilterContext &join_filter_ctx);
 private:
   static const int64_t CHECK_TIMES = 127;
 private:
diff --git a/src/sql/engine/join/ob_join_filter_op.cpp b/src/sql/engine/join/ob_join_filter_op.cpp
index 175b2ade71..ad9106ed61 100644
--- a/src/sql/engine/join/ob_join_filter_op.cpp
+++ b/src/sql/engine/join/ob_join_filter_op.cpp
@@ -440,7 +440,9 @@ int ObJoinFilterOp::mark_not_need_send_bf_msg()
       } else {
         // only if the msg is a shared and shuffled BLOOM_FILTER_MSG,
         // let other worker threads stop trying to send the join_filter
-        shared_bf_msg->need_send_msg_ = false;
+        if (!*shared_bf_msg->create_finish_) {
+          shared_bf_msg->need_send_msg_ = false;
+        }
       }
     }
   }
@@ -448,12 +450,17 @@ int ObJoinFilterOp::mark_not_need_send_bf_msg()
   return ret;
 }
 
-// need add mark_not_need_send_bf_msg for shared shuffled bloom filter
+// for create mode, mark_not_need_send_bf_msg is needed for the shared shuffled bloom filter
+// for use mode, update_plan_monitor_info is needed because get_next_batch may not reach iter end
 int ObJoinFilterOp::inner_drain_exch()
 {
   int ret = OB_SUCCESS;
-  if (MY_SPEC.is_create_mode()) {
+  if (row_reach_end_ || batch_reach_end_) {
+    // already iter end, no need to mark not-send or update_plan_monitor_info again (already done in get_next_row/batch)
+  } else if (MY_SPEC.is_create_mode()) {
     ret = mark_not_need_send_bf_msg();
+  } else if (MY_SPEC.is_use_mode()) {
+    ret = update_plan_monitor_info();
   }
   return ret;
 }
@@ -764,6 +771,8 @@ int ObJoinFilterOp::update_plan_monitor_info()
   op_monitor_info_.otherstat_5_value_ = MY_SPEC.filter_id_;
   op_monitor_info_.otherstat_6_id_ = ObSqlMonitorStatIds::JOIN_FILTER_LENGTH;
   op_monitor_info_.otherstat_6_value_ = MY_SPEC.filter_len_;
+  int64_t check_count = 0;
+  int64_t total_count = 0;
   for (int i = 0; i < MY_SPEC.rf_infos_.count() && OB_SUCC(ret); ++i) {
     if (OB_INVALID_ID != MY_SPEC.rf_infos_.at(i).filter_expr_id_) {
       ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx = NULL;
@@ -772,12 +781,16 @@ int ObJoinFilterOp::update_plan_monitor_info()
         LOG_TRACE("join filter expr ctx is null");
       } else {
         op_monitor_info_.otherstat_1_value_ += join_filter_ctx->filter_count_;
-        op_monitor_info_.otherstat_2_value_ += join_filter_ctx->total_count_;
-        op_monitor_info_.otherstat_3_value_ += join_filter_ctx->check_count_;
+        total_count = max(total_count, join_filter_ctx->total_count_);
+        check_count = max(check_count, join_filter_ctx->check_count_);
         op_monitor_info_.otherstat_4_value_ = max(join_filter_ctx->ready_ts_, op_monitor_info_.otherstat_4_value_);
       }
     }
   }
+  if (OB_SUCC(ret)) {
+    op_monitor_info_.otherstat_2_value_ += total_count;
+    op_monitor_info_.otherstat_3_value_ += check_count;
+  }
   return ret;
 }
@@ -854,6 +867,8 @@ int ObJoinFilterOp::open_join_filter_use()
         LOG_WARN("failed to assign hash_func");
       } else if (OB_FAIL(join_filter_ctx->cmp_funcs_.assign(MY_SPEC.cmp_funcs_))) {
         LOG_WARN("failed to assign cmp_funcs_");
+      } else if (OB_FAIL(join_filter_ctx->cur_row_.reserve(MY_SPEC.cmp_funcs_.count()))) {
+        LOG_WARN("failed to reserve cur_row_");
       }
     }
   } else {
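Annotation: update_plan_monitor_info now folds total_count_ and check_count_ with max instead of sum. The rationale: every join-filter expression context attached to this operator observes the same scanned rows, so summing would multiply-count them, while filtered rows remain additive. A standalone sketch of that folding with hypothetical types (not the real monitor structs):

#include <algorithm>
#include <cstdint>
#include <vector>

// Hypothetical per-context stats, mirroring ObExprJoinFilterContext counters.
struct FilterCtxStats {
  int64_t filter_count;
  int64_t total_count;
  int64_t check_count;
};

void fold_stats(const std::vector<FilterCtxStats> &ctxs,
                int64_t &filtered, int64_t &total, int64_t &checked) {
  int64_t total_max = 0;
  int64_t check_max = 0;
  for (const FilterCtxStats &c : ctxs) {
    filtered += c.filter_count;                       // additive across filters
    total_max = std::max(total_max, c.total_count);   // same rows seen by every ctx
    check_max = std::max(check_max, c.check_count);
  }
  total += total_max;   // count the shared row stream only once
  checked += check_max;
}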
diff --git a/src/sql/engine/px/ob_dfo.h b/src/sql/engine/px/ob_dfo.h
index 02cfb50079..7f91d796e4 100644
--- a/src/sql/engine/px/ob_dfo.h
+++ b/src/sql/engine/px/ob_dfo.h
@@ -498,7 +498,8 @@ public:
       p2p_dh_loc_(nullptr),
       need_p2p_info_(false),
       p2p_dh_map_info_(),
-      coord_info_ptr_(nullptr)
+      coord_info_ptr_(nullptr),
+      force_bushy_(false)
   {
   }
diff --git a/src/sql/engine/px/ob_dfo_mgr.cpp b/src/sql/engine/px/ob_dfo_mgr.cpp
index bcbfd91291..f67d7c3638 100644
--- a/src/sql/engine/px/ob_dfo_mgr.cpp
+++ b/src/sql/engine/px/ob_dfo_mgr.cpp
@@ -419,6 +419,9 @@ int ObDfoMgr::init(ObExecContext &exec_ctx,
   } else if (OB_ISNULL(root_dfo_)) {
     ret = OB_ERR_UNEXPECTED;
     LOG_WARN("NULL dfo unexpected", K(ret));
+  } else if (!px_coord_info.rf_dpd_info_.is_empty()
+             && OB_FAIL(px_coord_info.rf_dpd_info_.describe_dependency(root_dfo_))) {
+    LOG_WARN("failed to describe rf dependency");
  } else if (OB_FAIL(ObDfoSchedOrderGenerator::generate_sched_order(*this))) {
     LOG_WARN("fail init dfo mgr", K(ret));
   } else if (OB_FAIL(ObDfoSchedDepthGenerator::generate_sched_depth(exec_ctx, *this))) {
@@ -521,10 +524,12 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
     }
   } else if (IS_PX_JOIN_FILTER(phy_op->get_type()) && NULL != parent_dfo) {
     const ObJoinFilterSpec *filter_spec = static_cast<const ObJoinFilterSpec *>(phy_op);
-    if (filter_spec->is_create_mode() && !parent_dfo->force_bushy()) {
-      parent_dfo->set_force_bushy(true);
+    if (filter_spec->is_create_mode() && OB_FAIL(px_coord_info.rf_dpd_info_.rf_create_ops_.push_back(phy_op))) {
+      LOG_WARN("failed to push back create op");
+    } else if (filter_spec->is_use_mode() && OB_FAIL(px_coord_info.rf_dpd_info_.rf_use_ops_.push_back(phy_op))) {
+      LOG_WARN("failed to push back use op");
     }
-    if(filter_spec->is_shared_join_filter() && filter_spec->is_shuffle_) {
+    if (OB_SUCC(ret) && filter_spec->is_shared_join_filter() && filter_spec->is_shuffle_) {
       ObP2PDfoMapNode node;
       node.target_dfo_id_ = parent_dfo->get_dfo_id();
       for (int i = 0; i < filter_spec->rf_infos_.count() && OB_SUCC(ret); ++i) {
diff --git a/src/sql/engine/px/ob_granule_iterator_op.cpp b/src/sql/engine/px/ob_granule_iterator_op.cpp
index b7cdefe634..55ce4bd180 100644
--- a/src/sql/engine/px/ob_granule_iterator_op.cpp
+++ b/src/sql/engine/px/ob_granule_iterator_op.cpp
@@ -39,6 +39,7 @@ ObGIOpInput::ObGIOpInput(ObExecContext &ctx, const ObOpSpec &spec)
     worker_id_(common::OB_INVALID_INDEX),
     pump_(nullptr),
     px_sequence_id_(OB_INVALID_ID),
+    rf_max_wait_time_(0),
     deserialize_allocator_(nullptr)
 {}
 
@@ -176,6 +177,7 @@ ObGranuleIteratorOp::ObGranuleIteratorOp(ObExecContext &exec_ctx, const ObOpSpec
     total_count_(0),
     rf_msg_(NULL),
     rf_key_(),
+    rf_start_wait_time_(0),
     tablet2part_id_map_(),
     real_child_(NULL),
     is_parallel_runtime_filtered_(false)
@@ -645,7 +647,9 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning)
         LOG_WARN("fail to fetch rescan pw task infos", K(ret));
       }
     } else if (OB_FAIL(fetch_normal_pw_task_infos(*op_ids_pointer, gi_prepare_map, gi_task_infos))) {
-      LOG_WARN("fail to fetch normal pw task infos", K(ret));
+      if (OB_ITER_END != ret) {
+        LOG_WARN("fail to fetch normal pw task infos", K(ret));
+      }
     }
   }
 
@@ -663,13 +667,12 @@ int ObGranuleIteratorOp::do_get_next_granule_task(bool &partition_pruning)
   return ret;
 }
 
-
-int ObGranuleIteratorOp::do_join_filter_partition_pruning(
-    int64_t tablet_id,
-    bool &partition_pruning)
-{
+int ObGranuleIteratorOp::wait_runtime_ready(bool &partition_pruning)
+{
   int ret = OB_SUCCESS;
-  bool is_match = false;
+  if (0 == rf_start_wait_time_) {
+    rf_start_wait_time_ = ObTimeUtility::current_time();
+  }
+  ObGIOpInput *gi_input = static_cast<ObGIOpInput *>(input_);
   while (OB_SUCC(ret) && (OB_ISNULL(rf_msg_) || !rf_msg_->check_ready())) {
     if (OB_ISNULL(rf_msg_) && OB_FAIL(PX_P2P_DH.atomic_get_msg(rf_key_, rf_msg_))) {
       if (OB_HASH_NOT_EXIST == ret) {
@@ -686,13 +689,34 @@ int ObGranuleIteratorOp::wait_runtime_ready(bool &partition_pruning)
     if (OB_SUCC(ret)) {
       if (OB_ISNULL(rf_msg_) || !rf_msg_->check_ready()) {
         if (MY_SPEC.bf_info_.is_shuffle_) {
+          // For a shuffled msg, no waiting is required.
           partition_pruning = false;
           break;
+        } else {
+          // For local messages, the maximum waiting time does not exceed rf_max_wait_time.
+          int64_t cur_time = ObTimeUtility::current_time();
+          if (cur_time - rf_start_wait_time_ > gi_input->get_rf_max_wait_time() * 1000) {
+            partition_pruning = false;
+            break;
+          } else {
+            ob_usleep(100);
+          }
         }
       }
     }
   }
+  return ret;
+}
+
+int ObGranuleIteratorOp::do_join_filter_partition_pruning(
+    int64_t tablet_id,
+    bool &partition_pruning)
+{
+  int ret = OB_SUCCESS;
+  bool is_match = false;
+  if (OB_FAIL(wait_runtime_ready(partition_pruning))) {
+    LOG_WARN("failed to wait for runtime filter ready");
+  }
   if (OB_SUCC(ret) && OB_NOT_NULL(rf_msg_) && rf_msg_->check_ready()) {
     uint64_t hash_val = ObExprJoinFilter::JOIN_FILTER_SEED;
     if (MY_SPEC.bf_info_.skip_subpart_) {
diff --git a/src/sql/engine/px/ob_granule_iterator_op.h b/src/sql/engine/px/ob_granule_iterator_op.h
index 109e7c4b44..82a580f26a 100644
--- a/src/sql/engine/px/ob_granule_iterator_op.h
+++ b/src/sql/engine/px/ob_granule_iterator_op.h
@@ -59,6 +59,8 @@ public:
   int64_t get_px_sequence_id() { return px_sequence_id_; }
   void set_px_sequence_id(int64_t id) { px_sequence_id_ = id; }
   int add_table_location_keys(common::ObIArray<const ObTableScanSpec *> &tscs);
+  int64_t get_rf_max_wait_time() { return rf_max_wait_time_; }
+  void set_rf_max_wait_time(int64_t rf_max_wait_time) { rf_max_wait_time_ = rf_max_wait_time; }
 private:
   int deep_copy_range(ObIAllocator *allocator, const ObNewRange &src, ObNewRange &dst);
 public:
@@ -72,6 +74,7 @@ public:
   //for partition pruning
   common::ObSEArray<uint64_t, 2> table_location_keys_;
   int64_t px_sequence_id_;
+  int64_t rf_max_wait_time_;
 private:
   common::ObIAllocator *deserialize_allocator_;
 };
@@ -211,6 +214,7 @@ private:
   bool enable_single_runtime_filter_pruning();
   int do_single_runtime_filter_pruning(const ObGranuleTaskInfo &gi_task_info, bool &partition_pruning);
   int do_parallel_runtime_filter_pruning();
+  int wait_runtime_ready(bool &partition_pruning);
   int do_join_filter_partition_pruning(int64_t tablet_id, bool &partition_pruning);
   int try_build_tablet2part_id_map();
   //---end----
@@ -241,6 +245,7 @@ private:
   int64_t total_count_; // total partition count or block count processed, rescan included
   ObP2PDatahubMsgBase *rf_msg_;
   ObP2PDhKey rf_key_;
+  int64_t rf_start_wait_time_;
   ObPxTablet2PartIdMap tablet2part_id_map_;
   ObOperator *real_child_;
   bool is_parallel_runtime_filtered_;
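Annotation: wait_runtime_ready polls the datahub until the message is ready and, for local messages, gives up pruning once rf_max_wait_time (in ms) has elapsed; a timeout is deliberately not an error, the scan simply proceeds unpruned. Below is a standalone sketch of the same bounded-polling pattern, with std::chrono and std::this_thread standing in for ObTimeUtility and ob_usleep:

#include <chrono>
#include <cstdint>
#include <thread>

static int64_t now_us() {
  using namespace std::chrono;
  return duration_cast<microseconds>(steady_clock::now().time_since_epoch()).count();
}

// Returns true if the filter became ready before the deadline; on timeout the
// caller just skips partition pruning instead of failing the query.
bool wait_filter_ready(bool (*is_ready)(), int64_t max_wait_ms) {
  const int64_t start_us = now_us();
  while (!is_ready()) {
    if (now_us() - start_us > max_wait_ms * 1000) {
      return false;  // give up pruning, scan everything
    }
    std::this_thread::sleep_for(std::chrono::microseconds(100));  // like ob_usleep(100)
  }
  return true;
}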
diff --git a/src/sql/engine/px/ob_px_scheduler.cpp b/src/sql/engine/px/ob_px_scheduler.cpp
index 0849fc3c48..dd6a16bca7 100644
--- a/src/sql/engine/px/ob_px_scheduler.cpp
+++ b/src/sql/engine/px/ob_px_scheduler.cpp
@@ -40,6 +40,7 @@
 #include "sql/engine/px/ob_px_sqc_proxy.h"
 #include "storage/tx/ob_trans_service.h"
 #include "share/detect/ob_detect_manager_utils.h"
+#include "sql/engine/join/ob_join_filter_op.h"
 
 namespace oceanbase
 {
@@ -716,6 +717,42 @@ int ObPxTerminateMsgProc::startup_msg_loop(ObExecContext &ctx)
   return ret;
 }
 
+int RuntimeFilterDependencyInfo::describe_dependency(ObDfo *root_dfo)
+{
+  int ret = OB_SUCCESS;
+  // For each rf create op, find its paired rf use op, then get the lowest common
+  // ancestor of the two ops and mark force_bushy on the DFO which the ancestor belongs to.
+  for (int64_t i = 0; i < rf_create_ops_.count() && OB_SUCC(ret); ++i) {
+    const ObJoinFilterSpec *create_op = static_cast<const ObJoinFilterSpec *>(rf_create_ops_.at(i));
+    for (int64_t j = 0; j < rf_use_ops_.count() && OB_SUCC(ret); ++j) {
+      const ObJoinFilterSpec *use_op = static_cast<const ObJoinFilterSpec *>(rf_use_ops_.at(j));
+      if (create_op->get_filter_id() == use_op->get_filter_id()) {
+        const ObOpSpec *ancestor_op = nullptr;
+        ObDfo *op_dfo = nullptr;
+        if (OB_FAIL(LowestCommonAncestorFinder::find_op_common_ancestor(create_op, use_op, ancestor_op))) {
+          LOG_WARN("failed to find op common ancestor");
+        } else if (OB_ISNULL(ancestor_op)) {
+          ret = OB_ERR_UNEXPECTED;
+          LOG_WARN("op common ancestor not found");
+        } else if (OB_FAIL(LowestCommonAncestorFinder::get_op_dfo(ancestor_op, root_dfo, op_dfo))) {
+          LOG_WARN("failed to find the dfo of the common ancestor");
+        } else if (OB_ISNULL(op_dfo)) {
+          ret = OB_ERR_UNEXPECTED;
+          LOG_WARN("the dfo of ancestor_op not found");
+        } else {
+          // Once the DFO which the ancestor belongs to has set the flag "force_bushy",
+          // the DfoTreeNormalizer will not attempt to transform a right-deep DFO tree
+          // into a left-deep DFO tree. Consequently, the "join filter create" operator
+          // can be scheduled earlier than the "join filter use" operator.
+          op_dfo->set_force_bushy(true);
+        }
+        break;
+      }
+    }
+  }
+  return ret;
+}
+
 int ObPxCoordInfo::init()
 {
   int ret = OB_SUCCESS;
diff --git a/src/sql/engine/px/ob_px_scheduler.h b/src/sql/engine/px/ob_px_scheduler.h
index 858a485e4e..3ca8f91e67 100644
--- a/src/sql/engine/px/ob_px_scheduler.h
+++ b/src/sql/engine/px/ob_px_scheduler.h
@@ -55,6 +55,26 @@ enum class TableAccessType {
   HAS_USER_TABLE
 };
 
+// for runtime filter, the jf create op must be scheduled earlier than the jf use op
+struct RuntimeFilterDependencyInfo
+{
+public:
+  RuntimeFilterDependencyInfo() : rf_create_ops_(), rf_use_ops_() {}
+  ~RuntimeFilterDependencyInfo() = default;
+  void destroy()
+  {
+    rf_create_ops_.reset();
+    rf_use_ops_.reset();
+  }
+  inline bool is_empty() const
+  {
+    return rf_create_ops_.empty() && rf_use_ops_.empty();
+  }
+  int describe_dependency(ObDfo *root_dfo);
+public:
+  ObTMArray<const ObOpSpec *> rf_create_ops_;
+  ObTMArray<const ObOpSpec *> rf_use_ops_;
+};
+
 struct ObP2PDfoMapNode
 {
   ObP2PDfoMapNode() : target_dfo_id_(OB_INVALID_ID), addrs_() {}
@@ -103,7 +123,8 @@ public:
       table_access_type_(TableAccessType::NO_TABLE),
       qc_detectable_id_(),
       p2p_dfo_map_(),
-      p2p_temp_table_info_()
+      p2p_temp_table_info_(),
+      rf_dpd_info_()
   {}
   virtual ~ObPxCoordInfo() {}
   virtual void destroy()
@@ -112,6 +133,7 @@ public:
     piece_msg_ctx_mgr_.reset();
     p2p_dfo_map_.destroy();
     p2p_temp_table_info_.reset();
+    rf_dpd_info_.destroy();
   }
   void reset_for_rescan()
   {
@@ -153,6 +175,7 @@ public:
   // key = p2p_dh_id value = dfo_id + target_addrs
   hash::ObHashMap<int64_t, ObP2PDfoMapNode> p2p_dfo_map_;
   ObTempTableP2PInfo p2p_temp_table_info_;
+  RuntimeFilterDependencyInfo rf_dpd_info_;
 };
 
 class ObDfoSchedulerBasic;
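Annotation: describe_dependency pairs each "join filter create" op with the "use" op carrying the same filter id, then marks the DFO owning their lowest common ancestor. A toy sketch of just the pairing step, with a hypothetical Op type in place of ObOpSpec/ObJoinFilterSpec:

#include <cstdint>
#include <vector>

struct Op {
  int64_t filter_id;
};

// mark_lca_dfo stands in for: find_op_common_ancestor + get_op_dfo + set_force_bushy(true).
void pair_filters(const std::vector<const Op *> &creates,
                  const std::vector<const Op *> &uses,
                  void (*mark_lca_dfo)(const Op *, const Op *)) {
  for (const Op *c : creates) {
    for (const Op *u : uses) {
      if (c->filter_id == u->filter_id) {
        mark_lca_dfo(c, u);  // force the ancestor's DFO to stay bushy
        break;               // one paired use op per create op is enough
      }
    }
  }
}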
diff --git a/src/sql/engine/px/ob_px_task_process.cpp b/src/sql/engine/px/ob_px_task_process.cpp
index 57bf108dab..225fa0b083 100644
--- a/src/sql/engine/px/ob_px_task_process.cpp
+++ b/src/sql/engine/px/ob_px_task_process.cpp
@@ -613,6 +613,9 @@ int ObPxTaskProcess::OpPreparation::apply(ObExecContext &ctx,
     } else {
       input->set_worker_id(task_id_);
       input->set_px_sequence_id(task_->px_int_id_.px_interrupt_id_.first_);
+      if (OB_NOT_NULL(ctx.get_my_session())) {
+        input->set_rf_max_wait_time(ctx.get_my_session()->get_runtime_filter_wait_time_ms());
+      }
       if (ObGranuleUtil::pwj_gi(gi->gi_attri_flag_)) {
         pw_gi_spec_ = gi;
         on_set_tscs_ = true;
diff --git a/src/sql/engine/px/ob_px_util.cpp b/src/sql/engine/px/ob_px_util.cpp
index a0249f245a..ea562128c9 100644
--- a/src/sql/engine/px/ob_px_util.cpp
+++ b/src/sql/engine/px/ob_px_util.cpp
@@ -3835,4 +3835,80 @@ bool ObPxCheckAlive::is_in_blacklist(const common::ObAddr &addr, int64_t server_
     LOG_WARN("server in blacklist", K(addr), K(server_start_time), K(alive_data.start_service_time_));
   }
   return in_blacklist;
-}
\ No newline at end of file
+}
+
+int LowestCommonAncestorFinder::find_op_common_ancestor(
+    const ObOpSpec *left, const ObOpSpec *right, const ObOpSpec *&ancestor)
+{
+  int ret = OB_SUCCESS;
+  ObSEArray<const ObOpSpec *, 16> ancestors;
+
+  const ObOpSpec *parent = left;
+  while (OB_NOT_NULL(parent) && OB_SUCC(ret)) {
+    if (OB_FAIL(ancestors.push_back(parent))) {
+      LOG_WARN("failed to push back");
+    } else {
+      parent = parent->get_parent();
+    }
+  }
+
+  parent = right;
+  bool find = false;
+  while (OB_NOT_NULL(parent) && OB_SUCC(ret) && !find) {
+    for (int64_t i = 0; i < ancestors.count() && OB_SUCC(ret); ++i) {
+      if (parent == ancestors.at(i)) {
+        find = true;
+        ancestor = parent;
+        break;
+      }
+    }
+    parent = parent->get_parent();
+  }
+  return ret;
+}
+
+int LowestCommonAncestorFinder::get_op_dfo(const ObOpSpec *op, ObDfo *root_dfo, ObDfo *&op_dfo)
+{
+  int ret = OB_SUCCESS;
+  const ObOpSpec *parent = op;
+  const ObOpSpec *dfo_root_op = nullptr;
+  while (OB_NOT_NULL(parent) && OB_SUCC(ret)) {
+    if (IS_PX_COORD(parent->type_) || IS_PX_TRANSMIT(parent->type_)) {
+      dfo_root_op = parent;
+      break;
+    } else {
+      parent = parent->get_parent();
+    }
+  }
+  ObDfo *dfo = nullptr;
+  bool find = false;
+
+  ObSEArray<ObDfo *, 16> dfo_queue;
+  int64_t cur_que_front = 0;
+  if (OB_FAIL(dfo_queue.push_back(root_dfo))) {
+    LOG_WARN("failed to push back");
+  }
+
+  while (cur_que_front < dfo_queue.count() && !find && OB_SUCC(ret)) {
+    int64_t cur_que_size = dfo_queue.count() - cur_que_front;
+    for (int64_t i = 0; i < cur_que_size && OB_SUCC(ret); ++i) {
+      dfo = dfo_queue.at(cur_que_front);
+      if (dfo->get_root_op_spec() == dfo_root_op) {
+        op_dfo = dfo;
+        find = true;
+        break;
+      } else {
+        // push children into the queue
+        for (int64_t child_idx = 0; OB_SUCC(ret) && child_idx < dfo->get_child_count(); ++child_idx) {
+          if (OB_FAIL(dfo_queue.push_back(dfo->get_child_dfos().at(child_idx)))) {
+            LOG_WARN("failed to push back child dfo");
+          }
+        }
+      }
+      if (OB_SUCC(ret)) {
+        cur_que_front++;
+      }
+    }
+  }
+  return ret;
+}
diff --git a/src/sql/engine/px/ob_px_util.h b/src/sql/engine/px/ob_px_util.h
index 6c4c53da7c..027d1fc8a4 100644
--- a/src/sql/engine/px/ob_px_util.h
+++ b/src/sql/engine/px/ob_px_util.h
@@ -665,6 +665,14 @@ static int get_location_addrs(const T &locations,
   return ret;
 }
 
+class LowestCommonAncestorFinder
+{
+public:
+  static int find_op_common_ancestor(
+      const ObOpSpec *left, const ObOpSpec *right, const ObOpSpec *&ancestor);
+  static int get_op_dfo(const ObOpSpec *op, ObDfo *root_dfo, ObDfo *&op_dfo);
+};
+
 }
 }
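Annotation: find_op_common_ancestor needs only parent pointers: collect one op's ancestor chain, then walk the other op's chain upward and return the first node that appears in the collected chain, which is the lowest common ancestor (O(depth^2), fine for plan trees). A self-contained mirror of the algorithm with a toy node type:

#include <vector>

struct Node {
  const Node *parent = nullptr;
};

const Node *common_ancestor(const Node *left, const Node *right) {
  std::vector<const Node *> chain;
  for (const Node *p = left; p != nullptr; p = p->parent) {
    chain.push_back(p);  // all ancestors of `left`, bottom-up
  }
  for (const Node *p = right; p != nullptr; p = p->parent) {
    for (const Node *a : chain) {
      if (p == a) {
        return p;  // first ancestor of `right` that also covers `left` = LCA
      }
    }
  }
  return nullptr;  // disjoint trees
}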
diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp
index 047b1f3bfe..0c4a50d0dc 100644
--- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp
+++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.cpp
@@ -188,18 +188,6 @@ int ObP2PDatahubManager::P2PMsgMergeCall::operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry)
   return ret;
 }
 
-int ObP2PDatahubManager::P2PRegenerateCall::operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry)
-{
-  int ret = OB_SUCCESS;
-  if (OB_FAIL(entry.second->regenerate())) {
-    LOG_WARN("fail to do regenerate", K(ret));
-  }
-  ret_ = ret;
-  return ret;
-}
-
-
 int ObP2PDatahubManager::send_local_msg(ObP2PDatahubMsgBase *msg)
 {
   int ret = OB_SUCCESS;
@@ -212,7 +200,7 @@ int ObP2PDatahubManager::send_local_msg(ObP2PDatahubMsgBase *msg)
       msg->get_task_id(), ObTimeUtility::current_time(), msg->get_timeout_ts());
   if (OB_FAIL(map_.set_refactored(dh_key, msg))) {
-    LOG_TRACE("fail to insert p2p dh msg", K(ret));
+    LOG_TRACE("fail to insert p2p dh msg", K(ret), K(dh_key));
   } else {
     msg->set_is_ready(true);
   }
@@ -266,7 +254,10 @@ int ObP2PDatahubManager::erase_msg_if(ObP2PDhKey &dh_key,
     }
     PX_P2P_DH.free_msg(msg);
   } else {
-    ret = OB_ERR_UNEXPECTED;
+    // If the erase failed, other threads are still referencing the msg.
+    // If the caller is an RPC thread, the clearing task is delegated to the DM;
+    // if the caller is the DM, its retry policy ensures that the message is eventually deleted.
+    ret = OB_EAGAIN;
     LOG_WARN("failed to erase msg, other threads still referencing it", K(dh_key), K(need_unreg_dm));
   }
   return ret;
@@ -368,7 +359,7 @@ void ObP2PDatahubManager::P2PMsgGetCall::operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry)
     dh_msg_->inc_ref_count();
   } else {
-    int ret = OB_UNEXPECT_INTERNAL_ERROR;
+    int ret = OB_ERR_UNEXPECTED;
     LOG_WARN("dh_msg_ is null", K(ret));
   }
 }
@@ -383,7 +374,7 @@ bool ObP2PDatahubManager::P2PMsgEraseIfCall::operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry)
       need_erase = true;
     }
   } else {
-    int ret = OB_UNEXPECT_INTERNAL_ERROR;
+    int ret = OB_ERR_UNEXPECTED;
     LOG_WARN("dh_msg_ is null", K(ret));
   }
   return need_erase;
diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h
index 4820c5347f..7e560d414a 100644
--- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h
+++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h
@@ -36,14 +36,7 @@ public:
     ObP2PDatahubMsgBase &dh_msg_;
     bool need_free_;
   };
-  struct P2PRegenerateCall
-  {
-    P2PRegenerateCall(ObP2PDatahubMsgBase &db_msg) : ret_(OB_SUCCESS), dh_msg_(db_msg) {};
-    ~P2PRegenerateCall() = default;
-    int operator() (common::hash::HashMapPair<ObP2PDhKey, ObP2PDatahubMsgBase *> &entry);
-    int ret_;
-    ObP2PDatahubMsgBase &dh_msg_;
-  };
+
   struct P2PMsgGetCall
   {
     P2PMsgGetCall(ObP2PDatahubMsgBase *&db_msg) : dh_msg_(db_msg), ret_(OB_SUCCESS) {};
diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp
index cb89b51536..bbed676d26 100644
--- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp
+++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp
@@ -93,8 +93,8 @@ int ObP2PDatahubMsgBase::process_receive_count(ObP2PDatahubMsgBase &msg)
   int ret = OB_SUCCESS;
   CK(msg.get_msg_receive_expect_cnt() > 0 && msg_receive_expect_cnt_ > 0);
   if (OB_SUCC(ret)) {
-    ATOMIC_AAF(&msg_receive_cur_cnt_, msg.get_msg_receive_cur_cnt());
-    if (msg_receive_cur_cnt_ > msg_receive_expect_cnt_) {
+    int64_t cur_cnt = ATOMIC_AAF(&msg_receive_cur_cnt_, msg.get_msg_receive_cur_cnt());
+    if (cur_cnt > msg_receive_expect_cnt_) {
       ret = OB_ERR_UNEXPECTED;
       LOG_WARN("unexpected receive count", K(ret));
     }
@@ -105,10 +105,8 @@ int ObP2PDatahubMsgBase::process_receive_count(ObP2PDatahubMsgBase &msg)
 
 void ObP2PDatahubMsgBase::check_finish_receive()
 {
-  if (is_active_) {
-    if (msg_receive_cur_cnt_ == msg_receive_expect_cnt_) {
-      is_ready_ = true;
-    }
+  if (msg_receive_expect_cnt_ == ATOMIC_LOAD(&msg_receive_cur_cnt_)) {
+    is_ready_ = true;
   }
 }
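Annotation: the receive-count fixes switch the overflow check to the value returned by ATOMIC_AAF (add-and-fetch) and the readiness check to an ATOMIC_LOAD, because re-reading the counter after the add can observe increments made concurrently by other threads. A standalone sketch of the difference using std::atomic:

#include <atomic>
#include <cstdint>

std::atomic<int64_t> receive_cnt{0};
const int64_t expect_cnt = 4;

// Buggy shape (before the patch): the separate re-read races with other
// threads, so a thread may see a value larger than its own update and the
// exact "== expect_cnt" moment can be missed entirely.
bool on_receive_buggy(int64_t n) {
  receive_cnt.fetch_add(n);
  return receive_cnt.load() == expect_cnt;
}

// Fixed shape (after the patch): use the post-add value, which is unique per
// thread, so exactly one thread observes cur == expect_cnt.
bool on_receive_fixed(int64_t n) {
  const int64_t cur = receive_cnt.fetch_add(n) + n;  // ATOMIC_AAF returns the new value
  return cur == expect_cnt;
}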
diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h
index bbb109ec31..77d42e4f95 100644
--- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h
+++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h
@@ -86,7 +86,7 @@ public:
   virtual int process_msg_internal(bool &need_free);
   virtual int reuse() { return OB_SUCCESS; }
   virtual int regenerate() { return OB_SUCCESS; }
-  void check_finish_receive();
+  virtual void check_finish_receive();
   bool check_ready() const { return is_ready_; }
   ObP2PDatahubMsgType get_msg_type() const { return msg_type_; }
   void set_msg_type(ObP2PDatahubMsgType type) { msg_type_ = type; }
@@ -137,7 +137,7 @@ protected:
   common::ObArenaAllocator allocator_;
   int64_t msg_receive_cur_cnt_;
   int64_t msg_receive_expect_cnt_;
-  bool is_active_;
+  bool is_active_; // only for ObRFInFilterMsg: when NDV > 1024, set is_active_ = false
   bool is_ready_;
   bool is_empty_;
   int64_t ref_count_;
diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp
index c3eeab42d5..9282ffa44c 100644
--- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp
+++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp
@@ -207,7 +207,6 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
   int ret = OB_SUCCESS;
   ObP2PDhKey dh_key(p2p_datahub_id_, px_sequence_id_, task_id_);
   ObP2PDatahubManager::P2PMsgSetCall set_call(dh_key, *this);
-  ObP2PDatahubManager::P2PMsgMergeCall merge_call(*this);
   ObP2PDatahubManager::MsgMap &map = PX_P2P_DH.get_map();
   start_time_ = ObTimeUtility::current_time();
 
@@ -231,10 +230,19 @@ int ObRFBloomFilterMsg::process_msg_internal(bool &need_free)
 
   // merge piece bloom filter
   if (OB_SUCC(ret) && need_merge) {
-    if (OB_FAIL(map.read_atomic(dh_key, merge_call))) {
-      if (OB_HASH_NOT_EXIST != ret) {
-        LOG_WARN("fail to merge p2p dh msg", K(ret));
-      }
+    // for a bloom filter msg, several msgs can be merged concurrently in an atomic manner without holding the map lock;
+    // thus, we need to handle the reference count carefully here to make sure the msg is not destroyed during the merge.
+    ObP2PDatahubMsgBase *rf_msg_in_map = nullptr;
+    ObRFBloomFilterMsg *bf_msg = nullptr;
+    if (OB_FAIL(PX_P2P_DH.atomic_get_msg(dh_key, rf_msg_in_map))) { // the inc of ref_count is integrated
+      LOG_WARN("fail to get msg", K(ret));
+    } else if (FALSE_IT(bf_msg = static_cast<ObRFBloomFilterMsg *>(rf_msg_in_map))) {
+    } else if (OB_FAIL(bf_msg->atomic_merge(*this))) {
+      LOG_WARN("fail to merge p2p dh msg", K(ret));
+    }
+    if (OB_NOT_NULL(rf_msg_in_map)) {
+      // after the merge, dec ref_count
+      rf_msg_in_map->dec_ref_count();
     }
   }
   if (OB_SUCC(ret) && !need_merge) {
@@ -287,10 +295,11 @@ int ObRFBloomFilterMsg::process_first_phase_recieve_count(
   int ret = OB_SUCCESS;
   CK(msg.get_msg_receive_expect_cnt() > 0 && msg_receive_expect_cnt_ > 0);
   int64_t begin_idx = msg.bloom_filter_.get_begin_idx();
-  ATOMIC_INC(&msg_receive_cur_cnt_);
-  if (msg_receive_cur_cnt_ > msg_receive_expect_cnt_) {
+  // msg_receive_cur_cnt_ is the total received msg cnt; msg_receive_expect_cnt_ equals sqc_count * piece_count
+  int64_t received_cnt = ATOMIC_AAF(&msg_receive_cur_cnt_, 1);
+  if (received_cnt > msg_receive_expect_cnt_) {
     ret = OB_INVALID_ARGUMENT;
-    LOG_WARN("fail to process recieve count", K(ret), K(msg_receive_cur_cnt_),
+    LOG_WARN("fail to process receive count", K(ret), K(received_cnt),
         K(msg_receive_expect_cnt_));
   } else if (receive_count_array_.empty()) {
     ret = OB_ERR_UNEXPECTED;
@@ -299,6 +308,7 @@ int ObRFBloomFilterMsg::process_first_phase_recieve_count(
     bool find = false;
     for (int i = 0; OB_SUCC(ret) && i < receive_count_array_.count(); ++i) {
       if (begin_idx == receive_count_array_.at(i).begin_idx_) {
+        // receive count of a specific piece msg; expect_first_phase_count_ equals the sqc count
        int64_t cur_count = ATOMIC_AAF(&receive_count_array_.at(i).reciv_count_, 1);
         first_phase_end = (cur_count == expect_first_phase_count_);
         find = true;
@@ -445,6 +455,20 @@ int ObRFBloomFilterMsg::regenerate()
   return ret;
 }
 
+int ObRFBloomFilterMsg::atomic_merge(ObP2PDatahubMsgBase &other_msg)
+{
+  int ret = OB_SUCCESS;
+  if (!other_msg.is_empty() && (OB_FAIL(merge(other_msg)))) {
+    LOG_WARN("fail to merge dh msg", K(ret));
+  }
+  if (OB_FAIL(ret)) {
+  } else if (OB_FAIL(process_receive_count(other_msg))) {
+    LOG_WARN("fail to process receive count", K(ret));
+  }
+  return ret;
+}
+
+// the merge process of bloom_filter_ is atomic, implemented with CAS
 int ObRFBloomFilterMsg::merge(ObP2PDatahubMsgBase &msg)
 {
   int ret = OB_SUCCESS;
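Annotation: atomic_merge relies on two things: the merged-into message is pinned by a reference count taken through atomic_get_msg, and the bloom filter bits themselves are merged lock-free (the comment above says the real merge CASes blocks of the filter). A minimal sketch of that discipline, assuming a hypothetical fixed-size filter and using fetch_or in place of the CAS loop:

#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>

struct BloomMsg {
  std::atomic<int> ref_count{1};                  // 1 = owned by the map
  std::array<std::atomic<uint64_t>, 128> bits{};  // filter payload

  // Lock-free merge: OR-ing each word is commutative, so concurrent mergers
  // never lose bits regardless of interleaving.
  void atomic_merge(const BloomMsg &piece) {
    for (size_t i = 0; i < bits.size(); ++i) {
      bits[i].fetch_or(piece.bits[i].load(std::memory_order_relaxed));
    }
  }
};

void merge_piece(BloomMsg *in_map, const BloomMsg &piece) {
  in_map->ref_count.fetch_add(1);  // pin, like atomic_get_msg: erase cannot free it
  in_map->atomic_merge(piece);     // merge without holding the map lock
  in_map->ref_count.fetch_sub(1);  // unpin only after the merge completes
}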
@@ -524,19 +548,24 @@ int ObRFBloomFilterMsg::might_contain_batch(
   ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
   uint64_t *hash_values = reinterpret_cast<uint64_t *>(
       ctx.frames_[expr.frame_idx_] + expr.res_buf_off_);
+  int64_t total_count = 0;
+  int64_t filter_count = 0;
   if (OB_UNLIKELY(is_empty_)) {
     if (OB_FAIL(ObBitVector::flip_foreach(skip, batch_size,
         [&](int64_t idx) __attribute__((always_inline)) {
-          int ret = OB_SUCCESS;
-          eval_flags.set(idx);
           results[idx].set_int(0);
-          ++filter_ctx.filter_count_;
-          ++filter_ctx.check_count_;
-          ++filter_ctx.total_count_;
-          return ret;
+          ++filter_count;
+          ++total_count;
+          return OB_SUCCESS;
         }))) {
       LOG_WARN("fail to do for each operation", K(ret));
     }
+    if (OB_SUCC(ret)) {
+      eval_flags.set_all(true);
+      filter_ctx.filter_count_ += filter_count;
+      filter_ctx.check_count_ += total_count;
+      filter_ctx.total_count_ += total_count;
+    }
   } else {
     for (int64_t i = 0; OB_SUCC(ret) && i < expr.arg_cnt_; ++i) {
       ObExpr *e = expr.args_[i];
@@ -559,18 +588,21 @@ int ObRFBloomFilterMsg::might_contain_batch(
         }))) {
     } else if (OB_FAIL(ObBitVector::flip_foreach(skip, batch_size,
         [&](int64_t idx) __attribute__((always_inline)) {
-          ret = bloom_filter_.might_contain(hash_values[idx], is_match);
-          if (OB_SUCC(ret)) {
-            filter_ctx.filter_count_ += !is_match;
-            eval_flags.set(idx);
+          int tmp_ret = bloom_filter_.might_contain(hash_values[idx], is_match);
+          if (OB_SUCCESS == tmp_ret) {
+            filter_count += !is_match;
+            ++total_count;
             results[idx].set_int(is_match);
-            ObExprJoinFilter::collect_sample_info(&filter_ctx, is_match);
-            ++filter_ctx.check_count_;
-            ++filter_ctx.total_count_;
           }
-          return ret;
+          return tmp_ret;
         }))) {
       LOG_WARN("failed to process prefetch block", K(ret));
+    } else {
+      eval_flags.set_all(true);
+      filter_ctx.filter_count_ += filter_count;
+      filter_ctx.check_count_ += total_count;
+      filter_ctx.total_count_ += total_count;
+      ObExprJoinFilter::collect_sample_info_batch(filter_ctx, filter_count, total_count);
     }
   }
   return ret;
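Annotation: the batch rewrite above follows one pattern throughout this patch: keep the hot loop free of shared-state writes by accumulating filter/total counts in locals and flushing them once, and replace per-row eval_flags.set(idx) with a single set_all. A condensed standalone sketch of the pattern (hypothetical might_contain callback in place of the real bloom filter):

#include <cstdint>
#include <vector>

struct Stats {
  int64_t filter_count = 0;
  int64_t total_count = 0;
};

void probe_batch(const std::vector<uint64_t> &hashes,
                 bool (*might_contain)(uint64_t),
                 std::vector<int64_t> &results, Stats &shared) {
  int64_t filtered = 0;
  int64_t total = 0;
  for (size_t i = 0; i < hashes.size(); ++i) {
    const bool match = might_contain(hashes[i]);
    filtered += !match;  // local accumulation only inside the hot loop
    ++total;
    results[i] = match ? 1 : 0;
  }
  shared.filter_count += filtered;  // single flush instead of per-row writes
  shared.total_count += total;
}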
@@ -1163,6 +1195,61 @@ int ObRFRangeFilterMsg::might_contain(const ObExpr &expr,
   return ret;
 }
 
+int ObRFRangeFilterMsg::do_might_contain_batch(const ObExpr &expr,
+    ObEvalCtx &ctx,
+    const ObBitVector &skip,
+    const int64_t batch_size,
+    ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx)
+{
+  int ret = OB_SUCCESS;
+  int64_t filter_count = 0;
+  int64_t total_count = 0;
+  ObDatum *results = expr.locate_batch_datums(ctx);
+  for (int idx = 0; OB_SUCC(ret) && idx < expr.arg_cnt_; ++idx) {
+    if (OB_FAIL(expr.args_[idx]->eval_batch(ctx, skip, batch_size))) {
+      LOG_WARN("eval failed", K(ret));
+    }
+  }
+  if (OB_SUCC(ret)) {
+    int cmp_min = 0;
+    int cmp_max = 0;
+    ObDatum *datum = nullptr;
+    bool is_match = true;
+    for (int64_t batch_i = 0; OB_SUCC(ret) && batch_i < batch_size; ++batch_i) {
+      if (skip.at(batch_i)) {
+        continue;
+      }
+      cmp_min = 0;
+      cmp_max = 0;
+      is_match = true;
+      total_count++;
+      for (int arg_i = 0; OB_SUCC(ret) && arg_i < expr.arg_cnt_; ++arg_i) {
+        datum = &expr.args_[arg_i]->locate_expr_datum(ctx, batch_i);
+        if (OB_FAIL(filter_ctx.cmp_funcs_.at(arg_i).cmp_func_(*datum, lower_bounds_.at(arg_i), cmp_min))) {
+          LOG_WARN("fail to compare value", K(ret));
+        } else if (cmp_min < 0) {
+          filter_count++;
+          is_match = false;
+          break;
+        } else if (OB_FAIL(filter_ctx.cmp_funcs_.at(arg_i).cmp_func_(*datum, upper_bounds_.at(arg_i), cmp_max))) {
+          LOG_WARN("fail to compare value", K(ret));
+        } else if (cmp_max > 0) {
+          filter_count++;
+          is_match = false;
+          break;
+        }
+      }
+      results[batch_i].set_int(is_match ? 1 : 0);
+    }
+  }
+  if (OB_SUCC(ret)) {
+    filter_ctx.filter_count_ += filter_count;
+    filter_ctx.total_count_ += total_count;
+    filter_ctx.check_count_ += total_count;
+    ObExprJoinFilter::collect_sample_info_batch(filter_ctx, filter_count, total_count);
+  }
+  return ret;
+}
+
 int ObRFRangeFilterMsg::might_contain_batch(
     const ObExpr &expr,
     ObEvalCtx &ctx,
@@ -1175,25 +1262,15 @@ int ObRFRangeFilterMsg::might_contain_batch(
   ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
   ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx);
   batch_info_guard.set_batch_size(batch_size);
-  for (int64_t i = 0; OB_SUCC(ret) && i < batch_size; ++i) {
-    if (skip.at(i)) {
-      continue;
-    } else if (OB_UNLIKELY(is_empty_)) {
+  if (OB_UNLIKELY(is_empty_)) {
+    for (int64_t i = 0; i < batch_size; i++) {
       results[i].set_int(0);
-      eval_flags.set(i);
-      ++filter_ctx.filter_count_;
-      ++filter_ctx.check_count_;
-      ++filter_ctx.total_count_;
-    } else {
-      batch_info_guard.set_batch_idx(i);
-      ObDatum &result = results[i];
-      if (OB_FAIL(might_contain(expr, ctx, filter_ctx, result))) {
-        LOG_WARN("fail to check expr value", K(ret));
-      } else {
-        ++filter_ctx.total_count_;
-        eval_flags.set(i);
-      }
     }
+  } else if (OB_FAIL(do_might_contain_batch(expr, ctx, skip, batch_size, filter_ctx))) {
+    LOG_WARN("failed to do_might_contain_batch");
+  }
+  if (OB_SUCC(ret)) {
+    eval_flags.set_all(batch_size);
   }
   return ret;
 }
@@ -1454,7 +1531,8 @@ int ObRFInFilterMsg::might_contain(const ObExpr &expr,
   ObDatum *datum = nullptr;
   bool is_match = true;
   uint64_t hash_val = ObExprJoinFilter::JOIN_FILTER_SEED;
-  ObSEArray<ObDatum, 4> cur_row;
+  ObIArray<ObDatum> &cur_row = filter_ctx.cur_row_;
+  cur_row.reuse();
   if (OB_UNLIKELY(!is_active_)) {
     res.set_int(1);
   } else if (OB_UNLIKELY(is_empty_)) {
@@ -1512,6 +1590,77 @@ int ObRFInFilterMsg::reuse()
   return ret;
 }
 
+void ObRFInFilterMsg::check_finish_receive()
+{
+  if (ATOMIC_LOAD(&is_active_)) {
+    if (msg_receive_expect_cnt_ == ATOMIC_LOAD(&msg_receive_cur_cnt_)) {
+      is_ready_ = true;
+    }
+  }
+}
+
+int ObRFInFilterMsg::do_might_contain_batch(const ObExpr &expr,
+    ObEvalCtx &ctx,
+    const ObBitVector &skip,
+    const int64_t batch_size,
+    ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx)
+{
+  int ret = OB_SUCCESS;
+  int64_t filter_count = 0;
+  int64_t total_count = 0;
+  uint64_t *right_hash_vals = reinterpret_cast<uint64_t *>(
+      ctx.frames_[expr.frame_idx_] + expr.res_buf_off_);
+  uint64_t seed = ObExprJoinFilter::JOIN_FILTER_SEED;
+  for (int idx = 0; OB_SUCC(ret) && idx < expr.arg_cnt_; ++idx) {
+    if (OB_FAIL(expr.args_[idx]->eval_batch(ctx, skip, batch_size))) {
+      LOG_WARN("eval failed", K(ret));
+    } else {
+      const bool is_batch_seed = (idx > 0);
+      ObBatchDatumHashFunc hash_func = filter_ctx.hash_funcs_.at(idx).batch_hash_func_;
+      hash_func(right_hash_vals,
+                expr.args_[idx]->locate_batch_datums(ctx), expr.args_[idx]->is_batch_result(),
+                skip, batch_size,
+                is_batch_seed ? right_hash_vals : &seed,
+                is_batch_seed);
+    }
+  }
+  ObIArray<ObDatum> &cur_row = filter_ctx.cur_row_;
+  ObRFInFilterNode node(&filter_ctx.cmp_funcs_, nullptr, &cur_row, 0);
+  ObDatum *res_datums = expr.locate_batch_datums(ctx);
+  for (int64_t batch_i = 0; OB_SUCC(ret) && batch_i < batch_size; ++batch_i) {
+    if (skip.at(batch_i)) {
+      continue;
+    }
+    cur_row.reuse();
+    total_count++;
+    node.hash_val_ = right_hash_vals[batch_i];
+    for (int64_t arg_i = 0; OB_SUCC(ret) && arg_i < expr.arg_cnt_; ++arg_i) {
+      if (OB_FAIL(cur_row.push_back(expr.args_[arg_i]->locate_expr_datum(ctx, batch_i)))) {
+        LOG_WARN("failed to push back datum", K(ret));
+      }
+    }
+    if (OB_FAIL(ret)) {
+    } else if (OB_FAIL(rows_set_.exist_refactored(node))) {
+      if (OB_HASH_NOT_EXIST == ret) {
+        res_datums[batch_i].set_int(0);
+        filter_count++;
+        ret = OB_SUCCESS;
+      } else if (OB_HASH_EXIST == ret) {
+        res_datums[batch_i].set_int(1);
+        ret = OB_SUCCESS;
+      } else {
+        LOG_WARN("fail to check node", K(ret));
+      }
+    }
+  }
+  if (OB_SUCC(ret)) {
+    filter_ctx.filter_count_ += filter_count;
+    filter_ctx.total_count_ += total_count;
+    filter_ctx.check_count_ += total_count;
+    ObExprJoinFilter::collect_sample_info_batch(filter_ctx, filter_count, total_count);
+  }
+  return ret;
+}
+
 int ObRFInFilterMsg::might_contain_batch(
     const ObExpr &expr,
     ObEvalCtx &ctx,
@@ -1524,28 +1673,19 @@ int ObRFInFilterMsg::might_contain_batch(
   ObBitVector &eval_flags = expr.get_evaluated_flags(ctx);
   ObEvalCtx::BatchInfoScopeGuard batch_info_guard(ctx);
   batch_info_guard.set_batch_size(batch_size);
-  for (int64_t i = 0; OB_SUCC(ret) && i < batch_size; ++i) {
-    if (skip.at(i)) {
-      continue;
-    } else if (OB_UNLIKELY(!is_active_)) {
-      eval_flags.set(i);
+  if (!is_active_) {
+    for (int64_t i = 0; i < batch_size; i++) {
       results[i].set_int(1);
-    } else if (OB_UNLIKELY(is_empty_)) {
-      eval_flags.set(i);
-      results[i].set_int(0);
-      ++filter_ctx.filter_count_;
-      ++filter_ctx.check_count_;
-      ++filter_ctx.total_count_;
-    } else {
-      batch_info_guard.set_batch_idx(i);
-      ObDatum *result = &results[i];
-      if (OB_FAIL(might_contain(expr, ctx, filter_ctx, *result))) {
-        LOG_WARN("fail to check expr value", K(ret));
-      } else {
-        ++filter_ctx.total_count_;
-        eval_flags.set(i);
-      }
     }
+  } else if (OB_UNLIKELY(is_empty_)) {
+    for (int64_t i = 0; i < batch_size; i++) {
+      results[i].set_int(0);
+    }
+  } else if (OB_FAIL(do_might_contain_batch(expr, ctx, skip, batch_size, filter_ctx))) {
+    LOG_WARN("failed to do_might_contain_batch");
+  }
+  if (OB_SUCC(ret)) {
+    eval_flags.set_all(batch_size);
   }
   return ret;
 }
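Annotation: ObRFInFilterMsg::do_might_contain_batch hashes the whole batch up front and then probes the row set per row with local counters. A simplified standalone sketch of the probe step, where an std::unordered_set of precomputed hashes stands in for rows_set_ (the real ObRFInFilterNode additionally compares the datums to guard against hash collisions):

#include <cstdint>
#include <unordered_set>
#include <vector>

void in_filter_probe(const std::unordered_set<uint64_t> &rows_set,
                     const std::vector<uint64_t> &row_hashes,  // hashed batch-wise up front
                     std::vector<int64_t> &results,
                     int64_t &filter_count, int64_t &total_count) {
  for (size_t i = 0; i < row_hashes.size(); ++i) {
    const bool match = rows_set.count(row_hashes[i]) != 0;
    results[i] = match ? 1 : 0;
    filter_count += !match;  // counted locally, flushed by the caller
    ++total_count;
  }
}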
diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h
index 69d4e6d1ef..fd37f1f575 100644
--- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h
+++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h
@@ -79,6 +79,7 @@ public:
       ObRFBloomFilterMsg &msg, bool &first_phase_end);
   virtual int process_msg_internal(bool &need_free);
   virtual int regenerate() override;
+  int atomic_merge(ObP2PDatahubMsgBase &other_msg);
 private:
   int calc_hash_value(
       const common::ObIArray<ObExpr *> &expr_array,
@@ -162,6 +163,13 @@ private:
   int get_min(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t &cell_size);
   int get_max(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t &cell_size);
   int dynamic_copy_cell(const ObDatum &src, ObDatum &target, int64_t &cell_size);
+  // only used in might_contain_batch; counters in filter_ctx are updated
+  // once per batch, and eval_flags are set by the caller
+  int do_might_contain_batch(const ObExpr &expr,
+      ObEvalCtx &ctx,
+      const ObBitVector &skip,
+      const int64_t batch_size,
+      ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx);
 public:
   ObFixedArray<ObDatum, common::ObIAllocator> lower_bounds_;
@@ -221,9 +229,17 @@ public:
       ObEvalCtx &eval_ctx,
       uint64_t *batch_hash_values) override;
   virtual int reuse() override;
+  void check_finish_receive() override final;
 private:
   int append_row();
   int insert_node();
+  // only used in might_contain_batch; counters in filter_ctx are updated
+  // once per batch, and eval_flags are set by the caller
+  int do_might_contain_batch(const ObExpr &expr,
+      ObEvalCtx &ctx,
+      const ObBitVector &skip,
+      const int64_t batch_size,
+      ObExprJoinFilter::ObExprJoinFilterContext &filter_ctx);
 public:
   hash::ObHashSet<ObRFInFilterNode, hash::NoPthreadDefendMode> rows_set_;
   ObCmpFuncs cmp_funcs_;
diff --git a/src/sql/optimizer/ob_px_resource_analyzer.h b/src/sql/optimizer/ob_px_resource_analyzer.h
index 94119f07fb..044ec1d8d6 100644
--- a/src/sql/optimizer/ob_px_resource_analyzer.h
+++ b/src/sql/optimizer/ob_px_resource_analyzer.h
@@ -206,7 +206,7 @@ int DfoTreeNormalizer<T>::normalize(T &root)
   int ret = OB_SUCCESS;
   int64_t non_leaf_cnt = 0;
   int64_t non_leaf_pos = -1;
-  bool need_force_bushy = false;
+  bool need_force_bushy = root.force_bushy();
   ARRAY_FOREACH_X(root.child_dfos_, idx, cnt, OB_SUCC(ret)) {
     T *dfo = root.child_dfos_.at(idx);
     if (0 < dfo->get_child_count()) {
@@ -215,9 +215,6 @@ int DfoTreeNormalizer<T>::normalize(T &root)
       non_leaf_pos = idx;
       }
     }
-    if (dfo->force_bushy() && !need_force_bushy) {
-      need_force_bushy = true;
-    }
   }
   if (non_leaf_cnt > 1 || need_force_bushy) {
     // UPDATE:
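Annotation: with the normalizer change, need_force_bushy is seeded from the root DFO's own flag (the one set by describe_dependency) instead of being rediscovered from the children. The decision it feeds reduces to the following toy sketch:

#include <cstdint>
#include <vector>

// Toy mirror of the normalize() decision: a subtree stays bushy either when it
// has more than one non-leaf child, or when describe_dependency marked the
// root itself, which is now read directly from the root.
struct Dfo {
  bool force_bushy = false;
  std::vector<Dfo *> child_dfos;
};

bool keep_bushy(const Dfo &root) {
  int64_t non_leaf_cnt = 0;
  for (const Dfo *child : root.child_dfos) {
    non_leaf_cnt += (child->child_dfos.empty() ? 0 : 1);
  }
  return non_leaf_cnt > 1 || root.force_bushy;
}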