diff --git a/src/sql/engine/join/ob_merge_join_op.cpp b/src/sql/engine/join/ob_merge_join_op.cpp index 2a7351fd92..d8e5a731b8 100644 --- a/src/sql/engine/join/ob_merge_join_op.cpp +++ b/src/sql/engine/join/ob_merge_join_op.cpp @@ -875,6 +875,36 @@ int ObMergeJoinOp::set_is_match(const int64_t idx, const bool is_match) return ret; } +int ObMergeJoinOp::update_store_mem_bound(ObRADatumStore *left, ObRADatumStore *right) +{ + int ret = OB_SUCCESS; + bool updated = false; + ObMergeJoinMemChecker checker(get_total_rows_in_datum_store()); + if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically( + &mem_context_->get_malloc_allocator(), + checker, + updated))) { + LOG_WARN("failed to update max available memory size periodically", K(ret)); + } else { + int64_t t_mem_bound = sql_mem_processor_.get_mem_bound(); + int64_t l_mem_bound = static_cast(left_mem_bound_ratio_ * t_mem_bound); + int64_t r_mem_bound = t_mem_bound - l_mem_bound; + if (OB_UNLIKELY(t_mem_bound < 0) || OB_UNLIKELY(l_mem_bound < 0) || OB_UNLIKELY(r_mem_bound < 0)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected negative mem bound", K(ret), K(t_mem_bound), K(l_mem_bound), + K(r_mem_bound), K(left_mem_bound_ratio_)); + } else { + if (OB_NOT_NULL(left)) { + left->set_mem_limit(l_mem_bound); + } + if (OB_NOT_NULL(right)) { + right->set_mem_limit(r_mem_bound); + } + } + } + return ret; +} + int ObMergeJoinOp::process_dump() { int ret = OB_SUCCESS; @@ -1107,6 +1137,11 @@ int ObMergeJoinOp::ChildBatchFetcher::get_next_small_group(int64_t &cmp_res) bool enough_datums = merge_join_op_.has_enough_datums(); JoinRowList row_list(datum_store_.get_row_cnt()); ObEvalCtx::BatchInfoScopeGuard guard(merge_join_op_.eval_ctx_); + if (need_store_unmatch && OB_FAIL(merge_join_op_.update_store_mem_bound( + is_left ? &datum_store_ : nullptr, + is_left ? nullptr : &datum_store_))) { + LOG_WARN("failed to update max available memory size periodically", K(ret)); + } while (OB_SUCC(ret) && !all_batch_finished && !greater_found && !enough_datums) { if (need_store_unmatch && OB_LIKELY(cur_idx_ < brs_.size_)) { ObRADatumStore::StoredRow *stored_row = NULL; @@ -1366,18 +1401,8 @@ int ObMergeJoinOp::iterate_both_chidren(ObEvalCtx::BatchInfoScopeGuard &guard) JoinRowList r_row_list(r_store->get_row_cnt()); ObRADatumStore::StoredRow *l_row = NULL; ObRADatumStore::StoredRow *r_row = NULL; - bool updated = false; - if (OB_FAIL(sql_mem_processor_.update_max_available_mem_size_periodically( - &mem_context_->get_malloc_allocator(), - [&](int64_t cur_cnt) { return get_total_rows_in_datum_store() > cur_cnt; }, - updated))) { + if (OB_FAIL(update_store_mem_bound(l_store, r_store))) { LOG_WARN("failed to update max available memory size periodically", K(ret)); - } - int64_t t_mem_bound = sql_mem_processor_.get_mem_bound(); - int64_t l_mem_bound = static_cast(left_mem_bound_ratio_ * t_mem_bound); - int64_t r_mem_bound = 0; // will be set to the remaining memory size later. - l_store->set_mem_limit(l_mem_bound); - if (OB_FAIL(ret)) { } else if (OB_FAIL(store_group_first_row(left_brs_fetcher_, l_row_list, l_row, guard))) { LOG_WARN("store left group first row failed", K(ret)); } else if (OB_FAIL(store_group_first_row(right_brs_fetcher_, r_row_list, r_row, guard))) { @@ -1386,11 +1411,7 @@ int ObMergeJoinOp::iterate_both_chidren(ObEvalCtx::BatchInfoScopeGuard &guard) MY_SPEC.is_left_unique_, l_row)))) { LOG_WARN("get left next group failed", K(ret)); - } else if (OB_UNLIKELY((r_mem_bound = t_mem_bound - l_store->get_mem_hold()) < 0)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("unexpected negative mem bound", K(ret), K(t_mem_bound), K(l_mem_bound), - K(r_mem_bound), K(l_store->get_mem_hold())); - } else if (FALSE_IT(r_store->set_mem_limit(r_mem_bound))) { + } else if (OB_FAIL((right_brs_fetcher_.get_next_equal_group(r_row_list, l_row, false, r_row)))) { LOG_WARN("get right next group failed", K(ret)); diff --git a/src/sql/engine/join/ob_merge_join_op.h b/src/sql/engine/join/ob_merge_join_op.h index 6fa8dac98e..087dec6ab4 100644 --- a/src/sql/engine/join/ob_merge_join_op.h +++ b/src/sql/engine/join/ob_merge_join_op.h @@ -23,6 +23,18 @@ namespace oceanbase { namespace sql { +class ObMergeJoinMemChecker +{ +public: + ObMergeJoinMemChecker(int64_t row_cnt): cur_row_cnt_(row_cnt) + {} + bool operator()(int64_t max_row_count) + { + return cur_row_cnt_ > max_row_count; + } + int64_t cur_row_cnt_; +}; + class ObMergeJoinSpec: public ObJoinSpec { OB_UNIS_VERSION_V(1); @@ -490,6 +502,7 @@ private: inline bool is_match(const int64_t idx) { return rj_match_vec_->at(idx); } // this function only used for non-vectorized code patch int process_dump(); + int update_store_mem_bound(ObRADatumStore *left, ObRADatumStore *right); typedef int (ObMergeJoinOp::*state_operation_func_type)(); typedef int (ObMergeJoinOp::*state_function_func_type)(); private: