Fix runtime range filter correctness problem.

This commit is contained in:
obdev
2023-06-16 09:12:16 +00:00
committed by ob-robot
parent 9df89f3baf
commit b8cc51f8a1
9 changed files with 146 additions and 167 deletions

View File

@ -53,6 +53,8 @@ ObExprJoinFilter::ObExprJoinFilterContext::~ObExprJoinFilterContext()
// do not destroy it, because other worker threads may not start yet
rf_msg_->dec_ref_count();
}
hash_funcs_.reset();
cmp_funcs_.reset();
}
void ObExprJoinFilter::ObExprJoinFilterContext::reset_monitor_info()
@ -121,27 +123,6 @@ int ObExprJoinFilter::cg_expr(ObExprCGCtx &expr_cg_ctx, const ObRawExpr &raw_exp
}
}
rt_expr.inner_func_cnt_ = rt_expr.arg_cnt_ * FUNCTION_CNT;
if (0 == rt_expr.inner_func_cnt_) {
// do nothing
} else if (OB_FAIL(ret)) {
} else if (OB_ISNULL(rt_expr.inner_functions_ = reinterpret_cast<void**>(expr_cg_ctx.allocator_->
alloc(sizeof(ObExpr::EvalFunc) * rt_expr.arg_cnt_ * FUNCTION_CNT)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory for inner_functions_ failed", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < rt_expr.arg_cnt_; ++i) {
rt_expr.inner_functions_[GET_FUNC(i, HASH_ROW)] =
reinterpret_cast<void*>(rt_expr.args_[i]->basic_funcs_->murmur_hash_v2_);
rt_expr.inner_functions_[GET_FUNC(i, HASH_BATCH)] =
reinterpret_cast<void*>(rt_expr.args_[i]->basic_funcs_->murmur_hash_v2_batch_);
rt_expr.inner_functions_[GET_FUNC(i, NULL_FIRST_COMPARE)] =
reinterpret_cast<void*>(rt_expr.args_[i]->basic_funcs_->null_first_cmp_);
rt_expr.inner_functions_[GET_FUNC(i, NULL_LAST_COMPARE)] =
reinterpret_cast<void*>(rt_expr.args_[i]->basic_funcs_->null_last_cmp_);
}
}
return ret;
}

View File

@ -31,19 +31,11 @@ enum RuntimeFilterType
class ObExprJoinFilter : public ObExprOperator
{
public:
// Number of function-pointer slots reserved per expression argument in
// ObExpr::inner_functions_ (cg_expr sizes the array as arg_cnt_ * FUNCTION_CNT).
#define FUNCTION_CNT 4
// Flat index into inner_functions_ of slot `j` belonging to argument `i`.
#define GET_FUNC(i, j) (((i) * (FUNCTION_CNT)) + (j))
// Slot positions inside one argument's group; passed as the `j` operand of GET_FUNC.
// The numeric values are array offsets, so they must stay dense and below FUNCTION_CNT.
enum FunctionIndex{
// slot filled from basic_funcs_->murmur_hash_v2_
HASH_ROW = 0,
// slot filled from basic_funcs_->murmur_hash_v2_batch_
HASH_BATCH = 1,
// slot filled from basic_funcs_->null_first_cmp_
NULL_FIRST_COMPARE = 2,
// slot filled from basic_funcs_->null_last_cmp_
NULL_LAST_COMPARE = 3
};
class ObExprJoinFilterContext : public ObExprOperatorCtx
{
public:
ObExprJoinFilterContext() : ObExprOperatorCtx(),
rf_msg_(nullptr), rf_key_(), start_time_(0),
rf_msg_(nullptr), rf_key_(), hash_funcs_(), cmp_funcs_(), start_time_(0),
filter_count_(0), total_count_(0), check_count_(0),
n_times_(0), ready_ts_(0), next_check_start_pos_(0),
window_cnt_(0), window_size_(0),
@ -62,6 +54,8 @@ public:
public:
ObP2PDatahubMsgBase *rf_msg_;
ObP2PDhKey rf_key_;
ObHashFuncs hash_funcs_;
ObCmpFuncs cmp_funcs_;
int64_t start_time_;
int64_t filter_count_;
int64_t total_count_;

View File

@ -62,8 +62,7 @@ OB_SERIALIZE_MEMBER((ObJoinFilterSpec, ObOpSpec),
filter_len_,
join_keys_,
hash_funcs_,
null_first_cmp_funcs_,
null_last_cmp_funcs_,
cmp_funcs_,
filter_shared_type_,
calc_tablet_id_expr_,
rf_infos_,
@ -264,9 +263,7 @@ int ObJoinFilterOpInput::construct_msg_details(
LOG_WARN("fail to prepare allocate col cnt", K(ret));
} else if (OB_FAIL(range_msg.cells_size_.prepare_allocate(col_cnt))) {
LOG_WARN("fail to prepare allocate col cnt", K(ret));
} else if (OB_FAIL(range_msg.null_first_cmp_funcs_.assign(spec.null_first_cmp_funcs_))) {
LOG_WARN("fail to init cmp funcs", K(ret));
} else if (OB_FAIL(range_msg.null_last_cmp_funcs_.assign(spec.null_last_cmp_funcs_))) {
} else if (OB_FAIL(range_msg.cmp_funcs_.assign(spec.cmp_funcs_))) {
LOG_WARN("fail to init cmp funcs", K(ret));
} else if (OB_FAIL(range_msg.need_null_cmp_flags_.assign(spec.need_null_cmp_flags_))) {
LOG_WARN("fail to init cmp flags", K(ret));
@ -285,7 +282,7 @@ int ObJoinFilterOpInput::construct_msg_details(
LOG_WARN("fail to init in hash set", K(ret));
} else if (OB_FAIL(in_msg.cur_row_.prepare_allocate(col_cnt))) {
LOG_WARN("fail to prepare allocate col cnt", K(ret));
} else if (OB_FAIL(in_msg.cmp_funcs_.assign(spec.null_first_cmp_funcs_))) {
} else if (OB_FAIL(in_msg.cmp_funcs_.assign(spec.cmp_funcs_))) {
LOG_WARN("fail to init cmp funcs", K(ret));
} else if (OB_FAIL(in_msg.hash_funcs_for_insert_.assign(spec.hash_funcs_))) {
LOG_WARN("fail to init cmp funcs", K(ret));
@ -314,8 +311,7 @@ ObJoinFilterSpec::ObJoinFilterSpec(common::ObIAllocator &alloc, const ObPhyOpera
filter_len_(0),
join_keys_(alloc),
hash_funcs_(alloc),
null_first_cmp_funcs_(alloc),
null_last_cmp_funcs_(alloc),
cmp_funcs_(alloc),
filter_shared_type_(JoinFilterSharedType::INVALID_TYPE),
calc_tablet_id_expr_(NULL),
rf_infos_(alloc),
@ -838,7 +834,7 @@ int ObJoinFilterOp::open_join_filter_use()
ObExprJoinFilter::ObExprJoinFilterContext *join_filter_ctx = NULL;
if (OB_ISNULL(join_filter_ctx = static_cast<ObExprJoinFilter::ObExprJoinFilterContext *>(
ctx_.get_expr_op_ctx(MY_SPEC.rf_infos_.at(i).filter_expr_id_)))) {
if (OB_FAIL(ctx_.create_expr_op_ctx(MY_SPEC.rf_infos_.at(i).filter_expr_id_,join_filter_ctx))) {
if (OB_FAIL(ctx_.create_expr_op_ctx(MY_SPEC.rf_infos_.at(i).filter_expr_id_, join_filter_ctx))) {
LOG_WARN("failed to create operator ctx", K(ret), K(MY_SPEC.rf_infos_.at(i).filter_expr_id_));
} else {
ObP2PDhKey dh_key(MY_SPEC.rf_infos_.at(i).p2p_datahub_id_, px_seq_id, task_id);
@ -846,6 +842,17 @@ int ObJoinFilterOp::open_join_filter_use()
int64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
join_filter_ctx->window_size_ = ADAPTIVE_BF_WINDOW_ORG_SIZE;
join_filter_ctx->max_wait_time_ms_ = filter_input->config_.runtime_filter_wait_time_ms_;
join_filter_ctx->hash_funcs_.set_allocator(&ctx_.get_allocator());
join_filter_ctx->cmp_funcs_.set_allocator(&ctx_.get_allocator());
if (OB_FAIL(join_filter_ctx->hash_funcs_.init(MY_SPEC.hash_funcs_.count()))) {
LOG_WARN("failed to assign hash_func");
} else if (OB_FAIL(join_filter_ctx->cmp_funcs_.init(MY_SPEC.cmp_funcs_.count()))) {
LOG_WARN("failed to assign cmp_funcs_");
} else if (OB_FAIL(join_filter_ctx->hash_funcs_.assign(MY_SPEC.hash_funcs_))) {
LOG_WARN("failed to assign hash_func");
} else if (OB_FAIL(join_filter_ctx->cmp_funcs_.assign(MY_SPEC.cmp_funcs_))) {
LOG_WARN("failed to assign cmp_funcs_");
}
}
} else {
ret = OB_ERR_UNEXPECTED;

View File

@ -194,8 +194,7 @@ public:
int64_t filter_len_;
ExprFixedArray join_keys_;
common::ObHashFuncs hash_funcs_;
ObCmpFuncs null_first_cmp_funcs_;
ObCmpFuncs null_last_cmp_funcs_;
ObCmpFuncs cmp_funcs_;
JoinFilterSharedType filter_shared_type_;
ObExpr *calc_tablet_id_expr_;
common::ObFixedArray<ObRuntimeFilterInfo, common::ObIAllocator> rf_infos_;

View File

@ -79,8 +79,7 @@ OB_DEF_SERIALIZE(ObRFRangeFilterMsg)
upper_bounds_,
need_null_cmp_flags_,
cells_size_,
null_first_cmp_funcs_,
null_last_cmp_funcs_);
cmp_funcs_);
return ret;
}
@ -93,8 +92,7 @@ OB_DEF_DESERIALIZE(ObRFRangeFilterMsg)
upper_bounds_,
need_null_cmp_flags_,
cells_size_,
null_first_cmp_funcs_,
null_last_cmp_funcs_);
cmp_funcs_);
if (OB_FAIL(adjust_cell_size())) {
LOG_WARN("fail do adjust cell size", K(ret));
}
@ -110,8 +108,7 @@ OB_DEF_SERIALIZE_SIZE(ObRFRangeFilterMsg)
upper_bounds_,
need_null_cmp_flags_,
cells_size_,
null_first_cmp_funcs_,
null_last_cmp_funcs_);
cmp_funcs_);
return len;
}
@ -477,15 +474,9 @@ int ObRFBloomFilterMsg::might_contain(const ObExpr &expr,
if (OB_FAIL(expr.args_[i]->eval(ctx, datum))) {
LOG_WARN("failed to eval datum", K(ret));
} else {
if (OB_ISNULL(expr.inner_functions_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr.inner_functions_ is null", K(ret));
} else {
hash_func.hash_func_ = reinterpret_cast<ObDatumHashFuncType>(
expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::HASH_ROW)]);
if (OB_FAIL(hash_func.hash_func_(*datum, hash_val, hash_val))) {
LOG_WARN("fail to calc hash val", K(ret));
}
hash_func.hash_func_ = filter_ctx.hash_funcs_.at(i).hash_func_;
if (OB_FAIL(hash_func.hash_func_(*datum, hash_val, hash_val))) {
LOG_WARN("fail to calc hash val", K(ret));
}
}
}
@ -540,19 +531,12 @@ int ObRFBloomFilterMsg::might_contain_batch(
LOG_WARN("evaluate batch failed", K(ret), K(*e));
} else {
const bool is_batch_seed = (i > 0);
if (OB_ISNULL(expr.inner_functions_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the inner_functions_ of expr is null", K(ret));
} else {
ObBatchDatumHashFunc hash_func_batch =
reinterpret_cast<ObBatchDatumHashFunc>(
expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::HASH_BATCH)]);
hash_func_batch(hash_values,
e->locate_batch_datums(ctx), e->is_batch_result(),
skip, batch_size,
is_batch_seed ? hash_values : &seed,
is_batch_seed);
}
ObBatchDatumHashFunc hash_func_batch = filter_ctx.hash_funcs_.at(i).batch_hash_func_;
hash_func_batch(hash_values,
e->locate_batch_datums(ctx), e->is_batch_result(),
skip, batch_size,
is_batch_seed ? hash_values : &seed,
is_batch_seed);
}
}
if (OB_FAIL(ObBitVector::flip_foreach(skip, batch_size,
@ -812,7 +796,7 @@ int ObRFBloomFilterMsg::generate_filter_indexes(
ObRFRangeFilterMsg::ObRFRangeFilterMsg()
: ObP2PDatahubMsgBase(), lower_bounds_(allocator_), upper_bounds_(allocator_),
need_null_cmp_flags_(allocator_), cells_size_(allocator_),
null_first_cmp_funcs_(allocator_), null_last_cmp_funcs_(allocator_)
cmp_funcs_(allocator_)
{
}
@ -823,11 +807,11 @@ int ObRFRangeFilterMsg::reuse()
lower_bounds_.reset();
upper_bounds_.reset();
cells_size_.reset();
if (OB_FAIL(lower_bounds_.prepare_allocate(null_first_cmp_funcs_.count()))) {
if (OB_FAIL(lower_bounds_.prepare_allocate(cmp_funcs_.count()))) {
LOG_WARN("fail to prepare allocate col cnt", K(ret));
} else if (OB_FAIL(upper_bounds_.prepare_allocate(null_first_cmp_funcs_.count()))) {
} else if (OB_FAIL(upper_bounds_.prepare_allocate(cmp_funcs_.count()))) {
LOG_WARN("fail to prepare allocate col cnt", K(ret));
} else if (OB_FAIL(cells_size_.prepare_allocate(null_first_cmp_funcs_.count()))) {
} else if (OB_FAIL(cells_size_.prepare_allocate(cmp_funcs_.count()))) {
LOG_WARN("fail to prepare allocate col cnt", K(ret));
}
return ret;
@ -843,9 +827,7 @@ int ObRFRangeFilterMsg::assign(const ObP2PDatahubMsgBase &msg)
LOG_WARN("fail to assign lower bounds", K(ret));
} else if (OB_FAIL(upper_bounds_.assign(other_msg.upper_bounds_))) {
LOG_WARN("fail to assign upper bounds", K(ret));
} else if (OB_FAIL(null_first_cmp_funcs_.assign(other_msg.null_first_cmp_funcs_))) {
LOG_WARN("failed to assign cmp funcs", K(ret));
} else if (OB_FAIL(null_last_cmp_funcs_.assign(other_msg.null_last_cmp_funcs_))) {
} else if (OB_FAIL(cmp_funcs_.assign(other_msg.cmp_funcs_))) {
LOG_WARN("failed to assign cmp funcs", K(ret));
} else if (OB_FAIL(need_null_cmp_flags_.assign(other_msg.need_null_cmp_flags_))) {
LOG_WARN("failed to assign cmp flags", K(ret));
@ -910,7 +892,7 @@ int ObRFRangeFilterMsg::get_min(ObIArray<ObDatum> &vals)
int ret = OB_SUCCESS;
for (int i = 0; i < vals.count() && OB_SUCC(ret); ++i) {
// null value is also suitable
if (OB_FAIL(get_min(null_first_cmp_funcs_.at(i), lower_bounds_.at(i),
if (OB_FAIL(get_min(cmp_funcs_.at(i), lower_bounds_.at(i),
vals.at(i), cells_size_.at(i).min_datum_buf_size_))) {
LOG_WARN("fail to compare value", K(ret));
}
@ -922,8 +904,8 @@ int ObRFRangeFilterMsg::get_max(ObIArray<ObDatum> &vals)
{
int ret = OB_SUCCESS;
for (int i = 0; i < vals.count() && OB_SUCC(ret); ++i) {
// null value is also suitable
if (OB_FAIL(get_max(null_last_cmp_funcs_.at(i), upper_bounds_.at(i),
// null value is also suitable
if (OB_FAIL(get_max(cmp_funcs_.at(i), upper_bounds_.at(i),
vals.at(i), cells_size_.at(i).max_datum_buf_size_))) {
LOG_WARN("fail to compare value", K(ret));
}
@ -935,7 +917,9 @@ int ObRFRangeFilterMsg::get_min(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t
{
int ret = OB_SUCCESS;
int cmp = 0;
if (is_empty_ || (OB_ISNULL(l.ptr_))) {
// When [null, null] is merged with [a, b], the expected result in MySQL mode is [null, b]:
// the lower bound l, with ptr == NULL and null_ == true, must not be overwritten by a.
if (is_empty_ || (OB_ISNULL(l.ptr_) && !l.is_null())) {
if (OB_FAIL(dynamic_copy_cell(r, l, cell_size))) {
LOG_WARN("fail to deep copy datum");
}
@ -1008,30 +992,6 @@ int ObRFRangeFilterMsg::get_max(ObCmpFunc &func, ObDatum &l, ObDatum &r, int64_t
return ret;
}
// Tests each datum in `vals` against the per-column bounds kept by this message.
//
// `is_match` is set to true up front and cleared as soon as one column is rejected:
//   - the null-first comparator applied to (lower_bounds_[col], vals[col]) yields a
//     positive result, or
//   - the null-last comparator applied to (upper_bounds_[col], vals[col]) yields a
//     negative result.
// Scanning stops at the first rejected column or at the first comparator error.
//
// @param vals      values to test, one per column
// @param is_match  output flag; true when no column was rejected
// @return OB_SUCCESS, or the error code reported by a comparator
int ObRFRangeFilterMsg::might_contain(ObIArray<ObDatum> &vals, bool &is_match)
{
  int ret = OB_SUCCESS;
  is_match = true;
  // is_match is part of the loop condition, so a rejection ends the scan
  // without evaluating any further column.
  for (int col = 0; is_match && col < vals.count() && OB_SUCC(ret); ++col) {
    int lower_cmp = 0;
    int upper_cmp = 0;
    if (OB_FAIL(null_first_cmp_funcs_.at(col).cmp_func_(lower_bounds_.at(col), vals.at(col), lower_cmp))) {
      LOG_WARN("fail to compare value", K(ret));
    } else if (lower_cmp > 0) {
      is_match = false;
    } else if (OB_FAIL(null_last_cmp_funcs_.at(col).cmp_func_(upper_bounds_.at(col), vals.at(col), upper_cmp))) {
      LOG_WARN("fail to compare value", K(ret));
    } else {
      // is_match is still true here; keep it only if the upper-bound check passes.
      is_match = (upper_cmp >= 0);
    }
  }
  return ret;
}
int ObRFRangeFilterMsg::insert_by_row(
const common::ObIArray<ObExpr *> &expr_array,
const common::ObHashFuncs &hash_funcs,
@ -1067,9 +1027,9 @@ int ObRFRangeFilterMsg::insert_by_row(
} else if (datum->is_null() && !need_null_cmp_flags_.at(i)) {
/*do nothing*/
break;
} else if (OB_FAIL(get_min(null_last_cmp_funcs_.at(i), lower_bounds_.at(i), *datum, cells_size_.at(i).min_datum_buf_size_))) {
} else if (OB_FAIL(get_min(cmp_funcs_.at(i), lower_bounds_.at(i), *datum, cells_size_.at(i).min_datum_buf_size_))) {
LOG_WARN("failed to compare value", K(ret));
} else if (OB_FAIL(get_max(null_first_cmp_funcs_.at(i), upper_bounds_.at(i), *datum, cells_size_.at(i).max_datum_buf_size_))) {
} else if (OB_FAIL(get_max(cmp_funcs_.at(i), upper_bounds_.at(i), *datum, cells_size_.at(i).max_datum_buf_size_))) {
LOG_WARN("failed to compare value", K(ret));
}
}
@ -1113,8 +1073,7 @@ int ObRFRangeFilterMsg::might_contain(const ObExpr &expr,
{
int ret = OB_SUCCESS;
ObDatum *datum = nullptr;
ObCmpFunc cmp_func_null_last;
ObCmpFunc cmp_func_null_first;
ObCmpFunc cmp_func;
int cmp_min = 0;
int cmp_max = 0;
bool is_match = true;
@ -1127,27 +1086,19 @@ int ObRFRangeFilterMsg::might_contain(const ObExpr &expr,
if (OB_FAIL(expr.args_[i]->eval(ctx, datum))) {
LOG_WARN("failed to eval datum", K(ret));
} else {
if (OB_ISNULL(expr.inner_functions_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr.inner_functions_ is null", K(ret));
} else {
cmp_min = 0;
cmp_max = 0;
cmp_func_null_first.cmp_func_ = reinterpret_cast<ObDatumCmpFuncType>(
expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::NULL_FIRST_COMPARE)]);
cmp_func_null_last.cmp_func_ = reinterpret_cast<ObDatumCmpFuncType>(
expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::NULL_LAST_COMPARE)]);
if (OB_FAIL(cmp_func_null_first.cmp_func_(*datum, lower_bounds_.at(i), cmp_min))) {
LOG_WARN("fail to compare value", K(ret));
} else if (cmp_min < 0) {
is_match = false;
break;
} else if (OB_FAIL(cmp_func_null_last.cmp_func_(*datum, upper_bounds_.at(i), cmp_max))) {
LOG_WARN("fail to compare value", K(ret));
} else if (cmp_max > 0) {
is_match = false;
break;
}
cmp_min = 0;
cmp_max = 0;
cmp_func.cmp_func_ = filter_ctx.cmp_funcs_.at(i).cmp_func_;
if (OB_FAIL(cmp_func.cmp_func_(*datum, lower_bounds_.at(i), cmp_min))) {
LOG_WARN("fail to compare value", K(ret));
} else if (cmp_min < 0) {
is_match = false;
break;
} else if (OB_FAIL(cmp_func.cmp_func_(*datum, upper_bounds_.at(i), cmp_max))) {
LOG_WARN("fail to compare value", K(ret));
} else if (cmp_max > 0) {
is_match = false;
break;
}
}
}
@ -1466,15 +1417,11 @@ int ObRFInFilterMsg::might_contain(const ObExpr &expr,
if (OB_FAIL(expr.args_[i]->eval(ctx, datum))) {
LOG_WARN("failed to eval datum", K(ret));
} else {
if (OB_ISNULL(expr.inner_functions_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("expr.inner_functions_ is null", K(ret));
} else if (OB_FAIL(cur_row.push_back(*datum))) {
if (OB_FAIL(cur_row.push_back(*datum))) {
LOG_WARN("failed to push back datum", K(ret));
} else {
ObHashFunc hash_func;
hash_func.hash_func_ = reinterpret_cast<ObDatumHashFuncType>(
expr.inner_functions_[GET_FUNC(i, ObExprJoinFilter::HASH_ROW)]);
hash_func.hash_func_ = filter_ctx.hash_funcs_.at(i).hash_func_;
if (OB_FAIL(hash_func.hash_func_(*datum, hash_val, hash_val))) {
LOG_WARN("fail to calc hash val", K(ret));
}
@ -1482,7 +1429,7 @@ int ObRFInFilterMsg::might_contain(const ObExpr &expr,
}
}
if (OB_SUCC(ret)) {
ObRFInFilterNode node(&cmp_funcs_, nullptr, &cur_row, hash_val);
ObRFInFilterNode node(&filter_ctx.cmp_funcs_, nullptr, &cur_row, hash_val);
if (OB_FAIL(rows_set_.exist_refactored(node))) {
if (OB_HASH_NOT_EXIST == ret) {
is_match = false;

View File

@ -126,8 +126,7 @@ public:
virtual int destroy() {
lower_bounds_.reset();
upper_bounds_.reset();
null_first_cmp_funcs_.reset();
null_last_cmp_funcs_.reset();
cmp_funcs_.reset();
need_null_cmp_flags_.reset();
cells_size_.reset();
allocator_.reset();
@ -156,7 +155,6 @@ public:
ObEvalCtx &eval_ctx,
uint64_t *batch_hash_values) override;
virtual int reuse() override;
int might_contain(ObIArray<ObDatum> &vals, bool &is_match);
int adjust_cell_size();
private:
int get_min(ObIArray<ObDatum> &vals);
@ -170,8 +168,7 @@ public:
ObFixedArray<ObDatum, common::ObIAllocator> upper_bounds_;
ObFixedArray<bool, common::ObIAllocator> need_null_cmp_flags_;
ObFixedArray<MinMaxCellSize, common::ObIAllocator> cells_size_;
ObCmpFuncs null_first_cmp_funcs_;
ObCmpFuncs null_last_cmp_funcs_;
ObCmpFuncs cmp_funcs_;
};
class ObRFInFilterMsg : public ObP2PDatahubMsgBase