[Vectorized](operator) support analytic eval operator (#14774)

This commit is contained in:
zhangstar333
2022-12-10 19:32:11 +08:00
committed by GitHub
parent 4c6458108c
commit ef46b580d0
8 changed files with 324 additions and 122 deletions

View File

@ -17,13 +17,10 @@
#include "vec/exec/vanalytic_eval_node.h"
#include "exprs/agg_fn_evaluator.h"
#include "exprs/anyval_util.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "udf/udf_internal.h"
#include "vec/utils/util.hpp"
#include "vec/exprs/vexpr.h"
namespace doris::vectorized {
@ -41,8 +38,7 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
if (!tnode.analytic_node.__isset
.window) { //haven't set window, Unbounded: [unbounded preceding,unbounded following]
_executor.get_next = std::bind<Status>(&VAnalyticEvalNode::_get_next_for_partition, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
std::placeholders::_1);
} else if (tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
DCHECK(!_window.__isset.window_start) << "RANGE windows must have UNBOUNDED PRECEDING";
@ -53,21 +49,20 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
if (!_window.__isset
.window_end) { //haven't set end, so same as PARTITION, [unbounded preceding, unbounded following]
_executor.get_next = std::bind<Status>(&VAnalyticEvalNode::_get_next_for_partition,
this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
this, std::placeholders::_1);
} else {
_fn_scope = AnalyticFnScope::RANGE; //range: [unbounded preceding,current row]
_executor.get_next = std::bind<Status>(&VAnalyticEvalNode::_get_next_for_range, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
std::placeholders::_1);
}
} else {
if (!_window.__isset.window_start &&
!_window.__isset.window_end) { //haven't set start and end, same as PARTITION
_executor.get_next = std::bind<Status>(&VAnalyticEvalNode::_get_next_for_partition,
this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3);
this, std::placeholders::_1);
} else {
if (_window.__isset.window_start) { //calculate start boundary
TAnalyticWindowBoundary b = _window.window_start;
@ -97,8 +92,7 @@ VAnalyticEvalNode::VAnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
_fn_scope = AnalyticFnScope::ROWS;
_executor.get_next = std::bind<Status>(&VAnalyticEvalNode::_get_next_for_rows, this,
std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3);
std::placeholders::_1);
}
}
_agg_arena_pool = std::make_unique<Arena>();
@ -216,15 +210,28 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
}
Status VAnalyticEvalNode::open(RuntimeState* state) {
RETURN_IF_ERROR(alloc_resource(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(child(0)->open(state));
return Status::OK();
}
Status VAnalyticEvalNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
release_resource(state);
return ExecNode::close(state);
}
Status VAnalyticEvalNode::alloc_resource(RuntimeState* state) {
{
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::open");
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(ExecNode::open(state));
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
RETURN_IF_CANCELLED(state);
}
RETURN_IF_ERROR(child(0)->open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VExpr::open(_partition_by_eq_expr_ctxs, state));
RETURN_IF_ERROR(VExpr::open(_order_by_eq_expr_ctxs, state));
@ -237,20 +244,65 @@ Status VAnalyticEvalNode::open(RuntimeState* state) {
return Status::OK();
}
Status VAnalyticEvalNode::close(RuntimeState* state) {
if (is_closed()) {
Status VAnalyticEvalNode::pull(doris::RuntimeState* /*state*/, vectorized::Block* output_block,
bool* eos) {
if (_input_eos && (_output_block_index == _input_blocks.size() || _input_total_rows == 0)) {
*eos = true;
return Status::OK();
}
while (!_input_eos || _output_block_index < _input_blocks.size()) {
_found_partition_end = _get_partition_by_end();
_need_more_input = whether_need_next_partition(_found_partition_end);
if (_need_more_input) {
return Status::OK();
}
_next_partition = _init_next_partition(_found_partition_end);
_init_result_columns();
size_t current_block_rows = _input_blocks[_output_block_index].rows();
_executor.get_next(current_block_rows);
if (_window_end_position == current_block_rows) {
break;
}
}
RETURN_IF_ERROR(_output_current_block(output_block));
RETURN_IF_ERROR(
VExprContext::filter_block(_vconjunct_ctx_ptr, output_block, output_block->columns()));
reached_limit(output_block, eos);
return Status::OK();
}
void VAnalyticEvalNode::release_resource(RuntimeState* state) {
if (is_closed()) {
return;
}
START_AND_SCOPE_SPAN(state->get_tracer(), span, "VAnalyticEvalNode::close");
VExpr::close(_partition_by_eq_expr_ctxs, state);
VExpr::close(_order_by_eq_expr_ctxs, state);
for (size_t i = 0; i < _agg_functions_size; ++i) VExpr::close(_agg_expr_ctxs[i], state);
for (auto* agg_function : _agg_functions) agg_function->close(state);
for (size_t i = 0; i < _agg_functions_size; ++i) {
VExpr::close(_agg_expr_ctxs[i], state);
}
for (auto* agg_function : _agg_functions) {
agg_function->close(state);
}
_destroy_agg_status();
_release_mem();
return ExecNode::close(state);
return ExecNode::release_resource(state);
}
//TODO: maybe could have better strategy, not noly when need data to sink data
//even could get some resources in advance as soon as possible
bool VAnalyticEvalNode::can_write() {
return _need_more_input;
}
bool VAnalyticEvalNode::can_read() {
if (_need_more_input) {
return false;
}
return true;
}
Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
@ -268,99 +320,70 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block
return Status::OK();
}
RETURN_IF_ERROR(_executor.get_next(state, block, eos));
while (!_input_eos || _output_block_index < _input_blocks.size()) {
RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &_next_partition, eos));
if (*eos) {
return Status::OK();
}
size_t current_block_rows = _input_blocks[_output_block_index].rows();
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(_executor.get_next(current_block_rows));
if (_window_end_position == current_block_rows) {
break;
}
}
RETURN_IF_ERROR(_output_current_block(block));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
RETURN_IF_ERROR(VExprContext::filter_block(_vconjunct_ctx_ptr, block, block->columns()));
reached_limit(block, eos);
return Status::OK();
}
Status VAnalyticEvalNode::_get_next_for_partition(RuntimeState* state, Block* block, bool* eos) {
while (!_input_eos || _output_block_index < _input_blocks.size()) {
bool next_partition = false;
RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &next_partition, eos));
if (*eos) {
break;
}
Status VAnalyticEvalNode::_get_next_for_partition(size_t current_block_rows) {
if (_next_partition) {
_executor.execute(_partition_by_start.pos, _partition_by_end.pos, _partition_by_start.pos,
_partition_by_end.pos);
}
_executor.insert_result(current_block_rows);
return Status::OK();
}
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
size_t current_block_rows = _input_blocks[_output_block_index].rows();
if (next_partition) {
_executor.execute(_partition_by_start.pos, _partition_by_end.pos,
_partition_by_start.pos, _partition_by_end.pos);
Status VAnalyticEvalNode::_get_next_for_range(size_t current_block_rows) {
while (_current_row_position < _partition_by_end.pos &&
_window_end_position < current_block_rows) {
if (_current_row_position >= _order_by_end.pos) {
_update_order_by_range();
_executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos,
_order_by_end.pos);
}
_executor.insert_result(current_block_rows);
if (_window_end_position == current_block_rows) {
return _output_current_block(block);
}
}
return Status::OK();
}
Status VAnalyticEvalNode::_get_next_for_range(RuntimeState* state, Block* block, bool* eos) {
while (!_input_eos || _output_block_index < _input_blocks.size()) {
bool next_partition = false;
RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &next_partition, eos));
if (*eos) {
break;
}
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
size_t current_block_rows = _input_blocks[_output_block_index].rows();
while (_current_row_position < _partition_by_end.pos &&
_window_end_position < current_block_rows) {
if (_current_row_position >= _order_by_end.pos) {
_update_order_by_range();
_executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos,
_order_by_end.pos);
}
_executor.insert_result(current_block_rows);
}
if (_window_end_position == current_block_rows) {
return _output_current_block(block);
}
}
return Status::OK();
}
Status VAnalyticEvalNode::_get_next_for_rows(RuntimeState* state, Block* block, bool* eos) {
while (!_input_eos || _output_block_index < _input_blocks.size()) {
bool next_partition = false;
RETURN_IF_ERROR(_consumed_block_and_init_partition(state, &next_partition, eos));
if (*eos) {
break;
}
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
size_t current_block_rows = _input_blocks[_output_block_index].rows();
while (_current_row_position < _partition_by_end.pos &&
_window_end_position < current_block_rows) {
int64_t range_start, range_end;
if (!_window.__isset.window_start &&
_window.window_end.type ==
TAnalyticWindowBoundaryType::
CURRENT_ROW) { //[preceding, current_row],[current_row, following]
range_start = _current_row_position;
range_end = _current_row_position +
1; //going on calculate,add up data, no need to reset state
Status VAnalyticEvalNode::_get_next_for_rows(size_t current_block_rows) {
while (_current_row_position < _partition_by_end.pos &&
_window_end_position < current_block_rows) {
int64_t range_start, range_end;
if (!_window.__isset.window_start &&
_window.window_end.type ==
TAnalyticWindowBoundaryType::
CURRENT_ROW) { //[preceding, current_row],[current_row, following]
range_start = _current_row_position;
range_end = _current_row_position +
1; //going on calculate,add up data, no need to reset state
} else {
_reset_agg_status();
if (!_window.__isset
.window_start) { //[preceding, offset] --unbound: [preceding, following]
range_start = _partition_by_start.pos;
} else {
_reset_agg_status();
if (!_window.__isset
.window_start) { //[preceding, offset] --unbound: [preceding, following]
range_start = _partition_by_start.pos;
} else {
range_start = _current_row_position + _rows_start_offset;
}
range_end = _current_row_position + _rows_end_offset + 1;
range_start = _current_row_position + _rows_start_offset;
}
_executor.execute(_partition_by_start.pos, _partition_by_end.pos, range_start,
range_end);
_executor.insert_result(current_block_rows);
}
if (_window_end_position == current_block_rows) {
return _output_current_block(block);
range_end = _current_row_position + _rows_end_offset + 1;
}
_executor.execute(_partition_by_start.pos, _partition_by_end.pos, range_start, range_end);
_executor.insert_result(current_block_rows);
}
return Status::OK();
}
@ -492,14 +515,22 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
_children[0]->get_next_span(), _input_eos);
} while (!_input_eos && block.rows() == 0);
if (_input_eos && block.rows() == 0) {
RETURN_IF_ERROR(sink(state, &block, _input_eos));
return Status::OK();
}
Status VAnalyticEvalNode::sink(doris::RuntimeState* /*state*/, vectorized::Block* input_block,
bool eos) {
_input_eos = eos;
if (_input_eos && input_block->rows() == 0) {
_need_more_input = false;
return Status::OK();
}
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
input_block_first_row_positions.emplace_back(_input_total_rows);
size_t block_rows = block.rows();
size_t block_rows = input_block->rows();
_input_total_rows += block_rows;
_all_block_end.block_num = _input_blocks.size();
_all_block_end.row_num = block_rows;
@ -507,7 +538,7 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
if (_origin_cols
.empty()) { //record origin columns, maybe be after this, could cast some column but no need to save
for (int c = 0; c < block.columns(); ++c) {
for (int c = 0; c < input_block->columns(); ++c) {
_origin_cols.emplace_back(c);
}
}
@ -515,31 +546,33 @@ Status VAnalyticEvalNode::_fetch_next_block_data(RuntimeState* state) {
for (size_t i = 0; i < _agg_functions_size;
++i) { //insert _agg_intput_columns, execute calculate for its
for (size_t j = 0; j < _agg_expr_ctxs[i].size(); ++j) {
RETURN_IF_ERROR(_insert_range_column(&block, _agg_expr_ctxs[i][j],
RETURN_IF_ERROR(_insert_range_column(input_block, _agg_expr_ctxs[i][j],
_agg_intput_columns[i][j].get(), block_rows));
}
}
//record column idx in block
for (size_t i = 0; i < _partition_by_eq_expr_ctxs.size(); ++i) {
int result_col_id = -1;
RETURN_IF_ERROR(_partition_by_eq_expr_ctxs[i]->execute(&block, &result_col_id));
RETURN_IF_ERROR(_partition_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id));
DCHECK_GE(result_col_id, 0);
_partition_by_column_idxs[i] = result_col_id;
}
for (size_t i = 0; i < _order_by_eq_expr_ctxs.size(); ++i) {
int result_col_id = -1;
RETURN_IF_ERROR(_order_by_eq_expr_ctxs[i]->execute(&block, &result_col_id));
RETURN_IF_ERROR(_order_by_eq_expr_ctxs[i]->execute(input_block, &result_col_id));
DCHECK_GE(result_col_id, 0);
_ordey_by_column_idxs[i] = result_col_id;
}
mem_tracker_held()->consume(block.allocated_bytes());
_blocks_memory_usage->add(block.allocated_bytes());
mem_tracker_held()->consume(input_block->allocated_bytes());
_blocks_memory_usage->add(input_block->allocated_bytes());
//TODO: if need improvement, the is a tips to maintain a free queue,
//so the memory could reuse, no need to new/delete again;
_input_blocks.emplace_back(std::move(block));
_input_blocks.emplace_back(std::move(*input_block));
_found_partition_end = _get_partition_by_end();
_need_more_input = whether_need_next_partition(_found_partition_end);
return Status::OK();
}