[code](pipelineX) refine some pipelineX code (#28570)

This commit is contained in:
Mryange
2023-12-20 11:45:06 +08:00
committed by GitHub
parent 3e85797443
commit fe184e322a
14 changed files with 130 additions and 82 deletions

View File

@ -100,7 +100,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const {
}
Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_sender_id = info.sender_id;
@ -174,9 +174,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx());
register_channels(_sink_buffer.get());
_exchange_sink_dependency = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
auto* _exchange_sink_dependency = _dependency;
_queue_dependency = ExchangeSinkQueueDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
@ -237,7 +235,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
}
Status ExchangeSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
RETURN_IF_ERROR(Base::open(state));
auto& p = _parent->cast<ExchangeSinkOperatorX>();
if (p._part_type == TPartitionType::HASH_PARTITIONED ||
p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
@ -522,8 +520,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
PipelineXSinkLocalState<>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load());
return fmt::to_string(debug_string_buffer);
@ -536,6 +533,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_close_timer);
COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
if (_broadcast_dependency) {
COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time());
}
@ -545,7 +543,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}
_sink_buffer->update_profile(profile());
_sink_buffer->close();
return PipelineXSinkLocalState<>::close(state, exec_status);
return Base::close(state, exec_status);
}
} // namespace doris::pipeline

View File

@ -144,20 +144,25 @@ public:
// TODO(gabriel): blocked by memory
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndDependency>;
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<>(parent, state),
: Base(parent, state),
current_channel_idx(0),
only_local_exchange(false),
_serializer(this) {}
_serializer(this) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
Dependency* dependency() override { return _exchange_sink_dependency.get(); }
Dependency* finishdependency() override { return _finish_dependency.get(); }
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder);
@ -231,11 +236,12 @@ private:
vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
std::shared_ptr<AndDependency> _exchange_sink_dependency;
std::shared_ptr<BroadcastDependency> _broadcast_dependency;
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
int _partition_count;
std::shared_ptr<Dependency> _finish_dependency;
};
class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {

View File

@ -41,12 +41,11 @@ bool ExchangeSourceOperator::is_pending_finish() const {
}
ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent), num_rows_skipped(0), is_ready(false) {}
: Base(state, parent), num_rows_skipped(0), is_ready(false) {}
std::string ExchangeLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
PipelineXLocalState<>::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", Queues: (");
const auto& queues = stream_recvr->sender_queues();
for (size_t i = 0; i < queues.size(); i++) {
@ -68,15 +67,14 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
}
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<ExchangeSourceOperatorX>();
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id(),
state->get_query_ctx());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
@ -101,7 +99,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
Status ExchangeLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
RETURN_IF_ERROR(Base::open(state));
return Status::OK();
}
@ -215,7 +213,7 @@ Status ExchangeLocalState::close(RuntimeState* state) {
if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
vsort_exec_exprs.close(state);
}
return PipelineXLocalState<>::close(state);
return Base::close(state);
}
Status ExchangeSourceOperatorX::close(RuntimeState* state) {

View File

@ -71,21 +71,22 @@ private:
};
class ExchangeSourceOperatorX;
class ExchangeLocalState final : public PipelineXLocalState<> {
class ExchangeLocalState final : public PipelineXLocalState<AndDependency> {
ENABLE_FACTORY_CREATOR(ExchangeLocalState);
public:
using Base = PipelineXLocalState<AndDependency>;
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
Dependency* dependency() override { return source_dependency.get(); }
std::string debug_string(int indentation_level) const override;
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;
bool is_ready;
std::shared_ptr<AndDependency> source_dependency;
std::vector<std::shared_ptr<ExchangeDataDependency>> deps;
std::vector<RuntimeProfile::Counter*> metrics;

View File

@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state,
OperatorXBase* parent)
: Base(state, parent),
vectorized::RuntimeFilterConsumer(
static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
};
Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));

View File

@ -120,8 +120,11 @@ public:
friend class MultiCastDataStreamerSourceOperatorX;
RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }
private:
vectorized::VExprContextSPtrs _output_expr_contexts;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};
class MultiCastDataStreamerSourceOperatorX final

View File

@ -100,7 +100,14 @@ std::string ScanOperator::debug_string() const {
template <typename Derived>
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalStateBase(state, parent) {}
: ScanLocalStateBase(state, parent) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}
template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
@ -1311,6 +1318,9 @@ Status ScanLocalState<Derived>::_init_profile() {
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);
_wait_for_finish_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
return Status::OK();
}
@ -1442,7 +1452,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
}
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
return PipelineXLocalState<>::close(state);
}

View File

