diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 137249989a..106fb314ae 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -3374,12 +3374,10 @@ int ObStaticEngineCG::generate_spec(ObLogJoinFilter &op, ObJoinFilterSpec &spec, } bool enable_extract_query_range = true; -#ifdef ERRSIM int tmp_ret = OB_E(EventTable::EN_PX_DISABLE_RUNTIME_FILTER_EXTRACT_QUERY_RANGE) OB_SUCCESS; if (OB_SUCCESS != tmp_ret) { enable_extract_query_range = false; } -#endif if (OB_FAIL(ret)) { } else if (enable_extract_query_range && OB_FAIL(spec.px_query_range_info_.init( @@ -3387,6 +3385,8 @@ int ObStaticEngineCG::generate_spec(ObLogJoinFilter &op, ObJoinFilterSpec &spec, op.get_rf_prefix_col_idxs(), prefix_col_obj_metas))) { LOG_WARN("failed to init px_query_range_info_ in join filter spec"); } + LOG_TRACE("cg runtime filter extract query range", K(enable_extract_query_range), + K(op.get_op_id()), K(op.get_rf_prefix_col_idxs())); } } return ret; diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp index b1d65a366f..1f60548dc3 100644 --- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.cpp @@ -17,6 +17,7 @@ #include "sql/engine/px/p2p_datahub/ob_p2p_dh_rpc_process.h" #include "sql/engine/px/p2p_datahub/ob_p2p_dh_mgr.h" #include "share/detect/ob_detect_manager_utils.h" +#include "sql/engine/px/p2p_datahub/ob_runtime_filter_query_range.h" using namespace oceanbase; using namespace common; using namespace sql; @@ -199,6 +200,39 @@ int ObP2PDatahubMsgBase::preset_not_match(IntegerFixedVec *res_vec, const EvalBo return ret; } +int ObP2PDatahubMsgBase::fill_empty_query_range(const ObPxQueryRangeInfo &query_range_info, + common::ObIAllocator &allocator, ObNewRange &query_range) +{ + int ret = OB_SUCCESS; + query_range.table_id_ = query_range_info.table_id_; + + ObObj *start = NULL; + ObObj *end = NULL; + int64_t range_column_cnt = query_range_info.range_column_cnt_; + if (OB_ISNULL(start = static_cast( + allocator.alloc(sizeof(ObObj) * range_column_cnt)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory for start_obj failed", K(ret)); + } else if (OB_ISNULL(end = static_cast( + allocator.alloc(sizeof(ObObj) * range_column_cnt)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("alloc memory for end_obj failed", K(ret)); + } else { + // fill all coloumns with (max, min) + for (int64_t i = 0; i < range_column_cnt; ++i) { + new (start + i) ObObj(); + new (end + i) ObObj(); + (start + i)->set_max_value(); + (end + i)->set_min_value(); + } + ObRowkey start_key(start, range_column_cnt); + ObRowkey end_key(end, range_column_cnt); + query_range.start_key_ = start_key; + query_range.end_key_ = end_key; + } + return ret; +} + ObP2PDatahubMsgGuard::ObP2PDatahubMsgGuard(ObP2PDatahubMsgBase *msg) : msg_(msg) { // one for dh map hold msg and one for we use msg to reg dm diff --git a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h index 21a0887246..5955806d56 100644 --- a/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h +++ b/src/sql/engine/px/p2p_datahub/ob_p2p_dh_msg.h @@ -26,6 +26,7 @@ namespace sql { class ObBatchRows; +class ObPxQueryRangeInfo; class ObP2PDatahubMsgBase { OB_UNIS_VERSION_V(1); @@ -180,6 +181,10 @@ public: int64_t &total_count, int64_t &filter_count); int preset_not_match(IntegerFixedVec *res_vec, const EvalBound &bound); TO_STRING_KV(K(p2p_datahub_id_), K_(px_sequence_id), K(tenant_id_), K(timeout_ts_), K(is_active_), K(msg_type_)); +protected: + int fill_empty_query_range(const ObPxQueryRangeInfo &query_range_info, + common::ObIAllocator &allocator, ObNewRange &query_range); + protected: common::ObCurTraceId::TraceId trace_id_; int64_t p2p_datahub_id_; diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp index ae351ebda3..3fa2ae3a03 100644 --- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp @@ -1289,9 +1289,11 @@ int ObRFRangeFilterMsg::prepare_query_range() is_query_range_ready_ = false; } else if (is_empty_) { // make empty range - query_range_.table_id_ = query_range_info_.table_id_; - query_range_.set_false_range(); - is_query_range_ready_ = true; + if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range_))) { + LOG_WARN("faild to fill_empty_query_range"); + } else { + is_query_range_ready_ = true; + } } else { // only extract the first column int64_t prefix_col_idx = query_range_info_.prefix_col_idxs_.at(0); @@ -1309,11 +1311,11 @@ int ObRFRangeFilterMsg::prepare_query_range() if (OB_ISNULL(start = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for start_obj failed", K(ret)); + LOG_WARN("alloc memory for start_obj failed", K(ret)); } else if (OB_ISNULL(end = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for end_obj failed", K(ret)); + LOG_WARN("alloc memory for end_obj failed", K(ret)); } else { new(start) ObObj(); new(end) ObObj(); @@ -1336,7 +1338,7 @@ int ObRFRangeFilterMsg::prepare_query_range() is_query_range_ready_ = true; } } - LOG_TRACE("range filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_)); + LOG_TRACE("range filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_), K(query_range_info_), K(is_empty_)); return ret; } @@ -2129,9 +2131,9 @@ int ObRFInFilterMsg::prepare_query_ranges() } else if (is_empty_) { // make empty range ObNewRange query_range; - query_range.table_id_ = query_range_info_.table_id_; - query_range.set_false_range(); - if (OB_FAIL(query_range_.push_back(query_range))) { + if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range))) { + LOG_WARN("faild to fill_empty_query_range"); + } else if (OB_FAIL(query_range_.push_back(query_range))) { LOG_WARN("failed to push back query_range"); } else { is_query_range_ready_ = true; @@ -2149,7 +2151,7 @@ int ObRFInFilterMsg::prepare_query_ranges() // we need to deduplicate to avoid duplicate range ret = process_query_ranges_with_deduplicate(); } - LOG_TRACE("in filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_)); + LOG_TRACE("in filter prepare query range", K(ret), K(is_query_range_ready_), K(query_range_), K(query_range_info_), K(is_empty_)); return ret; } @@ -2305,11 +2307,11 @@ int ObRFInFilterMsg::generate_one_range(int row_idx) if (OB_ISNULL(start = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for start_obj failed", K(ret)); + LOG_WARN("alloc memory for start_obj failed", K(ret)); } else if (OB_ISNULL(end = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for end_obj failed", K(ret)); + LOG_WARN("alloc memory for end_obj failed", K(ret)); } for (int64_t j = 0; j < prefix_col_idxs.count() && OB_SUCC(ret); ++j) { int64_t col_idx = prefix_col_idxs.at(j); diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_vec_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_vec_msg.cpp index 2c8a452376..d8b76ecbe2 100644 --- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_vec_msg.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_vec_msg.cpp @@ -190,7 +190,6 @@ OB_DEF_SERIALIZE(ObRFInFilterVecMsg) OB_DEF_DESERIALIZE(ObRFInFilterVecMsg) { int ret = OB_SUCCESS; - int64_t row_cnt = row_store_.get_row_cnt(); BASE_DESER((ObRFInFilterVecMsg, ObP2PDatahubMsgBase)); OB_UNIS_DECODE(max_in_num_); OB_UNIS_DECODE(need_null_cmp_flags_); @@ -199,6 +198,7 @@ OB_DEF_DESERIALIZE(ObRFInFilterVecMsg) OB_UNIS_DECODE(build_row_meta_); if (OB_SUCC(ret) && is_active_) { OB_UNIS_DECODE(row_store_); + int64_t row_cnt = row_store_.get_row_cnt(); int64_t buckets_cnt = max(row_cnt, 1); if (OB_FAIL(rows_set_.create(buckets_cnt * 2, "RFDEInFilter", @@ -829,9 +829,11 @@ int ObRFRangeFilterVecMsg::prepare_query_range() is_query_range_ready_ = false; } else if (is_empty_) { // make empty range - query_range_.table_id_ = query_range_info_.table_id_; - query_range_.set_false_range(); - is_query_range_ready_ = true; + if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range_))) { + LOG_WARN("faild to fill_empty_query_range"); + } else { + is_query_range_ready_ = true; + } } else { // only extract the first column int64_t prefix_col_idx = query_range_info_.prefix_col_idxs_.at(0); @@ -849,11 +851,11 @@ int ObRFRangeFilterVecMsg::prepare_query_range() if (OB_ISNULL(start = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for start_obj failed", K(ret)); + LOG_WARN("alloc memory for start_obj failed", K(ret)); } else if (OB_ISNULL(end = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for end_obj failed", K(ret)); + LOG_WARN("alloc memory for end_obj failed", K(ret)); } else { new(start) ObObj(); new(end) ObObj(); @@ -1726,9 +1728,9 @@ int ObRFInFilterVecMsg::prepare_query_ranges() } else if (is_empty_) { // make empty range ObNewRange query_range; - query_range.table_id_ = query_range_info_.table_id_; - query_range.set_false_range(); - if (OB_FAIL(query_range_.push_back(query_range))) { + if (OB_FAIL(fill_empty_query_range(query_range_info_, query_range_allocator_, query_range))) { + LOG_WARN("faild to fill_empty_query_range"); + } else if (OB_FAIL(query_range_.push_back(query_range))) { LOG_WARN("failed to push back query_range"); } else { is_query_range_ready_ = true; @@ -1948,11 +1950,11 @@ int ObRFInFilterVecMsg::generate_one_range(int row_idx) if (OB_ISNULL(start = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for start_obj failed", K(ret)); + LOG_WARN("alloc memory for start_obj failed", K(ret)); } else if (OB_ISNULL(end = static_cast( query_range_allocator_.alloc(sizeof(ObObj) * range_column_cnt)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_ERROR("alloc memory for end_obj failed", K(ret)); + LOG_WARN("alloc memory for end_obj failed", K(ret)); } for (int64_t j = 0; j < prefix_col_idxs.count() && OB_SUCC(ret); ++j) { int64_t col_idx = prefix_col_idxs.at(j); diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index d8cf982f43..48d067fa9c 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -4950,6 +4950,7 @@ int ObLogicalOperator::try_prepare_rf_query_range_info( const JoinFilterInfo &info) { int ret = OB_SUCCESS; + int64_t op_id = -1; bool can_extract_query_range = false; ObSEArray prefix_col_idxs; @@ -4969,6 +4970,7 @@ int ObLogicalOperator::try_prepare_rf_query_range_info( LOG_WARN("scan_node type dismatch", K(ret), K(scan_node->get_type())); } else { ObLogTableScan *table_scan = static_cast(scan_node); + op_id = table_scan->get_op_id(); ObLogJoinFilter *join_filter_create = static_cast(join_filter_create_op); int64_t range_column_cnt = table_scan->get_range_columns().count(); join_filter_create->set_probe_table_id(info.ref_table_id_); @@ -5001,7 +5003,8 @@ int ObLogicalOperator::try_prepare_rf_query_range_info( } } } - LOG_TRACE("check runtime filter can extract query range", K(ret), K(can_extract_query_range), K(prefix_col_idxs)); + LOG_TRACE("check runtime filter can extract query range", K(ret), K(op_id), + K(can_extract_query_range), K(prefix_col_idxs)); return ret; }