[FEAT MERGE] [435] sql execution improvements
Co-authored-by: GongYusen <986957406@qq.com> Co-authored-by: Zach41 <zach_41@163.com> Co-authored-by: Cai-Yao <729673078@qq.com>
This commit is contained in:
@ -90,7 +90,7 @@ int ObWindowFunctionOpInput::sync_wait(
|
||||
} else if (OB_ISNULL(whole_msg_provider)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected status: whole_msg_provider is null", K(ret));
|
||||
} else if (!whole_msg_provider->msg_set()) {
|
||||
} else if (!whole_msg_provider->whole_msg_set()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected status: whole_msg_provider has not been set msg", K(ret));
|
||||
} else {
|
||||
@ -129,7 +129,7 @@ int ObWindowFunctionOpInput::sync_wait(
|
||||
} // end while
|
||||
if (OB_SUCC(ret)) {
|
||||
ObSpinLockGuard guard(shared_info->lock_);
|
||||
if (whole_msg_provider->msg_set()) {
|
||||
if (whole_msg_provider->whole_msg_set()) {
|
||||
whole_msg_provider->reset();
|
||||
//ATOMIC_SET(&sync_cnt, 0);
|
||||
}
|
||||
|
||||
@ -462,6 +462,7 @@ void ObWindowFunctionVecOp::destroy()
|
||||
wf_list_.~WinFuncColExprList();
|
||||
rescan_alloc_.~ObArenaAllocator();
|
||||
patch_alloc_.~ObArenaAllocator();
|
||||
hp_infras_mgr_.destroy();
|
||||
destroy_mem_context();
|
||||
local_allocator_ = nullptr;
|
||||
ObOperator::destroy();
|
||||
@ -681,6 +682,11 @@ int ObWindowFunctionVecOp::init()
|
||||
K(MY_SPEC.estimated_part_cnt_), K(MY_SPEC.input_rows_mem_bound_ratio_));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(init_hp_infras_group_mgr())) {
|
||||
LOG_WARN("init hp infras group mgr failed", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else {
|
||||
@ -716,6 +722,7 @@ int ObWindowFunctionVecOp::init()
|
||||
}
|
||||
}
|
||||
// create aggr rows
|
||||
int64_t distinct_aggr_count = 0;
|
||||
for (int wf_idx = 1; OB_SUCC(ret) && wf_idx <= wf_infos.count(); wf_idx++) {
|
||||
WinFuncInfo &wf_info = wf_infos.at(wf_idx - 1);
|
||||
for (int j = 0; OB_SUCC(ret) && j < wf_info.partition_exprs_.count(); j++) {
|
||||
@ -786,6 +793,9 @@ int ObWindowFunctionVecOp::init()
|
||||
LOG_WARN("allocate aggr expr failed", K(ret));
|
||||
} else {
|
||||
win_col->wf_expr_ = aggr_expr;
|
||||
if (wf_info.aggr_info_.has_distinct_) {
|
||||
distinct_aggr_count++;
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -909,6 +919,10 @@ int ObWindowFunctionVecOp::init()
|
||||
max_pby_col_cnt_ = all_part_exprs_.count();
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && distinct_aggr_count > 0 && OB_FAIL(hp_infras_mgr_.reserve_hp_infras(distinct_aggr_count))) {
|
||||
LOG_WARN("reserve hp infras failed", K(ret));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && MY_SPEC.is_participator()) {
|
||||
if (OB_FAIL(build_pby_hash_values_for_transmit())) {
|
||||
LOG_WARN("build transimitting hash values failed", K(ret));
|
||||
@ -2234,7 +2248,8 @@ int ObWindowFunctionVecOp::compute_wf_values(WinFuncColExpr *end, int64_t &check
|
||||
if (OB_SUCC(ret) && it->wf_expr_->is_aggregate_expr()) {
|
||||
// enable removal optimization
|
||||
it->agg_ctx_->removal_info_.enable_removal_opt_ =
|
||||
!(MY_SPEC.single_part_parallel_) && it->wf_info_.remove_type_ != common::REMOVE_INVALID;
|
||||
!(MY_SPEC.single_part_parallel_) && it->wf_info_.remove_type_ != common::REMOVE_INVALID
|
||||
&& !it->wf_info_.aggr_info_.has_distinct_;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(it->wf_expr_->process_partition(win_expr_ctx, it->part_first_row_idx_,
|
||||
@ -3321,8 +3336,12 @@ bool ObWindowFunctionVecOp::all_supported_winfuncs(const ObIArray<ObWinFunRawExp
|
||||
for (int i = 0; ret && i < win_exprs.count(); i++) {
|
||||
ObWinFunRawExpr *win_expr = win_exprs.at(i);
|
||||
if (win_expr->get_agg_expr() != nullptr) {
|
||||
ret = aggregate::supported_aggregate_function(win_expr->get_func_type())
|
||||
&& !win_expr->get_agg_expr()->is_param_distinct();
|
||||
if (GET_MIN_CLUSTER_VERSION() < CLUSTER_VERSION_4_3_5_0) {
|
||||
ret = aggregate::supported_aggregate_function(win_expr->get_func_type())
|
||||
&& !win_expr->get_agg_expr()->is_param_distinct();
|
||||
} else {
|
||||
ret = aggregate::supported_aggregate_function(win_expr->get_func_type());
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -3456,7 +3475,9 @@ int WinFuncColExpr::init_aggregate_ctx(const int64_t tenant_id)
|
||||
LOG_WARN("allocate memory failed", K(ret));
|
||||
} else if (OB_FAIL(agg_expr->aggr_processor_->init())) {
|
||||
LOG_WARN("processor init failed", K(ret), K(wf_info_.aggr_info_));
|
||||
} else if (FALSE_IT(agg_expr->aggr_processor_->set_io_event_observer(&op_.io_event_observer_))) {
|
||||
} else if (FALSE_IT(agg_expr->aggr_processor_->set_support_fast_single_row_agg(true))) {
|
||||
} else if (FALSE_IT(agg_expr->aggr_processor_->set_hp_infras_mgr(&op_.hp_infras_mgr_))) {
|
||||
} else if (FALSE_IT(agg_ctx_ = agg_expr->aggr_processor_->get_rt_ctx())) {
|
||||
} else if (FALSE_IT(aggr_row_buf_sz = op_.spec_.max_batch_size_ * agg_ctx_->row_meta().row_size_)) {
|
||||
// do nothing
|
||||
@ -4081,5 +4102,20 @@ int ObWindowFunctionVecOpInput::sync_wait(ObExecContext &ctx, ObReportingWFWhole
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObWindowFunctionVecOp::init_hp_infras_group_mgr()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!hp_infras_mgr_.is_inited()) {
|
||||
int64_t est_rows = MY_SPEC.rows_ / MY_SPEC.estimated_part_cnt_;
|
||||
uint64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id();
|
||||
if (OB_FAIL(hp_infras_mgr_.init(tenant_id, GCONF.is_sql_operator_dump_enabled(), est_rows,
|
||||
MY_SPEC.width_, true, 1, &eval_ctx_, &sql_mem_processor_,
|
||||
&io_event_observer_, NONE_COMPRESSOR))) {
|
||||
LOG_WARN("init hp mgr failed", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end sql
|
||||
} // end oceanbase
|
||||
|
||||
@ -20,6 +20,7 @@
|
||||
#include "sql/engine/basic/ob_vector_result_holder.h"
|
||||
#include "sql/engine/window_function/ob_window_function_op.h"
|
||||
#include "sql/engine/px/datahub/components/ob_dh_second_stage_reporting_wf.h"
|
||||
#include "sql/engine/basic/ob_hp_infras_vec_mgr.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -241,7 +242,8 @@ public:
|
||||
sql_mem_processor_(profile_, op_monitor_info_),
|
||||
global_mem_limit_version_(0),
|
||||
amm_periodic_cnt_(0),
|
||||
store_it_age_()
|
||||
store_it_age_(),
|
||||
hp_infras_mgr_(MTL_ID())
|
||||
{}
|
||||
|
||||
virtual ~ObWindowFunctionVecOp() { destroy(); }
|
||||
@ -429,6 +431,8 @@ private:
|
||||
{
|
||||
return local_allocator_->used();
|
||||
}
|
||||
|
||||
int init_hp_infras_group_mgr();
|
||||
public:
|
||||
struct OpBatchCtx { // values used to help batch-calculation
|
||||
const ObCompactRow **stored_rows_;
|
||||
@ -548,6 +552,7 @@ private:
|
||||
int64_t amm_periodic_cnt_;
|
||||
|
||||
ObTempBlockStore::IterationAge store_it_age_;
|
||||
ObHashPartInfrasVecMgr hp_infras_mgr_;
|
||||
};
|
||||
|
||||
} // end sql
|
||||
|
||||
@ -1128,6 +1128,12 @@ int AggrExpr::process_window(WinExprEvalCtx &ctx, const Frame &frame, const int6
|
||||
}
|
||||
}
|
||||
} // end while
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(aggr_processor_->advance_collect_result(eval_ctx.get_batch_idx(),
|
||||
ctx.win_col_.wf_res_row_meta_, agg_row))) {
|
||||
LOG_WARN("advance collect failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (aggregate::agg_res_not_null(ctx.win_col_.wf_info_.func_type_)) {
|
||||
ctx.win_col_.agg_ctx_->row_meta().locate_notnulls_bitmap(agg_row).set(0);
|
||||
|
||||
Reference in New Issue
Block a user