From b8cc51f8a16592ad3611df65cdf6ffbc27f4491c Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 16 Jun 2023 09:12:16 +0000 Subject: [PATCH] Fix runtime range filter correct problem. --- .../code_generator/ob_static_engine_cg.cpp | 66 ++++++--- src/sql/engine/expr/ob_expr_join_filter.cpp | 23 +-- src/sql/engine/expr/ob_expr_join_filter.h | 12 +- src/sql/engine/join/ob_join_filter_op.cpp | 25 ++-- src/sql/engine/join/ob_join_filter_op.h | 3 +- .../px/p2p_datahub/ob_runtime_filter_msg.cpp | 137 ++++++------------ .../px/p2p_datahub/ob_runtime_filter_msg.h | 7 +- src/sql/optimizer/ob_log_join_filter.h | 6 + src/sql/optimizer/ob_logical_operator.cpp | 34 ++++- 9 files changed, 146 insertions(+), 167 deletions(-) diff --git a/src/sql/code_generator/ob_static_engine_cg.cpp b/src/sql/code_generator/ob_static_engine_cg.cpp index 83970abe8..bbae91ed2 100644 --- a/src/sql/code_generator/ob_static_engine_cg.cpp +++ b/src/sql/code_generator/ob_static_engine_cg.cpp @@ -2508,9 +2508,7 @@ int ObStaticEngineCG::generate_spec(ObLogJoinFilter &op, ObJoinFilterSpec &spec, LOG_WARN("fail to generate calc part id expr", K(ret), KP(op.get_tablet_id_expr())); } else if (OB_FAIL(spec.hash_funcs_.init(op.get_join_exprs().count()))) { LOG_WARN("failed to init join keys", K(ret)); - } else if (OB_FAIL(spec.null_first_cmp_funcs_.init(op.get_join_exprs().count()))) { - LOG_WARN("failed to init cmp funcs", K(ret)); - } else if (OB_FAIL(spec.null_last_cmp_funcs_.init(op.get_join_exprs().count()))) { + } else if (OB_FAIL(spec.cmp_funcs_.init(op.get_join_exprs().count()))) { LOG_WARN("failed to init cmp funcs", K(ret)); } else if (OB_FAIL(generate_rt_exprs(op.get_join_exprs(), spec.join_keys_))) { LOG_WARN("failed to generate rt exprs", K(ret)); @@ -2524,25 +2522,49 @@ int ObStaticEngineCG::generate_spec(ObLogJoinFilter &op, ObJoinFilterSpec &spec, LOG_WARN("failed to push back hash func", K(ret)); } } else { - for (int64_t i = 0; i < spec.join_keys_.count() && OB_SUCC(ret); ++i) { - ObExpr *join_expr = spec.join_keys_.at(i); - ObHashFunc hash_func; - ObCmpFunc null_first_cmp; - ObCmpFunc null_last_cmp; - null_first_cmp.cmp_func_ = join_expr->basic_funcs_->null_first_cmp_; - null_last_cmp.cmp_func_ = join_expr->basic_funcs_->null_last_cmp_; - set_murmur_hash_func(hash_func, join_expr->basic_funcs_); - if (OB_ISNULL(hash_func.hash_func_) || OB_ISNULL(hash_func.batch_hash_func_) || - OB_ISNULL(null_first_cmp.cmp_func_) || - OB_ISNULL(null_last_cmp.cmp_func_ )) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("hash func is null, check datatype is valid", K(ret)); - } else if (OB_FAIL(spec.hash_funcs_.push_back(hash_func))) { - LOG_WARN("failed to push back hash func", K(ret)); - } else if (OB_FAIL(spec.null_first_cmp_funcs_.push_back(null_first_cmp))) { - LOG_WARN("failed to push back null first cmp func", K(ret)); - } else if (OB_FAIL(spec.null_last_cmp_funcs_.push_back(null_last_cmp))) { - LOG_WARN("failed to push back null last cmp func", K(ret)); + // for create filter op, the compare funcs are only used for comparing left join key + // the compare funcs will be stored in rf msg finally + if (op.is_create_filter()) { + for (int64_t i = 0; i < spec.join_keys_.count() && OB_SUCC(ret); ++i) { + ObExpr *join_expr = spec.join_keys_.at(i); + ObHashFunc hash_func; + ObCmpFunc null_first_cmp; + ObCmpFunc null_last_cmp; + null_first_cmp.cmp_func_ = join_expr->basic_funcs_->null_first_cmp_; + null_last_cmp.cmp_func_ = join_expr->basic_funcs_->null_last_cmp_; + set_murmur_hash_func(hash_func, join_expr->basic_funcs_); + if (OB_ISNULL(hash_func.hash_func_) || OB_ISNULL(hash_func.batch_hash_func_) || + OB_ISNULL(null_first_cmp.cmp_func_) || + OB_ISNULL(null_last_cmp.cmp_func_ )) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("hash func or cmp func is null, check datatype is valid", K(ret)); + } else if (OB_FAIL(spec.hash_funcs_.push_back(hash_func))) { + LOG_WARN("failed to push back hash func", K(ret)); + } else if (lib::is_mysql_mode() && OB_FAIL(spec.cmp_funcs_.push_back(null_first_cmp))) { + LOG_WARN("failed to push back null first cmp func", K(ret)); + } else if (lib::is_oracle_mode() && OB_FAIL(spec.cmp_funcs_.push_back(null_last_cmp))) { + LOG_WARN("failed to push back null last cmp func", K(ret)); + } + } + } else { + // for use filter op, the compare funcs are used to compare left and right + // the compare funcs will be stored in ObExprJoinFilterContext finally + const common::ObIArray &join_filter_cmp_funcs = op.get_join_filter_cmp_funcs(); + for (int64_t i = 0; i < spec.join_keys_.count() && OB_SUCC(ret); ++i) { + ObExpr *join_expr = spec.join_keys_.at(i); + ObHashFunc hash_func; + ObCmpFunc cmp_func; + cmp_func.cmp_func_ = join_filter_cmp_funcs.at(i); + set_murmur_hash_func(hash_func, join_expr->basic_funcs_); + if (OB_ISNULL(hash_func.hash_func_) || OB_ISNULL(hash_func.batch_hash_func_) || + OB_ISNULL(cmp_func.cmp_func_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("hash func or cmp func is null, check datatype is valid", K(ret)); + } else if (OB_FAIL(spec.hash_funcs_.push_back(hash_func))) { + LOG_WARN("failed to push back hash func", K(ret)); + } else if (OB_FAIL(spec.cmp_funcs_.push_back(cmp_func))) { + LOG_WARN("failed to push back cmp func", K(ret)); + } } } } diff --git a/src/sql/engine/expr/ob_expr_join_filter.cpp b/src/sql/engine/expr/ob_expr_join_filter.cpp index a6b088bbb..3634df786 100644 --- a/src/sql/engine/expr/ob_expr_join_filter.cpp +++ b/src/sql/engine/expr/ob_expr_join_filter.cpp @@ -53,6 +53,8 @@ ObExprJoinFilter::ObExprJoinFilterContext::~ObExprJoinFilterContext() // do not destroy it, because other worker threads may not start yet rf_msg_->dec_ref_count(); } + hash_funcs_.reset(); + cmp_funcs_.reset(); } void ObExprJoinFilter::ObExprJoinFilterContext::reset_monitor_info() @@ -121,27 +123,6 @@ int ObExprJoinFilter::cg_expr(ObExprCGCtx &expr_cg_ctx, const ObRawExpr &raw_exp } } - rt_expr.inner_func_cnt_ = rt_expr.arg_cnt_ * FUNCTION_CNT; - - if (0 == rt_expr.inner_func_cnt_) { - // do nothing - } else if (OB_FAIL(ret)) { - } else if (OB_ISNULL(rt_expr.inner_functions_ = reinterpret_cast(expr_cg_ctx.allocator_-> - alloc(sizeof(ObExpr::EvalFunc) * rt_expr.arg_cnt_ * FUNCTION_CNT)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("alloc memory for inner_functions_ failed", K(ret)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < rt_expr.arg_cnt_; ++i) { - rt_expr.inner_functions_[GET_FUNC(i, HASH_ROW)] = - reinterpret_cast(rt_expr.args_[i]->basic_funcs_->murmur_hash_v2_); - rt_expr.inner_functions_[GET_FUNC(i, HASH_BATCH)] = - reinterpret_cast(rt_expr.args_[i]->basic_funcs_->murmur_hash_v2_batch_); - rt_expr.inner_functions_[GET_FUNC(i, NULL_FIRST_COMPARE)] = - reinterpret_cast(rt_expr.args_[i]->basic_funcs_->null_first_cmp_); - rt_expr.inner_functions_[GET_FUNC(i, NULL_LAST_COMPARE)] = - reinterpret_cast(rt_expr.args_[i]->basic_funcs_->null_last_cmp_); - } - } return ret; } diff --git a/src/sql/engine/expr/ob_expr_join_filter.h b/src/sql/engine/expr/ob_expr_join_filter.h index ca015ef77..09857b73a 100644 --- a/src/sql/engine/expr/ob_expr_join_filter.h +++ b/src/sql/engine/expr/ob_expr_join_filter.h @@ -31,19 +31,11 @@ enum RuntimeFilterType class ObExprJoinFilter : public ObExprOperator { public: - #define FUNCTION_CNT 4 - #define GET_FUNC(i, j) (((i) * (FUNCTION_CNT)) + (j)) - enum FunctionIndex{ - HASH_ROW = 0, - HASH_BATCH = 1, - NULL_FIRST_COMPARE = 2, - NULL_LAST_COMPARE = 3 - }; class ObExprJoinFilterContext : public ObExprOperatorCtx { public: ObExprJoinFilterContext() : ObExprOperatorCtx(), - rf_msg_(nullptr), rf_key_(), start_time_(0), + rf_msg_(nullptr), rf_key_(), hash_funcs_(), cmp_funcs_(), start_time_(0), filter_count_(0), total_count_(0), check_count_(0), n_times_(0), ready_ts_(0), next_check_start_pos_(0), window_cnt_(0), window_size_(0), @@ -62,6 +54,8 @@ public: public: ObP2PDatahubMsgBase *rf_msg_; ObP2PDhKey rf_key_; + ObHashFuncs hash_funcs_; + ObCmpFuncs cmp_funcs_; int64_t start_time_; int64_t filter_count_; int64_t total_count_; diff --git a/src/sql/engine/join/ob_join_filter_op.cpp b/src/sql/engine/join/ob_join_filter_op.cpp index 76c146b68..8e79bce62 100644 --- a/src/sql/engine/join/ob_join_filter_op.cpp +++ b/src/sql/engine/join/ob_join_filter_op.cpp @@ -62,8 +62,7 @@ OB_SERIALIZE_MEMBER((ObJoinFilterSpec, ObOpSpec), filter_len_, join_keys_, hash_funcs_, - null_first_cmp_funcs_, - null_last_cmp_funcs_, + cmp_funcs_, filter_shared_type_, calc_tablet_id_expr_, rf_infos_, @@ -264,9 +263,7 @@ int ObJoinFilterOpInput::construct_msg_details( LOG_WARN("fail to prepare allocate col cnt", K(ret)); } else if (OB_FAIL(range_msg.cells_size_.prepare_allocate(col_cnt))) { LOG_WARN("fail to prepare allocate col cnt", K(ret)); - } else if (OB_FAIL(range_msg.null_first_cmp_funcs_.assign(spec.null_first_cmp_funcs_))) { - LOG_WARN("fail to init cmp funcs", K(ret)); - } else if (OB_FAIL(range_msg.null_last_cmp_funcs_.assign(spec.null_last_cmp_funcs_))) { + } else if (OB_FAIL(range_msg.cmp_funcs_.assign(spec.cmp_funcs_))) { LOG_WARN("fail to init cmp funcs", K(ret)); } else if (OB_FAIL(range_msg.need_null_cmp_flags_.assign(spec.need_null_cmp_flags_))) { LOG_WARN("fail to init cmp flags", K(ret)); @@ -285,7 +282,7 @@ int ObJoinFilterOpInput::construct_msg_details( LOG_WARN("fail to init in hash set", K(ret)); } else if (OB_FAIL(in_msg.cur_row_.prepare_allocate(col_cnt))) { LOG_WARN("fail to prepare allocate col cnt", K(ret)); - } else if (OB_FAIL(in_msg.cmp_funcs_.assign(spec.null_first_cmp_funcs_))) { + } else if (OB_FAIL(in_msg.cmp_funcs_.assign(spec.cmp_funcs_))) { LOG_WARN("fail to init cmp funcs", K(ret)); } else if (OB_FAIL(in_msg.hash_funcs_for_insert_.assign(spec.hash_funcs_))) { LOG_WARN("fail to init cmp funcs", K(ret)); @@ -314,8 +311,7 @@ ObJoinFilterSpec::ObJoinFilterSpec(common::ObIAllocator &alloc, const ObPhyOpera filter_len_(0), join_keys_(alloc), hash_funcs_(alloc), - null_first_cmp_funcs_(alloc), - null_last_cmp_funcs_(alloc), + cmp_funcs_(alloc), filter_shared_type_(JoinFilterSharedType::INVALID_TYPE), calc_tablet_id_expr_(NULL), rf_infos_(alloc), @@ -838,7 +834,7 @@ int ObJoinFilterOp::open_join_filter_use() ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx = NULL; if (OB_ISNULL(join_filter_ctx = static_cast( ctx_.get_expr_op_ctx(MY_SPEC.rf_infos_.at(i).filter_expr_id_)))) { - if (OB_FAIL(ctx_.create_expr_op_ctx(MY_SPEC.rf_infos_.at(i).filter_expr_id_,join_filter_ctx))) { + if (OB_FAIL(ctx_.create_expr_op_ctx(MY_SPEC.rf_infos_.at(i).filter_expr_id_, join_filter_ctx))) { LOG_WARN("failed to create operator ctx", K(ret), K(MY_SPEC.rf_infos_.at(i).filter_expr_id_)); } else { ObP2PDhKey dh_key(MY_SPEC.rf_infos_.at(i).p2p_datahub_id_, px_seq_id, task_id); @@ -846,6 +842,17 @@ int ObJoinFilterOp::open_join_filter_use() int64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id(); join_filter_ctx->window_size_ = ADAPTIVE_BF_WINDOW_ORG_SIZE; join_filter_ctx->max_wait_time_ms_ = filter_input->config_.runtime_filter_wait_time_ms_; + join_filter_ctx->hash_funcs_.set_allocator(&ctx_.get_allocator()); + join_filter_ctx->cmp_funcs_.set_allocator(&ctx_.get_allocator()); + if (OB_FAIL(join_filter_ctx->hash_funcs_.init(MY_SPEC.hash_funcs_.count()))) { + LOG_WARN("failed to assign hash_func"); + } else if (OB_FAIL(join_filter_ctx->cmp_funcs_.init(MY_SPEC.cmp_funcs_.count()))) { + LOG_WARN("failed to assign cmp_funcs_"); + } else if (OB_FAIL(join_filter_ctx->hash_funcs_.assign(MY_SPEC.hash_funcs_))) { + LOG_WARN("failed to assign hash_func"); + } else if (OB_FAIL(join_filter_ctx->cmp_funcs_.assign(MY_SPEC.cmp_funcs_))) { + LOG_WARN("failed to assign cmp_funcs_"); + } } } else { ret = OB_ERR_UNEXPECTED; diff --git a/src/sql/engine/join/ob_join_filter_op.h b/src/sql/engine/join/ob_join_filter_op.h index 7a1689ba6..38df603ec 100644 --- a/src/sql/engine/join/ob_join_filter_op.h +++ b/src/sql/engine/join/ob_join_filter_op.h @@ -194,8 +194,7 @@ public: int64_t filter_len_; ExprFixedArray join_keys_; common::ObHashFuncs hash_funcs_; - ObCmpFuncs null_first_cmp_funcs_; - ObCmpFuncs null_last_cmp_funcs_; + ObCmpFuncs cmp_funcs_; JoinFilterSharedType filter_shared_type_; ObExpr *calc_tablet_id_expr_; common::ObFixedArray rf_infos_; diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp index 4164b147e..c6687b4ae 100644 --- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp +++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.cpp @@ -79,8 +79,7 @@ OB_DEF_SERIALIZE(ObRFRangeFilterMsg) upper_bounds_, need_null_cmp_flags_, cells_size_, - null_first_cmp_funcs_, - null_last_cmp_funcs_); + cmp_funcs_); return ret; } @@ -93,8 +92,7 @@ OB_DEF_DESERIALIZE(ObRFRangeFilterMsg) upper_bounds_, need_null_cmp_flags_, cells_size_, - null_first_cmp_funcs_, - null_last_cmp_funcs_); + cmp_funcs_); if (OB_FAIL(adjust_cell_size())) { LOG_WARN("fail do adjust cell size", K(ret)); } @@ -110,8 +108,7 @@ OB_DEF_SERIALIZE_SIZE(ObRFRangeFilterMsg) upper_bounds_, need_null_cmp_flags_, cells_size_, - null_first_cmp_funcs_, - null_last_cmp_funcs_); + cmp_funcs_); return len; } @@ -477,15 +474,9 @@ int ObRFBloomFilterMsg::might_contain(const ObExpr &expr, if (OB_FAIL(expr.args_[i]->eval(ctx, datum))) { LOG_WARN("failed to eval datum", K(ret)); } else { - if (OB_ISNULL(expr.inner_functions_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("expr.inner_functions_ is null", K(ret)); - } else { - hash_func.hash_func_ = reinterpret_cast( - expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::HASH_ROW)]); - if (OB_FAIL(hash_func.hash_func_(*datum, hash_val, hash_val))) { - LOG_WARN("fail to calc hash val", K(ret)); - } + hash_func.hash_func_ = filter_ctx.hash_funcs_.at(i).hash_func_; + if (OB_FAIL(hash_func.hash_func_(*datum, hash_val, hash_val))) { + LOG_WARN("fail to calc hash val", K(ret)); } } } @@ -540,19 +531,12 @@ int ObRFBloomFilterMsg::might_contain_batch( LOG_WARN("evaluate batch failed", K(ret), K(*e)); } else { const bool is_batch_seed = (i > 0); - if (OB_ISNULL(expr.inner_functions_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("the inner_functions_ of expr is null", K(ret)); - } else { - ObBatchDatumHashFunc hash_func_batch = - reinterpret_cast( - expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::HASH_BATCH)]); - hash_func_batch(hash_values, - e->locate_batch_datums(ctx), e->is_batch_result(), - skip, batch_size, - is_batch_seed ? hash_values : &seed, - is_batch_seed); - } + ObBatchDatumHashFunc hash_func_batch = filter_ctx.hash_funcs_.at(i).batch_hash_func_; + hash_func_batch(hash_values, + e->locate_batch_datums(ctx), e->is_batch_result(), + skip, batch_size, + is_batch_seed ? hash_values : &seed, + is_batch_seed); } } if (OB_FAIL(ObBitVector::flip_foreach(skip, batch_size, @@ -812,7 +796,7 @@ int ObRFBloomFilterMsg::generate_filter_indexes( ObRFRangeFilterMsg::ObRFRangeFilterMsg() : ObP2PDatahubMsgBase(), lower_bounds_(allocator_), upper_bounds_(allocator_), need_null_cmp_flags_(allocator_), cells_size_(allocator_), - null_first_cmp_funcs_(allocator_), null_last_cmp_funcs_(allocator_) + cmp_funcs_(allocator_) { } @@ -823,11 +807,11 @@ int ObRFRangeFilterMsg::reuse() lower_bounds_.reset(); upper_bounds_.reset(); cells_size_.reset(); - if (OB_FAIL(lower_bounds_.prepare_allocate(null_first_cmp_funcs_.count()))) { + if (OB_FAIL(lower_bounds_.prepare_allocate(cmp_funcs_.count()))) { LOG_WARN("fail to prepare allocate col cnt", K(ret)); - } else if (OB_FAIL(upper_bounds_.prepare_allocate(null_first_cmp_funcs_.count()))) { + } else if (OB_FAIL(upper_bounds_.prepare_allocate(cmp_funcs_.count()))) { LOG_WARN("fail to prepare allocate col cnt", K(ret)); - } else if (OB_FAIL(cells_size_.prepare_allocate(null_first_cmp_funcs_.count()))) { + } else if (OB_FAIL(cells_size_.prepare_allocate(cmp_funcs_.count()))) { LOG_WARN("fail to prepare allocate col cnt", K(ret)); } return ret; @@ -843,9 +827,7 @@ int ObRFRangeFilterMsg::assign(const ObP2PDatahubMsgBase &msg) LOG_WARN("fail to assign lower bounds", K(ret)); } else if (OB_FAIL(upper_bounds_.assign(other_msg.upper_bounds_))) { LOG_WARN("fail to assign upper bounds", K(ret)); - } else if (OB_FAIL(null_first_cmp_funcs_.assign(other_msg.null_first_cmp_funcs_))) { - LOG_WARN("failed to assign cmp funcs", K(ret)); - } else if (OB_FAIL(null_last_cmp_funcs_.assign(other_msg.null_last_cmp_funcs_))) { + } else if (OB_FAIL(cmp_funcs_.assign(other_msg.cmp_funcs_))) { LOG_WARN("failed to assign cmp funcs", K(ret)); } else if (OB_FAIL(need_null_cmp_flags_.assign(other_msg.need_null_cmp_flags_))) { LOG_WARN("failed to assign cmp flags", K(ret)); @@ -910,7 +892,7 @@ int ObRFRangeFilterMsg::get_min(ObIArray &vals) int ret = OB_SUCCESS; for (int i = 0; i < vals.count() && OB_SUCC(ret); ++i) { // null value is also suitable - if (OB_FAIL(get_min(null_first_cmp_funcs_.at(i), lower_bounds_.at(i), + if (OB_FAIL(get_min(cmp_funcs_.at(i), lower_bounds_.at(i), vals.at(i), cells_size_.at(i).min_datum_buf_size_))) { LOG_WARN("fail to compare value", K(ret)); } @@ -922,8 +904,8 @@ int ObRFRangeFilterMsg::get_max(ObIArray &vals) { int ret = OB_SUCCESS; for (int i = 0; i < vals.count() && OB_SUCC(ret); ++i) { - // null value is also suitable - if (OB_FAIL(get_max(null_last_cmp_funcs_.at(i), upper_bounds_.at(i), + // null value is also suitable + if (OB_FAIL(get_max(cmp_funcs_.at(i), upper_bounds_.at(i), vals.at(i), cells_size_.at(i).max_datum_buf_size_))) { LOG_WARN("fail to compare value", K(ret)); } @@ -935,7 +917,9 @@ int ObRFRangeFilterMsg::get_min(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t { int ret = OB_SUCCESS; int cmp = 0; - if (is_empty_ || (OB_ISNULL(l.ptr_))) { + // when [null, null] merge [a, b], the expect result in mysql mode is [null, b] + // the lower bound l, with ptr==NULL and null_==true, should not be covered by a + if (is_empty_ || (OB_ISNULL(l.ptr_) && !l.is_null())) { if (OB_FAIL(dynamic_copy_cell(r, l, cell_size))) { LOG_WARN("fail to deep copy datum"); } @@ -1008,30 +992,6 @@ int ObRFRangeFilterMsg::get_max(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t return ret; } -int ObRFRangeFilterMsg::might_contain(ObIArray &vals, bool &is_match) -{ - int ret = OB_SUCCESS; - is_match = true; - int cmp_min = 0; - int cmp_max = 0; - for (int i = 0; i < vals.count() && OB_SUCC(ret); ++i) { - cmp_min = 0; - cmp_max = 0; - if (OB_FAIL(null_first_cmp_funcs_.at(i).cmp_func_(lower_bounds_.at(i), vals.at(i), cmp_min))) { - LOG_WARN("fail to compare value", K(ret)); - } else if (cmp_min > 0) { - is_match = false; - break; - } else if (OB_FAIL(null_last_cmp_funcs_.at(i).cmp_func_(upper_bounds_.at(i), vals.at(i), cmp_max))) { - LOG_WARN("fail to compare value", K(ret)); - } else if (cmp_max < 0) { - is_match = false; - break; - } - } - return ret; -} - int ObRFRangeFilterMsg::insert_by_row( const common::ObIArray &expr_array, const common::ObHashFuncs &hash_funcs, @@ -1067,9 +1027,9 @@ int ObRFRangeFilterMsg::insert_by_row( } else if (datum->is_null() && !need_null_cmp_flags_.at(i)) { /*do nothing*/ break; - } else if (OB_FAIL(get_min(null_last_cmp_funcs_.at(i), lower_bounds_.at(i), *datum, cells_size_.at(i).min_datum_buf_size_))) { + } else if (OB_FAIL(get_min(cmp_funcs_.at(i), lower_bounds_.at(i), *datum, cells_size_.at(i).min_datum_buf_size_))) { LOG_WARN("failed to compare value", K(ret)); - } else if (OB_FAIL(get_max(null_first_cmp_funcs_.at(i), upper_bounds_.at(i), *datum, cells_size_.at(i).max_datum_buf_size_))) { + } else if (OB_FAIL(get_max(cmp_funcs_.at(i), upper_bounds_.at(i), *datum, cells_size_.at(i).max_datum_buf_size_))) { LOG_WARN("failed to compare value", K(ret)); } } @@ -1113,8 +1073,7 @@ int ObRFRangeFilterMsg::might_contain(const ObExpr &expr, { int ret = OB_SUCCESS; ObDatum *datum = nullptr; - ObCmpFunc cmp_func_null_last; - ObCmpFunc cmp_func_null_first; + ObCmpFunc cmp_func; int cmp_min = 0; int cmp_max = 0; bool is_match = true; @@ -1127,27 +1086,19 @@ int ObRFRangeFilterMsg::might_contain(const ObExpr &expr, if (OB_FAIL(expr.args_[i]->eval(ctx, datum))) { LOG_WARN("failed to eval datum", K(ret)); } else { - if (OB_ISNULL(expr.inner_functions_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("expr.inner_functions_ is null", K(ret)); - } else { - cmp_min = 0; - cmp_max = 0; - cmp_func_null_first.cmp_func_ = reinterpret_cast( - expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::NULL_FIRST_COMPARE)]); - cmp_func_null_last.cmp_func_ = reinterpret_cast( - expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::NULL_LAST_COMPARE)]); - if (OB_FAIL(cmp_func_null_first.cmp_func_(*datum, lower_bounds_.at(i), cmp_min))) { - LOG_WARN("fail to compare value", K(ret)); - } else if (cmp_min < 0) { - is_match = false; - break; - } else if (OB_FAIL(cmp_func_null_last.cmp_func_(*datum, upper_bounds_.at(i), cmp_max))) { - LOG_WARN("fail to compare value", K(ret)); - } else if (cmp_max > 0) { - is_match = false; - break; - } + cmp_min = 0; + cmp_max = 0; + cmp_func.cmp_func_ = filter_ctx.cmp_funcs_.at(i).cmp_func_; + if (OB_FAIL(cmp_func.cmp_func_(*datum, lower_bounds_.at(i), cmp_min))) { + LOG_WARN("fail to compare value", K(ret)); + } else if (cmp_min < 0) { + is_match = false; + break; + } else if (OB_FAIL(cmp_func.cmp_func_(*datum, upper_bounds_.at(i), cmp_max))) { + LOG_WARN("fail to compare value", K(ret)); + } else if (cmp_max > 0) { + is_match = false; + break; } } } @@ -1466,15 +1417,11 @@ int ObRFInFilterMsg::might_contain(const ObExpr &expr, if (OB_FAIL(expr.args_[i]->eval(ctx, datum))) { LOG_WARN("failed to eval datum", K(ret)); } else { - if (OB_ISNULL(expr.inner_functions_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("expr.inner_functions_ is null", K(ret)); - } else if (OB_FAIL(cur_row.push_back(*datum))) { + if (OB_FAIL(cur_row.push_back(*datum))) { LOG_WARN("failed to push back datum", K(ret)); } else { ObHashFunc hash_func; - hash_func.hash_func_ = reinterpret_cast( - expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::HASH_ROW)]); + hash_func.hash_func_ = filter_ctx.hash_funcs_.at(i).hash_func_; if (OB_FAIL(hash_func.hash_func_(*datum, hash_val, hash_val))) { LOG_WARN("fail to calc hash val", K(ret)); } @@ -1482,7 +1429,7 @@ int ObRFInFilterMsg::might_contain(const ObExpr &expr, } } if (OB_SUCC(ret)) { - ObRFInFilterNode node(&cmp_funcs_, nullptr, &cur_row, hash_val); + ObRFInFilterNode node(&filter_ctx.cmp_funcs_, nullptr, &cur_row, hash_val); if (OB_FAIL(rows_set_.exist_refactored(node))) { if (OB_HASH_NOT_EXIST == ret) { is_match = false; diff --git a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h index 623007a34..1aee60d40 100644 --- a/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h +++ b/src/sql/engine/px/p2p_datahub/ob_runtime_filter_msg.h @@ -126,8 +126,7 @@ public: virtual int destroy() { lower_bounds_.reset(); upper_bounds_.reset(); - null_first_cmp_funcs_.reset(); - null_last_cmp_funcs_.reset(); + cmp_funcs_.reset(); need_null_cmp_flags_.reset(); cells_size_.reset(); allocator_.reset(); @@ -156,7 +155,6 @@ public: ObEvalCtx &eval_ctx, uint64_t *batch_hash_values) override; virtual int reuse() override; - int might_contain(ObIArray &vals, bool &is_match); int adjust_cell_size(); private: int get_min(ObIArray &vals); @@ -170,8 +168,7 @@ public: ObFixedArray upper_bounds_; ObFixedArray need_null_cmp_flags_; ObFixedArray cells_size_; - ObCmpFuncs null_first_cmp_funcs_; - ObCmpFuncs null_last_cmp_funcs_; + ObCmpFuncs cmp_funcs_; }; class ObRFInFilterMsg : public ObP2PDatahubMsgBase diff --git a/src/sql/optimizer/ob_log_join_filter.h b/src/sql/optimizer/ob_log_join_filter.h index 6e6f51a56..36711ff5c 100644 --- a/src/sql/optimizer/ob_log_join_filter.h +++ b/src/sql/optimizer/ob_log_join_filter.h @@ -73,6 +73,10 @@ public: { return join_filter_exprs_; } common::ObIArray &get_join_filter_exprs_for_update() { return join_filter_exprs_; } + int add_join_filter_cmp_funcs(const common::ObDatumCmpFuncType &cmp_fun) + { return join_filter_cmp_funcs_.push_back(cmp_fun);} + const common::ObIArray &get_join_filter_cmp_funcs() + { return join_filter_cmp_funcs_; } common::ObIArray &get_is_null_safe_cmps() { return is_null_safe_cmps_; } const common::ObIArray &get_join_filter_types() @@ -108,6 +112,8 @@ private: //equal join condition expr common::ObSEArray join_exprs_; bool is_use_filter_shuffle_; // 标记use端filter是否有shuffle + // join_filter_cmp_funcs_ is for join filter use + common::ObSEArray join_filter_cmp_funcs_; common::ObSEArray join_filter_exprs_; common::ObSEArray join_filter_types_; common::ObSEArray p2p_sequence_ids_; diff --git a/src/sql/optimizer/ob_logical_operator.cpp b/src/sql/optimizer/ob_logical_operator.cpp index 2900ba5c4..4c08a569f 100644 --- a/src/sql/optimizer/ob_logical_operator.cpp +++ b/src/sql/optimizer/ob_logical_operator.cpp @@ -4530,16 +4530,42 @@ int ObLogicalOperator::generate_runtime_filter_expr( ObLogJoinFilter *join_filter_create = static_cast(join_filter_create_op); common::ObIArray &exprs = op->get_filter_exprs(); ObRawExprFactory &expr_factory = get_plan()->get_optimizer_context().get_expr_factory(); - common::ObIArray &join_exprs = join_filter_use->get_join_exprs(); + common::ObIArray &join_use_exprs = join_filter_use->get_join_exprs(); + common::ObIArray &join_create_exprs = join_filter_create->get_join_exprs(); ObOpRawExpr *join_filter_expr = NULL; ObSQLSessionInfo *session_info = get_plan()->get_optimizer_context().get_session_info(); - if (OB_FAIL(expr_factory.create_raw_expr(T_OP_RUNTIME_FILTER, join_filter_expr))) { + if (OB_UNLIKELY(join_use_exprs.count() != join_create_exprs.count())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("join_use_exprs's size doesn't match join_create_exprs's size", + K(join_use_exprs.count()), K(join_create_exprs.count())); + } else if (OB_FAIL(expr_factory.create_raw_expr(T_OP_RUNTIME_FILTER, join_filter_expr))) { LOG_WARN("fail to create raw expr", K(ret)); } else { join_filter_expr->set_runtime_filter_type(type); - for (int i = 0; i < join_exprs.count() && OB_SUCC(ret); ++i) { - if (OB_FAIL(join_filter_expr->add_param_expr(join_exprs.at(i)))) { + for (int i = 0; i < join_use_exprs.count() && OB_SUCC(ret); ++i) { + ObRawExpr *join_use_expr = join_use_exprs.at(i); + ObRawExpr *join_create_expr = join_create_exprs.at(i); + if (OB_ISNULL(join_use_expr) || OB_ISNULL(join_create_expr)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("join_use_expr or join_create_expr is NULL!", K(join_use_expr), K(join_create_expr)); + } else if (OB_FAIL(join_filter_expr->add_param_expr(join_use_exprs.at(i)))) { LOG_WARN("fail to add param expr", K(ret)); + } else { + CK(join_use_expr->get_collation_type() == join_create_expr->get_collation_type()); + ObCmpFunc cmp_func; + const ObScale scale = ObDatumFuncs::max_scale(join_use_expr->get_result_type().get_scale(), + join_create_expr->get_result_type().get_scale()); + bool has_lob_header = is_lob_storage(join_use_expr->get_data_type()) + || is_lob_storage(join_create_expr->get_data_type()); + cmp_func.cmp_func_ = ObDatumFuncs::get_nullsafe_cmp_func( + join_use_expr->get_data_type(), + join_create_expr->get_data_type(), + lib::is_oracle_mode()? NULL_LAST : NULL_FIRST, + join_use_expr->get_collation_type(), + scale, + lib::is_oracle_mode(), + has_lob_header); + join_filter_use->add_join_filter_cmp_funcs(cmp_func.cmp_func_); } } if (OB_SUCC(ret)) {