[FEAT MERGE] [433] sql execution improvements

Co-authored-by: Zach41 <zach_41@163.com>
Co-authored-by: lucky-sinx <2549261744@qq.com>
Co-authored-by: GongYusen <986957406@qq.com>
This commit is contained in:
obdev
2024-08-28 13:54:39 +00:00
committed by ob-robot
parent 9e776df847
commit 6a68362067
256 changed files with 19203 additions and 3072 deletions

View File

@ -1191,7 +1191,27 @@ int ObWindowFunctionOp::init()
} else if (OB_ISNULL(ctx_.get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(ret));
} else if (OB_FAIL(init_mem_context())) {
LOG_WARN("fail to init memory context", K(ret));
} else {
int64_t est_rows = MY_SPEC.rows_;
if (OB_FAIL(ObPxEstimateSizeUtil::get_px_size(
&ctx_, MY_SPEC.px_est_size_factor_, est_rows, est_rows))) {
LOG_WARN("failed to get px size", K(ret));
} else if (OB_FAIL(sql_mem_processor_.init(
&mem_context_->get_malloc_allocator(),
ctx_.get_my_session()->get_effective_tenant_id(),
(est_rows * MY_SPEC.width_ / MY_SPEC.estimated_part_cnt_),
MY_SPEC.type_,
MY_SPEC.id_,
&ctx_))) {
LOG_WARN("failed to init sql memory manager processor", K(ret));
} else {
LOG_DEBUG("show some est values", K(ret), K(MY_SPEC.rows_), K(est_rows), K(MY_SPEC.width_),
K(MY_SPEC.estimated_part_cnt_), K(MY_SPEC.input_rows_mem_bound_ratio_));
}
}
if (OB_SUCC(ret)) {
const uint64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
local_allocator_.set_tenant_id(tenant_id);
local_allocator_.set_label(ObModIds::OB_SQL_WINDOW_LOCAL);
@ -1310,6 +1330,10 @@ int ObWindowFunctionOp::init()
LOG_WARN("failed to initialize init_group_rows", K(ret));
aggr_func->~AggrCell();
aggr_func = NULL;
} else if (MY_SPEC.enable_hash_base_distinct_
&& aggr_func->aggr_processor_.has_distinct()
&& OB_FAIL(init_distinct_set(aggr_func->aggr_processor_))) {
LOG_WARN("failed to init distinct set", K(ret));
} else {
aggr_func->aggr_processor_.set_dir_id(dir_id_);
aggr_func->aggr_processor_.set_io_event_observer(&io_event_observer_);
@ -1471,6 +1495,25 @@ int ObWindowFunctionOp::build_participator_whole_msg_array()
return ret;
}
// Lazily creates the operator's private memory context (mem_context_).
//
// Nothing is done when mem_context_ already exists. Otherwise a context is
// created under CURRENT_CONTEXT with the session's effective tenant id, the
// OB_SQL_WINDOW_ROW_STORE label, the WORK_AREA ctx id and the
// USE_TL_PAGE_OPTIONAL property.
//
// @return OB_SUCCESS on success (or when the context already exists);
//         OB_ERR_UNEXPECTED when the session is NULL or the created context
//         is NULL; otherwise the error returned by CREATE_CONTEXT.
int ObWindowFunctionOp::init_mem_context()
{
  int ret = OB_SUCCESS;
  if (OB_ISNULL(mem_context_)) {
    ObSQLSessionInfo *session = ctx_.get_my_session();
    if (OB_ISNULL(session)) {
      // Do not rely on the caller having validated the session: this helper
      // dereferences it to fetch the tenant id.
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("NULL ptr", K(ret));
    } else {
      uint64_t tenant_id = session->get_effective_tenant_id();
      lib::ContextParam param;
      param.set_mem_attr(tenant_id, ObModIds::OB_SQL_WINDOW_ROW_STORE, ObCtxIds::WORK_AREA)
        .set_properties(lib::USE_TL_PAGE_OPTIONAL);
      if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_context_, param))) {
        LOG_WARN("create entity failed", K(ret));
      } else if (OB_ISNULL(mem_context_)) {
        // CREATE_CONTEXT reported success but handed back nothing usable.
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("null memory entity returned", K(ret));
      }
    }
  }
  return ret;
}
int ObWindowFunctionOp::inner_open()
{
int ret = OB_SUCCESS;
@ -1483,6 +1526,10 @@ int ObWindowFunctionOp::inner_open()
LOG_WARN("init shadow copy row failed", K(ret));
} else if (OB_FAIL(init())) {
LOG_WARN("init failed", K(ret));
} else if (MY_SPEC.enable_hash_base_distinct_
&& distinct_aggr_count_ > 0
&& OB_FAIL(hp_infras_mgr_.reserve_hp_infras(distinct_aggr_count_))) {
LOG_WARN("failed to init hp infras group", K(ret), K(distinct_aggr_count_));
} else if (OB_FAIL(reset_for_scan(ctx_.get_my_session()->get_effective_tenant_id()))) {
LOG_WARN("reset_for_scan failed", K(ret));
}
@ -1517,6 +1564,8 @@ int ObWindowFunctionOp::inner_rescan()
last_row_same_order_cache_ = SAME_ORDER_CACHE_DEFAULT;
last_computed_part_rows_ = 0;
last_aggr_status_ = 0;
global_mem_limit_version_ = 0;
amm_periodic_cnt_ = 0;
next_wf_pby_expr_cnt_to_transmit_ =
const_cast<WFInfoFixedArray *>(&MY_SPEC.wf_infos_)->at(0).partition_exprs_.count();
@ -1549,6 +1598,7 @@ int ObWindowFunctionOp::inner_rescan()
int ObWindowFunctionOp::inner_close()
{
sql_mem_processor_.unregister_profile();
curr_row_collect_values_.reset();
input_rows_.foreach_store([](RowsStore *&s) { s->ra_rs_.reset(); return OB_SUCCESS; });
all_expr_datums_copy_.reset();
@ -1578,17 +1628,21 @@ int ObWindowFunctionOp::inner_close()
pby_hash_values_sets_.at(i) = NULL;
}
pby_hash_values_sets_.reset();
sql_mem_processor_.unregister_profile();
return ObOperator::inner_close();
}
// Tears down the operator's owned state and then delegates to
// ObOperator::destroy(). The statements below run in a fixed order; keep it
// when editing.
void ObWindowFunctionOp::destroy()
{
// NOTE(review): presumably a safety net for the case where inner_close() did
// not unregister the memory profile -- confirm against ObSqlMemMgrProcessor.
sql_mem_processor_.unregister_profile_if_necessary();
hp_infras_mgr_.destroy();
// Members are torn down through explicit destructor calls.
// NOTE(review): presumably because the operator's own destructor is not run
// by its owner -- confirm.
input_rows_.~Stores();
wf_list_.~WinFuncCellList();
local_allocator_.reset();
local_allocator_.~ObArenaAllocator();
rescan_alloc_.~ObArenaAllocator();
patch_alloc_.~ObArenaAllocator();
// The memory context is released only after the members above are destroyed.
destroy_mem_context();
ObOperator::destroy();
}
@ -1639,10 +1693,15 @@ int ObWindowFunctionOp::create_row_store(RowsStore *&s)
{
int ret = OB_SUCCESS;
if (NULL == s) {
s = OB_NEWx(RowsStore, (&local_allocator_));
s = OB_NEWx(RowsStore, (&local_allocator_), (*this));
if (NULL == s) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else if (OB_FAIL(s->prior_dumping_rows_stores_.init(
MY_SPEC.is_vectorized() + MY_SPEC.range_dist_parallel_))) {
// one for processed of vec and one for first of rd
LOG_WARN("prior_dumping_rows_stores_ prepare allocate failed",
K(ret), K(MY_SPEC.is_vectorized()), K(MY_SPEC.range_dist_parallel_));
}
}
return ret;
@ -2091,13 +2150,10 @@ int ObWindowFunctionOp::detect_aggr_status() // for participator
}
// for participator, add aggr result row
int ObWindowFunctionOp::found_part_end(
const WinFuncCell *end,
//const int64_t aggr_status_value,
RowsStore *rows_store,
bool add_row_cnt /* = true */)
int ObWindowFunctionOp::found_part_end(const WinFuncCell *end, bool add_row_cnt /* = true */)
{
int ret = OB_SUCCESS;
RowsStore *rows_store = input_rows_.cur_;
if (MY_SPEC.is_participator()) {
if (last_aggr_status_ < wf_list_.get_last()->wf_idx_) {
for (WinFuncCell *wf = wf_list_.get_first(); OB_SUCC(ret) && wf != end; wf = wf->get_next()) {
@ -2141,6 +2197,13 @@ int ObWindowFunctionOp::found_part_end(
}
}
}
if (OB_SUCC(ret) && !rows_store->ra_rs_.is_empty_save_row_cnt()) {
// If there are at least two blocks in the store, we need to check if we need to
// dump the input_rows at the end of one partition
if (OB_FAIL(rows_store->process_dump<true>(true))) {
LOG_WARN("fail to process dump for input store", K(ret));
}
}
return ret;
}
@ -2204,7 +2267,7 @@ int ObWindowFunctionOp::partial_next_row()
LOG_WARN("check other window function failed", K(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(found_part_end(end, input_rows_.cur_))) {
} else if (OB_FAIL(found_part_end(end))) {
// For participator, add aggr result row to input rows
LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_));
} else if (OB_FAIL(compute_wf_values(end, check_times))) {
@ -2216,7 +2279,16 @@ int ObWindowFunctionOp::partial_next_row()
ret = OB_ITER_END;
} else if (MY_SPEC.range_dist_parallel_ && !first_part_saved_) {
first_part_saved_ = true;
foreach_stores([](Stores &s) { std::swap(s.cur_, s.first_); return OB_SUCCESS; });
ret = foreach_stores([](Stores &s) {
int ret = OB_SUCCESS;
std::swap(s.cur_, s.first_);
if (OB_FAIL(s.cur_->prior_dumping_rows_stores_.push_back(s.first_))) {
LOG_WARN("push_back s.first_ to prior_dumping_rows_stores_ failed", K(ret));
} else {
s.first_->ra_rs_.finish_add_row();
}
return ret;
});
continue;
} else if (MY_SPEC.range_dist_parallel_ && child_iter_end_ && !last_part_saved_) {
last_part_saved_ = true;
@ -3546,7 +3618,7 @@ int ObWindowFunctionOp::get_next_partition(int64_t &check_times)
if (child_iter_end_) {
if (!first_batch) {
if (OB_FAIL(found_part_end(end, input_rows_.cur_))) {
if (OB_FAIL(found_part_end(end))) {
// add aggr result row for the last part
LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_));
} else if (OB_FAIL(compute_wf_values(end, check_times))) {
@ -3559,7 +3631,7 @@ int ObWindowFunctionOp::get_next_partition(int64_t &check_times)
|| child_brs->size_ == (row_idx = next_nonskip_row_index(row_idx, *child_brs))) {
child_iter_end_ = true;
if (!first_batch) {
if (OB_FAIL(found_part_end(end, input_rows_.cur_))) {
if (OB_FAIL(found_part_end(end))) {
LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_));
} else if (OB_FAIL(compute_wf_values(end, check_times))) {
LOG_WARN("compute wf values failed", K(ret));
@ -3630,7 +3702,7 @@ int ObWindowFunctionOp::process_child_batch(
} else if (!same_part) {
if (OB_FAIL(check_wf_same_partition(end))) {
LOG_WARN("check wf same partition failed", K(ret));
} else if (OB_FAIL(found_part_end(end, input_rows_.cur_))) {
} else if (OB_FAIL(found_part_end(end))) {
LOG_WARN("found_part_end failed", K(ret));
} else if (end != wf_list_.get_header()) {
if (OB_FAIL(found_new_part(false))) {
@ -3681,18 +3753,35 @@ int ObWindowFunctionOp::process_child_batch(
// %cur_ always be rows store we are computing.
// And rows in %processed_ row store are all computed and ready to output.
// swap $cur_ and $processed_ row store
foreach_stores([](Stores &s){ std::swap(s.cur_, s.processed_); return OB_SUCCESS; });
if (MY_SPEC.range_dist_parallel_ && !first_part_saved_) {
ret = foreach_stores([](Stores &s) {
int ret = OB_SUCCESS;
std::swap(s.cur_, s.processed_);
if (NULL != s.first_ && !s.first_->is_empty()
&& OB_FAIL(s.cur_->prior_dumping_rows_stores_.push_back(s.first_))) {
LOG_WARN("push_back s.first_ to prior_dumping_rows_stores_ failed", K(ret));
} else if (OB_FAIL(s.cur_->prior_dumping_rows_stores_.push_back(s.processed_))) {
LOG_WARN("push_back s.processed_ to prior_dumping_rows_stores_ failed", K(ret));
} else {
s.processed_->ra_rs_.finish_add_row();
s.processed_->prior_dumping_rows_stores_.clear();
}
return ret;
});
if (OB_SUCC(ret) && MY_SPEC.range_dist_parallel_ && !first_part_saved_) {
first_part_saved_ = true;
foreach_stores([](Stores &s){ std::swap(s.first_, s.processed_); return OB_SUCCESS; });
foreach_stores([](Stores &s){
std::swap(s.first_, s.processed_); return OB_SUCCESS;
});
}
}
++row_idx;
++remain.row_cnt_;
if (need_split_store && OB_FAIL(save_part_first_row_idx())) {
LOG_WARN("save partition by exprs failed", K(ret));
} else { // need to deal with remain rows in this child_brs after found new part
need_loop_until_child_brs_end = true;
if (OB_SUCC(ret)) {
++row_idx;
++remain.row_cnt_;
if (need_split_store && OB_FAIL(save_part_first_row_idx())) {
LOG_WARN("save partition by exprs failed", K(ret));
} else { // need to deal with remain rows in this child_brs after found new part
need_loop_until_child_brs_end = true;
}
}
} else if (!child_iter_end_) {
if (need_split_store) { // need_split_store is true means we have not found next part ever
@ -3710,7 +3799,7 @@ int ObWindowFunctionOp::process_child_batch(
} else { // child_iter_end_
found_next_part = true;
// add aggr result row for the last part
if (OB_FAIL(found_part_end(wf_list_.get_header(), input_rows_.cur_))) {
if (OB_FAIL(found_part_end(wf_list_.get_header()))) {
LOG_WARN("found_part_end failed", K(ret), K(last_aggr_status_));
} else if (OB_FAIL(compute_wf_values(wf_list_.get_header(), check_times))) {
LOG_WARN("compute wf values failed", K(ret));
@ -3736,9 +3825,10 @@ int ObWindowFunctionOp::output_rows_store_rows(const int64_t output_row_cnt,
i++, guard.set_batch_idx(batch_idx_offset + i)) {
const int64_t idx = i + rows_store.output_row_idx_;
if (OB_FAIL(rows_store.get_row(idx, child_row))) {
LOG_WARN("get row failed", K(ret));
LOG_WARN("get row failed",
K(ret), K(idx), K(i), K(output_row_cnt), K(rows_store.output_row_idx_));
} else if (OB_FAIL(child_row->to_expr(get_all_expr(), eval_ctx_))) {
LOG_WARN("row to expr failed", K(ret));
LOG_WARN("row to expr failed", K(ret), K(idx), K(i), K(rows_store.output_row_idx_));
} else if (MY_SPEC.is_consolidator()
&& MY_SPEC.wf_aggr_status_expr_->locate_expr_datum(eval_ctx_).get_int() < 0) {
brs_.skip_->set(eval_ctx_.get_batch_idx());
@ -4033,6 +4123,35 @@ int ObWindowFunctionOp::final_next_batch(const int64_t max_row_cnt)
return ret;
}
// Prepares one aggregate processor for hash-based DISTINCT evaluation.
//
// The processor always receives this operator's IO event observer. If the
// shared hash-partition infrastructure manager initialises successfully, the
// distinct-aggregate counter is incremented and the processor is pointed at
// the manager with hash distinct enabled.
//
// @param aggr_processor  processor of the aggregate to configure.
// @return OB_SUCCESS, or the error from init_hp_infras_group_mgr().
int ObWindowFunctionOp::init_distinct_set(ObAggregateProcessor &aggr_processor)
{
  int ret = OB_SUCCESS;
  aggr_processor.set_io_event_observer(&io_event_observer_);
  ret = init_hp_infras_group_mgr();
  if (OB_SUCC(ret)) {
    ++distinct_aggr_count_;
    aggr_processor.set_hp_infras_mgr(&hp_infras_mgr_);
    aggr_processor.set_enable_hash_distinct();
  } else {
    LOG_WARN("failed to init hp infras group manager", K(ret));
  }
  return ret;
}
int ObWindowFunctionOp::init_hp_infras_group_mgr()
{
int ret = OB_SUCCESS;
int64_t est_rows = MY_SPEC.rows_ / MY_SPEC.estimated_part_cnt_;
uint64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
if (!hp_infras_mgr_.is_inited()) {
if (OB_FAIL(hp_infras_mgr_.init(tenant_id, GCONF.is_sql_operator_dump_enabled(), est_rows,
MY_SPEC.width_, true /*unique*/, 1 /*ways*/, &eval_ctx_,
&sql_mem_processor_, &io_event_observer_))) {
LOG_WARN("failed to init hash infras group", K(ret));
}
}
return ret;
}
int ObWindowFunctionOp::check_interval_valid(ObExpr &expr)
{
int ret = OB_SUCCESS;
@ -4069,5 +4188,70 @@ int ObWindowFunctionOp::check_interval_valid(ObExpr &expr)
return ret;
}
template <bool IS_INPUT>
int ObWindowFunctionOp::RowsStore::process_dump(const bool found_part_end /*false*/)
{
int ret = OB_SUCCESS;
bool need_dump = false;
if (OB_FAIL(op_.update_mem_limit_version_periodically())) {
LOG_WARN("fail to update op global memory limit version periodically", K(ret));
} else if (OB_UNLIKELY(found_part_end || need_check_dump(op_.get_global_mem_limit_version()))) {
local_mem_limit_version_ = op_.get_global_mem_limit_version();
int64_t target_dump_size = 0;
if (IS_INPUT) {
const static double MEM_DISCOUNT_FOR_PART_END = 0.8;
const double mem_bound_ratio = !found_part_end ? op_.get_input_rows_mem_bound_ratio() :
op_.get_input_rows_mem_bound_ratio() * MEM_DISCOUNT_FOR_PART_END;
target_dump_size = -(op_.sql_mem_processor_.get_mem_bound() * mem_bound_ratio);
for (int64_t i = 0; i < prior_dumping_rows_stores_.count(); ++i) {
target_dump_size += prior_dumping_rows_stores_.at(i)->ra_rs_.get_mem_hold();
}
target_dump_size += ra_rs_.get_mem_hold();
} else { // for result
// dump BIG_BLOCK_SIZE * 2 at least, because dump is already needed at this time,
// ensure the minimum amount of memory for each dump.
const static int64_t MIN_DUMP_SIZE = (256L << 10) * 2;
target_dump_size = (op_.sql_mem_processor_.get_data_size() - op_.sql_mem_processor_.get_mem_bound());
target_dump_size = MAX(target_dump_size, MIN_DUMP_SIZE);
}
if (OB_UNLIKELY(target_dump_size > 0)) {
// dump prior_dump_stores until the mem hold of that less than mem limit
// the max count of prior_dumping_rows_stores_ is 2, one for first and one for processed
const static int64_t MAX_PRIOR_ELEMENT_COUNT = 2;
if (prior_dumping_rows_stores_.count() > MAX_PRIOR_ELEMENT_COUNT) { // defense check
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the cnt of elements is unexpected", K(ret),
K(prior_dumping_rows_stores_.count()));
} else {
int64_t pop_count = 0;
for (int64_t i = 0; OB_SUCC(ret) && target_dump_size > 0
&& i < prior_dumping_rows_stores_.count(); ++i) {
const int64_t mem_hold = prior_dumping_rows_stores_.at(i)->ra_rs_.get_mem_hold();
if (OB_FAIL(prior_dumping_rows_stores_.at(i)->ra_rs_.dump(false, target_dump_size))) {
LOG_WARN("fail to dump row stores", K(ret), K(i),
K(prior_dumping_rows_stores_.at(i)->ra_rs_));
} else {
target_dump_size -=
(mem_hold - prior_dumping_rows_stores_.at(i)->ra_rs_.get_mem_hold());
pop_count += prior_dumping_rows_stores_.at(i)->ra_rs_.is_all_dumped();
}
}
if (OB_SUCC(ret) && target_dump_size > 0) {
ret = ra_rs_.dump(false, target_dump_size);
}
if (prior_dumping_rows_stores_.count() == pop_count) {
prior_dumping_rows_stores_.clear();
} else if (1 == pop_count) {
prior_dumping_rows_stores_.at(0) = prior_dumping_rows_stores_.at(1);
prior_dumping_rows_stores_.pop_back(); // pop_back is only --count_ for array
}
op_.sql_mem_processor_.set_number_pass(1);
}
}
}
return ret;
}
} // namespace sql
} // namespace oceanbase