@ -171,6 +171,8 @@ protected:
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
// time of prefilter input block from scanner
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
};
template <typename LocalStateType>
@ -211,6 +213,9 @@ class ScanLocalState : public ScanLocalStateBase {
Dependency* dependency() override { return _scan_dependency.get(); }
RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
Dependency* finishdependency() override { return _finish_dependency.get(); }
protected:
template <typename LocalStateType>
friend class ScanOperatorX;
@ -405,6 +410,10 @@ protected:
std::atomic<bool> _eos = false;
std::mutex _block_lock;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
std::shared_ptr<Dependency> _finish_dependency;
};
template <typename LocalStateType>

View File

@ -308,25 +308,14 @@ Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalSt
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent,
RuntimeState* state)
: _parent(parent),
_state(state),
_finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx())) {}
: _parent(parent), _state(state) {}
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: _num_rows_returned(0),
_rows_returned_counter(nullptr),
_peak_memory_usage_counter(nullptr),
_parent(parent),
_state(state),
_finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(),
parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx())) {
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
state->get_query_ctx());
}
_state(state) {}
template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalStateInfo& info) {
@ -334,22 +323,30 @@ Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
_runtime_profile->set_metadata(_parent->node_id());
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
_wait_for_finish_dependency_timer =
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
constexpr auto is_fake_shared =
std::is_same_v<typename DependencyType::SharedState, FakeSharedState>;
_dependency = (DependencyType*)info.dependency.get();
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
auto& deps = info.upstream_dependencies;
if constexpr (std::is_same_v<LocalExchangeSourceDependency, DependencyType>) {
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first);
} else {
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();
_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
} else if constexpr (!is_fake_shared) {
_dependency->set_shared_state(deps.front()->shared_state());
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();
_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
}
_shared_state = (typename DependencyType::SharedState*)_dependency->shared_state().get();
_shared_state->ref();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
_shared_state->source_dep = _dependency;
_shared_state->sink_dep = deps.front().get();
}
_conjuncts.resize(_parent->_conjuncts.size());
@ -386,7 +383,6 @@ Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
@ -405,6 +401,8 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_profile->set_metadata(_parent->node_id());
_profile->set_is_sink(true);
_wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency");
constexpr auto is_fake_shared =
std::is_same_v<typename DependencyType::SharedState, FakeSharedState>;
if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
auto& deps = info.dependencys;
_dependency = (DependencyType*)deps.front().get();
@ -412,12 +410,18 @@ Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
_dependency = info.le_state_map[_parent->dests_id().front()].second.get();
}
if (_dependency) {
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
if constexpr (!is_fake_shared) {
_shared_state =
(typename DependencyType::SharedState*)_dependency->shared_state().get();
}
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
}
_shared_state->ref();
if constexpr (!is_fake_shared) {
_shared_state->ref();
}
} else {
auto& deps = info.dependencys;
deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx());
@ -446,7 +450,6 @@ Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Statu
if constexpr (!std::is_same_v<DependencyType, FakeDependency>) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
}
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
if (_peak_memory_usage_counter) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
@ -536,6 +539,7 @@ Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_s
return Status::OK();
}
COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
// if the init failed, the _writer may be nullptr. so here need check
if (_writer) {
if (_writer->need_normal_close()) {
@ -630,6 +634,7 @@ template class PipelineXSinkLocalState<MultiCastSinkDependency>;
template class PipelineXSinkLocalState<SetSinkDependency>;
template class PipelineXSinkLocalState<SetProbeSinkDependency>;
template class PipelineXSinkLocalState<LocalExchangeSinkDependency>;
template class PipelineXSinkLocalState<AndDependency>;
template class PipelineXLocalState<HashJoinProbeDependency>;
template class PipelineXLocalState<SortSourceDependency>;
@ -642,6 +647,7 @@ template class PipelineXLocalState<MultiCastSourceDependency>;
template class PipelineXLocalState<PartitionSortSourceDependency>;
template class PipelineXLocalState<SetSourceDependency>;
template class PipelineXLocalState<LocalExchangeSourceDependency>;
template class PipelineXLocalState<AndDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;

View File

@ -102,8 +102,10 @@ public:
virtual Dependency* dependency() { return nullptr; }
Dependency* finishdependency() { return _finish_dependency.get(); }
RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); }
// override in Scan
virtual Dependency* finishdependency() { return nullptr; }
// override in Scan MultiCastSink
virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
protected:
friend class OperatorXBase;
@ -121,7 +123,6 @@ protected:
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
// Account for peak memory used by this node
@ -135,8 +136,6 @@ protected:
vectorized::VExprContextSPtrs _projections;
bool _closed = false;
vectorized::Block _origin_block;
std::shared_ptr<Dependency> _finish_dependency;
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
};
class OperatorXBase : public OperatorBase {
@ -397,7 +396,8 @@ public:
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
virtual Dependency* dependency() { return nullptr; }
Dependency* finishdependency() { return _finish_dependency.get(); }
// override in exchange sink , AsyncWriterSink
virtual Dependency* finishdependency() { return nullptr; }
protected:
DataSinkOperatorXBase* _parent = nullptr;
@ -424,7 +424,6 @@ protected:
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
std::shared_ptr<Dependency> _finish_dependency;
};
class DataSinkOperatorXBase : public OperatorBase {
@ -659,7 +658,11 @@ class AsyncWriterSink : public PipelineXSinkLocalState<FakeDependency> {
public:
using Base = PipelineXSinkLocalState<FakeDependency>;
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _async_writer_dependency(nullptr) {}
: Base(parent, state), _async_writer_dependency(nullptr) {
_finish_dependency = std::make_shared<FinishDependency>(
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
state->get_query_ctx());
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
@ -672,11 +675,15 @@ public:
Status try_close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }
protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
std::unique_ptr<Writer> _writer;
std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;
std::shared_ptr<Dependency> _finish_dependency;
};
} // namespace doris::pipeline

View File

@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) {
DCHECK(pipeline_id_to_profile[pip_idx]);
RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params,
request.fragment.output_sink));
RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink));
return Status::OK();
};
@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
OperatorXPtr source_op;
source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
RETURN_IF_ERROR(source_op->init(exchange_type));
if (operator_xs.size() > 0) {
if (!operator_xs.empty()) {
RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
}
operator_xs.insert(operator_xs.begin(), source_op);
@ -878,6 +877,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
const DescriptorTbl& descs, OperatorXPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx) {
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
switch (tnode.node_type) {

View File

@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
_source(_operators.front()),
_root(_operators.back()),
_sink(pipeline->sink_shared_pointer()),
_le_state_map(le_state_map),
_le_state_map(std::move(le_state_map)),
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta
pipeline->incr_created_tasks();
}
Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
const TDataSink& tsink) {
Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink) {
DCHECK(_sink);
DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state);
_init_profile();
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_prepare_timer);
DCHECK_EQ(state, _state);
{
// set sink local state
@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
get_downstream_dependency(),
_le_state_map,
tsink};
RETURN_IF_ERROR(_sink->setup_local_state(state, info));
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
}
std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(), no_scan_ranges);
auto* parent_profile = state->get_sink_local_state(_sink->operator_id())->profile();
auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile();
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
LocalStateInfo info {parent_profile, scan_ranges, deps,
_le_state_map, _task_idx, _source_dependency[op->operator_id()]};
RETURN_IF_ERROR(op->setup_local_state(state, info));
parent_profile = state->get_local_state(op->operator_id())->profile();
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
}
_block = doris::vectorized::Block::create_unique();
@ -120,7 +118,9 @@ Status PipelineXTask::_extract_dependencies() {
DCHECK(dep != nullptr);
_read_dependencies.push_back(dep);
auto* fin_dep = local_state->finishdependency();
_finish_dependencies.push_back(fin_dep);
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
}
}
{
auto result = _state->get_sink_local_state_result(_sink->operator_id());
@ -132,7 +132,9 @@ Status PipelineXTask::_extract_dependencies() {
DCHECK(dep != nullptr);
_write_dependencies = dep;
auto* fin_dep = local_state->finishdependency();
_finish_dependencies.push_back(fin_dep);
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
}
}
{
auto result = _state->get_local_state_result(_source->operator_id());
@ -193,6 +195,7 @@ Status PipelineXTask::_open() {
for (size_t i = 0; i < 2; i++) {
auto st = local_state->open(_state);
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
DCHECK(_filter_dependency);
_blocked_dep = _filter_dependency->is_blocked_by(this);
if (_blocked_dep) {
set_state(PipelineTaskState::BLOCKED_FOR_RF);
@ -377,9 +380,11 @@ std::string PipelineXTask::debug_string() {
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1));
i++;
fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
i++;
if (_filter_dependency) {
fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1));
i++;
}
fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {

View File

@ -62,8 +62,7 @@ public:
return Status::InternalError("Should not reach here!");
}
Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params,
const TDataSink& tsink);
Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink);
Status execute(bool* eos) override;

View File

@ -220,6 +220,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
DCHECK(req.runtime_state != nullptr);
if (req.query_statistics) {
// use to report 'insert into select'
TQueryStatistics queryStatistics;
DCHECK(req.query_statistics->collect_dml_statistics());
req.query_statistics->to_thrift(&queryStatistics);