[CP] Bug fix: MergeJoin Operator return -4016 when set datum store mem limits

This commit is contained in:
GongYusen
2024-07-09 14:04:25 +00:00
committed by ob-robot
parent f2a2f17564
commit 7c93f607e0
2 changed files with 50 additions and 16 deletions

View File

@ -875,6 +875,36 @@ int ObMergeJoinOp::set_is_match(const int64_t idx, const bool is_match)
return ret;
}
int ObMergeJoinOp::update_store_mem_bound(ObRADatumStore *left, ObRADatumStore *right)
{
int ret = OB_SUCCESS;
bool updated = false;
ObMergeJoinMemChecker checker(get_total_rows_in_datum_store());
if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically(
&mem_context_->get_malloc_allocator(),
checker,
updated))) {
LOG_WARN("failed to update max available memory size periodically", K(ret));
} else {
int64_t t_mem_bound = sql_mem_processor_.get_mem_bound();
int64_t l_mem_bound = static_cast<int64_t>(left_mem_bound_ratio_ * t_mem_bound);
int64_t r_mem_bound = t_mem_bound - l_mem_bound;
if (OB_UNLIKELY(t_mem_bound < 0) || OB_UNLIKELY(l_mem_bound < 0) || OB_UNLIKELY(r_mem_bound < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected negative mem bound", K(ret), K(t_mem_bound), K(l_mem_bound),
K(r_mem_bound), K(left_mem_bound_ratio_));
} else {
if (OB_NOT_NULL(left)) {
left->set_mem_limit(l_mem_bound);
}
if (OB_NOT_NULL(right)) {
right->set_mem_limit(r_mem_bound);
}
}
}
return ret;
}
int ObMergeJoinOp::process_dump()
{
int ret = OB_SUCCESS;
@ -1107,6 +1137,11 @@ int ObMergeJoinOp::ChildBatchFetcher::get_next_small_group(int64_t &cmp_res)
bool enough_datums = merge_join_op_.has_enough_datums();
JoinRowList row_list(datum_store_.get_row_cnt());
ObEvalCtx::BatchInfoScopeGuard guard(merge_join_op_.eval_ctx_);
if (need_store_unmatch && OB_FAIL(merge_join_op_.update_store_mem_bound(
is_left ? &datum_store_ : nullptr,
is_left ? nullptr : &datum_store_))) {
LOG_WARN("failed to update max available memory size periodically", K(ret));
}
while (OB_SUCC(ret) && !all_batch_finished && !greater_found && !enough_datums) {
if (need_store_unmatch && OB_LIKELY(cur_idx_ < brs_.size_)) {
ObRADatumStore::StoredRow *stored_row = NULL;
@ -1366,18 +1401,8 @@ int ObMergeJoinOp::iterate_both_chidren(ObEvalCtx::BatchInfoScopeGuard &guard)
JoinRowList r_row_list(r_store->get_row_cnt());
ObRADatumStore::StoredRow *l_row = NULL;
ObRADatumStore::StoredRow *r_row = NULL;
bool updated = false;
if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically(
&mem_context_->get_malloc_allocator(),
[&](int64_t cur_cnt) { return get_total_rows_in_datum_store() > cur_cnt; },
updated))) {
if (OB_FAIL(update_store_mem_bound(l_store, r_store))) {
LOG_WARN("failed to update max available memory size periodically", K(ret));
}
int64_t t_mem_bound = sql_mem_processor_.get_mem_bound();
int64_t l_mem_bound = static_cast<int64_t>(left_mem_bound_ratio_ * t_mem_bound);
int64_t r_mem_bound = 0; // will be set to the remaining memory size later.
l_store->set_mem_limit(l_mem_bound);
if (OB_FAIL(ret)) {
} else if (OB_FAIL(store_group_first_row(left_brs_fetcher_, l_row_list, l_row, guard))) {
LOG_WARN("store left group first row failed", K(ret));
} else if (OB_FAIL(store_group_first_row(right_brs_fetcher_, r_row_list, r_row, guard))) {
@ -1386,11 +1411,7 @@ int ObMergeJoinOp::iterate_both_chidren(ObEvalCtx::BatchInfoScopeGuard &guard)
MY_SPEC.is_left_unique_,
l_row)))) {
LOG_WARN("get left next group failed", K(ret));
} else if (OB_UNLIKELY((r_mem_bound = t_mem_bound - l_store->get_mem_hold()) < 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected negative mem bound", K(ret), K(t_mem_bound), K(l_mem_bound),
K(r_mem_bound), K(l_store->get_mem_hold()));
} else if (FALSE_IT(r_store->set_mem_limit(r_mem_bound))) {
} else if (OB_FAIL((right_brs_fetcher_.get_next_equal_group<false>(r_row_list, l_row, false,
r_row)))) {
LOG_WARN("get right next group failed", K(ret));

View File

@ -23,6 +23,18 @@ namespace oceanbase
{
namespace sql
{
class ObMergeJoinMemChecker
{
public:
ObMergeJoinMemChecker(int64_t row_cnt): cur_row_cnt_(row_cnt)
{}
bool operator()(int64_t max_row_count)
{
return cur_row_cnt_ > max_row_count;
}
int64_t cur_row_cnt_;
};
class ObMergeJoinSpec: public ObJoinSpec
{
OB_UNIS_VERSION_V(1);
@ -490,6 +502,7 @@ private:
inline bool is_match(const int64_t idx) { return rj_match_vec_->at(idx); }
// this function only used for non-vectorized code patch
int process_dump();
int update_store_mem_bound(ObRADatumStore *left, ObRADatumStore *right);
typedef int (ObMergeJoinOp::*state_operation_func_type)();
typedef int (ObMergeJoinOp::*state_function_func_type)();
private: