[pipelineX](refactor) refine codes (#23521)

* [pipelineX](refactor) refine codes

* update

* update
This commit is contained in:
Gabriel
2023-08-28 14:38:07 +08:00
committed by GitHub
parent 4c8fc06e40
commit 28a2e71084
10 changed files with 16 additions and 38 deletions

View File

@ -59,6 +59,7 @@ AggSinkLocalState::AggSinkLocalState(DataSinkOperatorX* parent, RuntimeState* st
_max_row_size_counter(nullptr) {}
Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
_dependency = (AggDependency*)info.dependency;
_shared_state = (AggSharedState*)_dependency->shared_state();
_agg_data = _shared_state->agg_data.get();
@ -78,7 +79,6 @@ Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
for (size_t i = 0; i < _shared_state->probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._probe_expr_ctxs[i]->clone(state, _shared_state->probe_expr_ctxs[i]));
}
_profile = p._pool->add(new RuntimeProfile("AggSinkLocalState"));
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_hash_table_memory_usage =
ADD_CHILD_COUNTER(profile(), "HashTable", TUnit::BYTES, "MemoryUsage");

View File

@ -28,14 +28,12 @@ OPERATOR_CODE_GENERATOR(AnalyticSinkOperator, StreamingOperator)
Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
_mem_tracker = std::make_unique<MemTracker>("ExchangeSinkLocalState:");
auto& p = _parent->cast<AnalyticSinkOperatorX>();
_dependency = (AnalyticDependency*)info.dependency;
_shared_state = (AnalyticSharedState*)_dependency->shared_state();
_shared_state->partition_by_column_idxs.resize(p._partition_by_eq_expr_ctxs.size());
_shared_state->ordey_by_column_idxs.resize(p._order_by_eq_expr_ctxs.size());
_profile = state->obj_pool()->add(new RuntimeProfile("AnalyticSinkLocalState"));
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
_blocks_memory_usage = _profile->AddHighWaterMarkCounter("Blocks", TUnit::BYTES, "MemoryUsage");
_evaluation_timer = ADD_TIMER(profile(), "EvaluationTime");

View File

@ -127,7 +127,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
instances.emplace_back(channel->get_fragment_instance_id_str());
}
std::string title = "VDataStreamSender (dst_id={}, dst_fragments=[{}])";
_profile = p._pool->add(new RuntimeProfile(title));
SCOPED_TIMER(_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());

View File

@ -44,14 +44,10 @@ ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* paren
: PipelineXLocalState(state, parent), num_rows_skipped(0), is_ready(false) {}
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
if (_init) {
return Status::OK();
}
RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
stream_recvr = info.recvr;
RETURN_IF_ERROR(_parent->cast<ExchangeSourceOperatorX>()._vsort_exec_exprs.clone(
state, vsort_exec_exprs));
_init = true;
return Status::OK();
}

View File

@ -63,6 +63,8 @@ std::string OperatorBase::debug_string() const {
}
Status PipelineXSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name()));
_mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
return Status::OK();
}

View File

@ -576,7 +576,6 @@ protected:
RuntimeState* _state;
vectorized::VExprContextSPtrs _conjuncts;
vectorized::VExprContextSPtrs _projections;
bool _init = false;
bool _closed = false;
vectorized::Block _origin_block;
};

View File

@ -51,12 +51,9 @@ bool ResultSinkOperator::can_write() {
}
Status ResultSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
auto& p = _parent->cast<ResultSinkOperatorX>();
auto fragment_instance_id = state->fragment_instance_id();
auto title = fmt::format("VDataBufferSender (dst_fragment_instance_id={:x}-{:x})",
fragment_instance_id.hi, fragment_instance_id.lo);
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(title));
// create sender
_sender = info.sender;
_output_vexpr_ctxs.resize(p._output_vexpr_ctxs.size());

View File

@ -59,7 +59,6 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<ResultWriter> _writer;
RuntimeProfile* _profile; // Allocated from _pool
};
class ResultSinkOperatorX final : public DataSinkOperatorX {

View File

@ -110,15 +110,9 @@ bool ScanLocalState::should_run_serial() const {
}
Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
if (_init) {
return Status::OK();
}
auto& p = _parent->cast<ScanOperatorX>();
set_scan_ranges(info.scan_ranges);
RETURN_IF_ERROR(PipelineXLocalState::init(state, info));
auto& p = _parent->cast<ScanOperatorX>();
set_scan_ranges(info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
RETURN_IF_ERROR(
@ -154,9 +148,7 @@ Status ScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
RETURN_IF_ERROR(status);
_init = true;
return Status::OK();
return status;
}
Status ScanLocalState::_normalize_conjuncts() {
@ -1221,21 +1213,17 @@ ScanOperatorX::ScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, const Des
bool ScanOperatorX::can_read(RuntimeState* state) {
auto& local_state = state->get_local_state(id())->cast<ScanLocalState>();
if (!local_state._init) {
if (local_state._eos || local_state._scanner_ctx->done()) {
// _eos: need eos
// _scanner_ctx->done(): need finish
// _scanner_ctx->no_schedule(): should schedule _scanner_ctx
return true;
} else {
if (local_state._eos || local_state._scanner_ctx->done()) {
// _eos: need eos
// _scanner_ctx->done(): need finish
// _scanner_ctx->no_schedule(): should schedule _scanner_ctx
return true;
} else {
if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
local_state._scanner_ctx->reschedule_scanner_ctx();
}
return local_state.ready_to_read(); // there are some blocks to process
if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
local_state._scanner_ctx->reschedule_scanner_ctx();
}
return local_state.ready_to_read(); // there are some blocks to process
}
}

View File

@ -29,12 +29,12 @@ namespace doris::pipeline {
OPERATOR_CODE_GENERATOR(SortSinkOperator, StreamingOperator)
Status SortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState::init(state, info));
auto& p = _parent->cast<SortSinkOperatorX>();
_dependency = (SortDependency*)info.dependency;
_shared_state = (SortSharedState*)_dependency->shared_state();
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_profile = p._pool->add(new RuntimeProfile("SortSinkLocalState"));
switch (p._algorithm) {
case SortAlgorithm::HEAP_SORT: {
_shared_state->sorter = vectorized::HeapSorter::create_unique(