[pipelineX](refactor) Rebuild relationship between dep and operator (#31487)

This commit is contained in:
Gabriel
2024-02-29 09:58:27 +08:00
committed by yiguolei
parent 4a5283b466
commit 2b7fa9d6bb
28 changed files with 264 additions and 315 deletions

View File

@ -76,7 +76,6 @@ private:
if (need_more_input) {
_dependency->block();
_dependency->set_ready_to_write();
_shared_state->sink_dep->set_ready();
} else {
_dependency->set_block_to_write();
_dependency->set_ready();

View File

@ -67,7 +67,7 @@ Status EsScanLocalState::_process_conjuncts() {
Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
return Status::OK();
}

View File

@ -170,12 +170,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
id, p._dest_node_id, _sender_id, _state->be_number(), state);
register_channels(_sink_buffer.get());
auto* _exchange_sink_dependency = _dependency;
_queue_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true, state->get_query_ctx());
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_exchange_sink_dependency->add_child(_queue_dependency);
if ((p._part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency =
@ -186,7 +184,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
for (int i = 0; i < config::num_broadcast_buffer; ++i) {
_broadcast_pb_blocks->push(vectorized::BroadcastPBlockHolder::create_shared());
}
_exchange_sink_dependency->add_child(_broadcast_dependency);
_wait_broadcast_buffer_timer =
ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
@ -194,19 +191,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
size_t dep_id = 0;
_local_channels_dependency.resize(local_size);
_wait_channel_timer.resize(local_size);
auto deps_for_channels = AndDependency::create_shared(
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
for (auto channel : channels) {
if (channel->is_local()) {
_local_channels_dependency[dep_id] = channel->get_local_channel_dependency();
DCHECK(_local_channels_dependency[dep_id] != nullptr);
deps_for_channels->add_child(_local_channels_dependency[dep_id]);
_wait_channel_timer[dep_id] = ADD_CHILD_TIMER(
_profile, fmt::format("WaitForLocalExchangeBuffer{}", dep_id), timer_name);
dep_id++;
}
}
_exchange_sink_dependency->add_child(deps_for_channels);
}
if (p._part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();

View File

@ -64,9 +64,9 @@ private:
int _mult_cast_id = -1;
};
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndSharedState> {
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<AndSharedState>;
using Base = PipelineXSinkLocalState<>;
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
@ -79,6 +79,16 @@ public:
state->get_query_ctx());
}
std::vector<Dependency*> dependencies() const override {
std::vector<Dependency*> dep_vec;
dep_vec.push_back(_queue_dependency.get());
if (_broadcast_dependency) {
dep_vec.push_back(_broadcast_dependency.get());
}
std::for_each(_local_channels_dependency.begin(), _local_channels_dependency.end(),
[&](std::shared_ptr<Dependency> dep) { dep_vec.push_back(dep.get()); });
return dep_vec;
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
@ -154,8 +164,8 @@ private:
vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
std::shared_ptr<Dependency> _queue_dependency;
std::shared_ptr<Dependency> _broadcast_dependency;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
/**
* We use this to control the execution for local exchange.

View File

@ -74,7 +74,6 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
profile(), p.is_merging());
auto* source_dependency = _dependency;
const auto& queues = stream_recvr->sender_queues();
deps.resize(queues.size());
metrics.resize(queues.size());
@ -82,10 +81,8 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
deps[i] = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"SHUFFLE_DATA_DEPENDENCY", state->get_query_ctx());
queues[i]->set_dependency(deps[i]);
source_dependency->add_child(deps[i]);
}
static const std::string timer_name =
"WaitForDependency[" + source_dependency->name() + "]Time";
static const std::string timer_name = "WaitForDependencyTime";
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, timer_name, 1);
for (size_t i = 0; i < queues.size(); i++) {
metrics[i] = ADD_CHILD_TIMER_WITH_LEVEL(_runtime_profile, fmt::format("WaitForData{}", i),

View File

@ -51,17 +51,24 @@ public:
};
class ExchangeSourceOperatorX;
class ExchangeLocalState final : public PipelineXLocalState<AndSharedState> {
class ExchangeLocalState final : public PipelineXLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeLocalState);
public:
using Base = PipelineXLocalState<AndSharedState>;
using Base = PipelineXLocalState<>;
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
std::string debug_string(int indentation_level) const override;
std::vector<Dependency*> dependencies() const override {
std::vector<Dependency*> dep_vec;
std::for_each(deps.begin(), deps.end(),
[&](std::shared_ptr<Dependency> dep) { dep_vec.push_back(dep.get()); });
return dep_vec;
}
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
doris::vectorized::VSortExecExprs vsort_exec_exprs;
int64_t num_rows_skipped;

View File

@ -33,7 +33,7 @@ namespace doris::pipeline {
Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
return Status::OK();
}

View File

@ -92,11 +92,16 @@ public:
const RowDescriptor& row_desc() const override { return _row_desc; }
std::shared_ptr<MultiCastSharedState> create_multi_cast_data_streamer() {
auto multi_cast_data_streamer =
std::shared_ptr<BasicSharedState> create_shared_state() const override {
std::shared_ptr<BasicSharedState> ss =
std::make_shared<MultiCastSharedState>(_row_desc, _pool, _cast_sender_count);
return multi_cast_data_streamer;
ss->id = operator_id();
for (auto& dest : dests_id()) {
ss->related_op_ids.insert(dest);
}
return ss;
}
const TMultiCastDataStreamSink& sink_node() { return _sink; }
private:

View File

@ -225,7 +225,7 @@ bool OlapScanLocalState::_storage_no_merge() {
Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
return Status::OK();
}
SCOPED_TIMER(_scanner_init_timer);
@ -486,7 +486,7 @@ Status OlapScanLocalState::_build_key_ranges_and_filters() {
}
if (eos) {
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
}
for (auto& iter : _colname_to_value_range) {

View File

@ -111,7 +111,12 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<EmptySharedState>::init(state, info));
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
_scan_dependency =
Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
_parent->get_name() + "_DEPENDENCY", state->get_query_ctx());
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _scan_dependency->name() + "]Time", 1);
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<typename Derived::Parent>();
@ -252,7 +257,7 @@ Status ScanLocalState<Derived>::_normalize_conjuncts() {
[&](auto&& range) {
if (range.is_empty_value_range()) {
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
}
},
it.second.second);
@ -543,8 +548,7 @@ template <typename Derived>
std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}, _eos = {}",
PipelineXLocalState<EmptySharedState>::debug_string(indentation_level),
_eos.load());
PipelineXLocalState<>::debug_string(indentation_level), _eos.load());
if (_scanner_ctx) {
fmt::format_to(debug_string_buffer, "");
fmt::format_to(debug_string_buffer, ", Scanner Context: {}", _scanner_ctx->debug_string());
@ -587,7 +591,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
}
} else if (const vectorized::ColumnVector<vectorized::UInt8>* bool_column =
check_and_get_column<vectorized::ColumnVector<vectorized::UInt8>>(
@ -605,7 +609,7 @@ Status ScanLocalState<Derived>::_eval_const_conjuncts(vectorized::VExpr* vexpr,
if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) {
*pdt = vectorized::VScanNode::PushDownType::ACCEPTABLE;
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
}
} else {
LOG(WARNING) << "Constant predicate in scan node should return a bool column with "
@ -803,7 +807,7 @@ Status ScanLocalState<Derived>::_normalize_not_in_and_not_eq_predicate(
auto fn_name = std::string("");
if (!is_fixed_range && state->null_in_set) {
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
}
while (iter->has_next()) {
// column not in (nullptr) is always true
@ -1201,7 +1205,7 @@ Status ScanLocalState<Derived>::_prepare_scanners() {
}
if (scanners.empty()) {
_eos = true;
_dependency->set_ready();
_scan_dependency->set_ready();
} else {
for (auto& scanner : scanners) {
scanner->set_query_statistics(_query_statistics.get());
@ -1218,7 +1222,7 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipXScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
state()->scan_queue_mem_limit(), _dependency->shared_from_this());
state()->scan_queue_mem_limit(), _scan_dependency);
return Status::OK();
}
@ -1404,7 +1408,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
COUNTER_UPDATE(exec_time_counter(), _dependency->watcher_elapse_time());
COUNTER_UPDATE(exec_time_counter(), _scan_dependency->watcher_elapse_time());
COUNTER_UPDATE(exec_time_counter(), _filter_dependency->watcher_elapse_time());
SCOPED_TIMER(_close_timer);
@ -1412,10 +1416,10 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
if (_scanner_ctx) {
_scanner_ctx->stop_scanners(state);
}
COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
return PipelineXLocalState<EmptySharedState>::close(state);
return PipelineXLocalState<>::close(state);
}
template <typename LocalStateType>

View File

@ -57,11 +57,10 @@ public:
std::string debug_string() const override;
};
class ScanLocalStateBase : public PipelineXLocalState<EmptySharedState>,
public vectorized::RuntimeFilterConsumer {
class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::RuntimeFilterConsumer {
public:
ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<EmptySharedState>(state, parent),
: PipelineXLocalState<>(state, parent),
vectorized::RuntimeFilterConsumer(parent->node_id(), parent->runtime_filter_descs(),
parent->row_descriptor(), _conjuncts) {}
virtual ~ScanLocalStateBase() = default;
@ -97,6 +96,8 @@ protected:
std::atomic<bool> _opened {false};
DependencySPtr _scan_dependency = nullptr;
std::shared_ptr<RuntimeProfile> _scanner_profile;
RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
RuntimeProfile::Counter* _scanner_ctx_sched_time = nullptr;
@ -166,6 +167,8 @@ class ScanLocalState : public ScanLocalStateBase {
RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
std::vector<Dependency*> dependencies() const override { return {_scan_dependency.get()}; }
protected:
template <typename LocalStateType>
friend class ScanOperatorX;

View File

@ -131,6 +131,8 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; }
private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>& local_state,

View File

@ -198,10 +198,8 @@ Status SetSinkOperatorX<is_intersect>::init(const TPlanNode& tnode, RuntimeState
// Create result_expr_ctx_lists_ from thrift exprs.
if (tnode.node_type == TPlanNodeType::type::INTERSECT_NODE) {
result_texpr_lists = &(tnode.intersect_node.result_expr_lists);
_child_quantity = tnode.intersect_node.result_expr_lists.size();
} else if (tnode.node_type == TPlanNodeType::type::EXCEPT_NODE) {
result_texpr_lists = &(tnode.except_node.result_expr_lists);
_child_quantity = tnode.except_node.result_expr_lists.size();
} else {
return Status::NotSupported("Not Implemented, Check The Operation Node.");
}

View File

@ -99,6 +99,9 @@ public:
const DescriptorTbl& descs)
: Base(sink_id, tnode.node_id, tnode.node_id),
_cur_child_id(child_id),
_child_quantity(tnode.node_type == TPlanNodeType::type::INTERSECT_NODE
? tnode.intersect_node.result_expr_lists.size()
: tnode.except_node.result_expr_lists.size()),
_is_colocate(is_intersect ? tnode.intersect_node.is_colocate
: tnode.except_node.is_colocate),
_partition_exprs(is_intersect ? tnode.intersect_node.result_expr_lists[child_id]
@ -131,7 +134,7 @@ private:
vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs);
const int _cur_child_id;
int _child_quantity;
const int _child_quantity;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;

View File

@ -54,11 +54,8 @@ Status SetSourceLocalState<is_intersect>::init(RuntimeState* state, LocalStateIn
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& deps = info.upstream_dependencies;
_shared_state->probe_finished_children_dependency.resize(deps.size(), nullptr);
for (auto& dep : deps) {
dep->set_shared_state(_dependency->shared_state());
}
_shared_state->probe_finished_children_dependency.resize(
_parent->cast<SetSourceOperatorX<is_intersect>>()._child_quantity, nullptr);
return Status::OK();
}

View File

@ -85,7 +85,10 @@ public:
SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs)
: Base(pool, tnode, operator_id, descs) {};
: Base(pool, tnode, operator_id, descs),
_child_quantity(tnode.node_type == TPlanNodeType::type::INTERSECT_NODE
? tnode.intersect_node.result_expr_lists.size()
: tnode.except_node.result_expr_lists.size()) {};
~SetSourceOperatorX() override = default;
[[nodiscard]] bool is_source() const override { return true; }
@ -105,6 +108,7 @@ private:
void _add_result_columns(SetSourceLocalState<is_intersect>& local_state,
vectorized::RowRefListWithFlags& value, int& block_size);
const int _child_quantity;
};
} // namespace pipeline

View File

@ -111,6 +111,19 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
std::shared_ptr<BasicSharedState> create_shared_state() const override {
if (_cur_child_id > 0) {
return nullptr;
} else {
std::shared_ptr<BasicSharedState> ss = std::make_shared<UnionSharedState>(_child_size);
ss->id = operator_id();
for (auto& dest : dests_id()) {
ss->related_op_ids.insert(dest);
}
return ss;
}
}
private:
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }

View File

@ -111,15 +111,18 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
int child_count = p.get_child_count();
if (child_count != 0) {
auto& deps = info.upstream_dependencies;
for (auto& dep : deps) {
dep->set_shared_state(_dependency->shared_state());
}
if (p.get_child_count() != 0) {
((UnionSharedState*)_dependency->shared_state())
->data_queue.set_source_dependency(_shared_state->source_deps.front());
} else {
_only_const_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), _parent->get_name() + "_DEPENDENCY",
state->get_query_ctx());
_dependency = _only_const_dependency.get();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
}
((UnionSharedState*)_dependency->shared_state())
->data_queue.set_source_dependency(info.dependency);
// Const exprs materialized by this node. These exprs don't refer to any children.
// Only materialized by the first fragment instance to avoid duplication.
if (state->per_fragment_instance_idx() == 0) {
@ -138,7 +141,8 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(clone_expr_list(_const_expr_list, other_expr_list));
}
}
if (child_count == 0) {
if (p.get_child_count() == 0) {
_dependency->set_ready();
}
return Status::OK();
@ -147,9 +151,11 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
std::string UnionSourceLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.remaining_has_data());
if (_shared_state) {
fmt::format_to(debug_string_buffer, ", data_queue: (is_all_finish = {}, has_data = {})",
_shared_state->data_queue.is_all_finish(),
_shared_state->data_queue.remaining_has_data());
}
return fmt::to_string(debug_string_buffer);
}
@ -161,7 +167,7 @@ Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* b
RETURN_IF_ERROR(get_next_const(state, block));
}
local_state._need_read_for_const_expr = has_more_const(state);
} else {
} else if (_child_size != 0) {
std::unique_ptr<vectorized::Block> output_block = vectorized::Block::create_unique();
int child_idx = 0;
RETURN_IF_ERROR(local_state._shared_state->data_queue.get_block_from_queue(&output_block,

View File

@ -87,6 +87,10 @@ private:
bool _need_read_for_const_expr {true};
int _const_expr_list_idx {0};
std::vector<vectorized::VExprContextSPtrs> _const_expr_lists;
// If this operator has no children, there is no shared state which owns dependency. So we
// use this local state to hold this dependency.
DependencySPtr _only_const_dependency = nullptr;
};
class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {

View File

@ -29,6 +29,22 @@
namespace doris::pipeline {
Dependency* BasicSharedState::create_source_dependency(int operator_id, int node_id,
std::string name, QueryContext* ctx) {
source_deps.push_back(
std::make_shared<Dependency>(operator_id, node_id, name + "_DEPENDENCY", ctx));
source_deps.back()->set_shared_state(this);
return source_deps.back().get();
}
Dependency* BasicSharedState::create_sink_dependency(int dest_id, int node_id, std::string name,
QueryContext* ctx) {
sink_deps.push_back(
std::make_shared<Dependency>(dest_id, node_id, name + "_DEPENDENCY", true, ctx));
sink_deps.back()->set_shared_state(this);
return sink_deps.back().get();
}
void Dependency::_add_block_task(PipelineXTask* task) {
DCHECK(_blocked_task.empty() || _blocked_task[_blocked_task.size() - 1] != task)
<< "Duplicate task: " << task->debug_string();
@ -103,17 +119,6 @@ std::string RuntimeFilterDependency::debug_string(int indentation_level) {
return fmt::to_string(debug_string_buffer);
}
std::string AndDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}{}: id={}, children=[",
std::string(indentation_level * 2, ' '), _name, _node_id);
for (auto& child : _children) {
fmt::format_to(debug_string_buffer, "{}, \n", child->debug_string(indentation_level = 1));
}
fmt::format_to(debug_string_buffer, "{}]", std::string(indentation_level * 2, ' '));
return fmt::to_string(debug_string_buffer);
}
bool RuntimeFilterTimer::has_ready() {
std::unique_lock<std::mutex> lc(_lock);
return _is_ready;
@ -193,7 +198,7 @@ void LocalExchangeSharedState::sub_running_sink_operators() {
}
LocalExchangeSharedState::LocalExchangeSharedState(int num_instances) {
source_dependencies.resize(num_instances, nullptr);
source_deps.resize(num_instances, nullptr);
mem_trackers.resize(num_instances, nullptr);
}

View File

@ -70,9 +70,18 @@ struct BasicSharedState {
<< " and expect type is" << typeid(TARGET).name();
return reinterpret_cast<const TARGET*>(this);
}
DependencySPtr source_dep = nullptr;
DependencySPtr sink_dep = nullptr;
std::vector<DependencySPtr> source_deps;
std::vector<DependencySPtr> sink_deps;
int id = 0;
std::set<int> related_op_ids;
virtual ~BasicSharedState() = default;
Dependency* create_source_dependency(int operator_id, int node_id, std::string name,
QueryContext* ctx);
Dependency* create_sink_dependency(int dest_id, int node_id, std::string name,
QueryContext* ctx);
};
class Dependency : public std::enable_shared_from_this<Dependency> {
@ -94,22 +103,15 @@ public:
_query_ctx(query_ctx) {}
virtual ~Dependency() = default;
bool is_write_dependency() const { return _is_write_dependency; }
[[nodiscard]] int id() const { return _id; }
[[nodiscard]] virtual std::string name() const { return _name; }
virtual void add_child(std::shared_ptr<Dependency> child) {
LOG(FATAL) << "Only AndDependency could add child, it is wrong usage";
}
BasicSharedState* shared_state() { return _shared_state; }
void set_shared_state(BasicSharedState* shared_state) { _shared_state = shared_state; }
virtual std::string debug_string(int indentation_level = 0);
// Start the watcher. We use it to count how long this dependency block the current pipeline task.
void start_watcher() {
for (auto& child : _children) {
child->start_watcher();
}
_watcher.start();
}
void start_watcher() { _watcher.start(); }
[[nodiscard]] int64_t watcher_elapse_time() { return _watcher.elapsed_time(); }
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
@ -118,21 +120,21 @@ public:
void set_ready();
void set_ready_to_read() {
DCHECK(_is_write_dependency) << debug_string();
DCHECK(_shared_state->source_dep != nullptr) << debug_string();
_shared_state->source_dep->set_ready();
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
_shared_state->source_deps.front()->set_ready();
}
void set_block_to_read() {
DCHECK(_is_write_dependency) << debug_string();
DCHECK(_shared_state->source_dep != nullptr) << debug_string();
_shared_state->source_dep->block();
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
_shared_state->source_deps.front()->block();
}
void set_ready_to_write() {
DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
_shared_state->sink_dep->set_ready();
DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
_shared_state->sink_deps.front()->set_ready();
}
void set_block_to_write() {
DCHECK(_shared_state->sink_dep != nullptr) << debug_string();
_shared_state->sink_dep->block();
DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
_shared_state->sink_deps.front()->block();
}
// Notify downstream pipeline tasks this dependency is blocked.
@ -172,7 +174,6 @@ protected:
BasicSharedState* _shared_state = nullptr;
MonotonicStopWatch _watcher;
std::list<std::shared_ptr<Dependency>> _children;
std::mutex _task_lock;
std::vector<PipelineXTask*> _blocked_task;
@ -322,31 +323,6 @@ protected:
std::shared_ptr<std::atomic_bool> _blocked_by_rf;
};
struct EmptySharedState final : public BasicSharedState {};
struct AndSharedState final : public BasicSharedState {};
class AndDependency final : public Dependency {
public:
using SharedState = AndSharedState;
ENABLE_FACTORY_CREATOR(AndDependency);
AndDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "AndDependency", query_ctx) {}
std::string debug_string(int indentation_level = 0) override;
void add_child(std::shared_ptr<Dependency> child) override { _children.push_back(child); }
[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
for (auto& child : Dependency::_children) {
if (auto* dep = child->is_blocked_by(task)) {
return dep;
}
}
return nullptr;
}
};
struct AggSharedState : public BasicSharedState {
public:
AggSharedState() {
@ -661,25 +637,28 @@ public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
std::unique_ptr<Exchanger> exchanger {};
std::vector<DependencySPtr> source_dependencies;
DependencySPtr sink_dependency;
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
void create_source_dependencies(int operator_id, int node_id, QueryContext* ctx) {
for (size_t i = 0; i < source_deps.size(); i++) {
source_deps[i] = std::make_shared<Dependency>(
operator_id, node_id, "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY", ctx);
source_deps[i]->set_shared_state(this);
}
};
void sub_running_sink_operators();
void _set_always_ready() {
for (auto& dep : source_dependencies) {
for (auto& dep : source_deps) {
DCHECK(dep);
dep->set_always_ready();
}
}
void set_dep_by_channel_id(DependencySPtr dep, int channel_id) {
source_dependencies[channel_id] = dep;
}
Dependency* get_dep_by_channel_id(int channel_id) { return source_deps[channel_id].get(); }
void set_ready_to_read(int channel_id) {
auto& dep = source_dependencies[channel_id];
auto& dep = source_deps[channel_id];
DCHECK(dep) << channel_id;
dep->set_ready();
}
@ -700,13 +679,13 @@ public:
void add_total_mem_usage(size_t delta) {
if (mem_usage.fetch_add(delta) > config::local_exchange_buffer_mem_limit) {
sink_dependency->block();
sink_deps.front()->block();
}
}
void sub_total_mem_usage(size_t delta) {
if (mem_usage.fetch_sub(delta) <= config::local_exchange_buffer_mem_limit) {
sink_dependency->set_ready();
sink_deps.front()->set_ready();
}
}
};

View File

@ -26,7 +26,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_channel_id = info.task_idx;
_shared_state->set_dep_by_channel_id(info.dependency, _channel_id);
_shared_state->mem_trackers[_channel_id] = _mem_tracker.get();
_exchanger = _shared_state->exchanger.get();
DCHECK(_exchanger != nullptr);

View File

@ -267,80 +267,24 @@ Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
return Status::OK();
}
template <typename SharedStateType>
constexpr bool NeedToCreate = true;
template <>
inline constexpr bool NeedToCreate<MultiCastSharedState> = false;
template <>
inline constexpr bool NeedToCreate<SetSharedState> = false;
template <>
inline constexpr bool NeedToCreate<UnionSharedState> = false;
template <>
inline constexpr bool NeedToCreate<LocalExchangeSharedState> = false;
template <typename LocalStateType>
void DataSinkOperatorX<LocalStateType>::get_dependency(
vector<DependencySPtr>& dependency,
std::map<int, std::shared_ptr<BasicSharedState>>& shared_states, QueryContext* ctx) {
std::shared_ptr<BasicSharedState> ss = nullptr;
if constexpr (NeedToCreate<typename LocalStateType::SharedStateType>) {
ss.reset(new typename LocalStateType::SharedStateType());
DCHECK(!shared_states.contains(dests_id().front()));
if constexpr (!std::is_same_v<typename LocalStateType::SharedStateType, FakeSharedState>) {
shared_states.insert({dests_id().front(), ss});
}
std::shared_ptr<BasicSharedState> DataSinkOperatorX<LocalStateType>::create_shared_state() const {
if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
LocalExchangeSharedState>) {
return nullptr;
} else if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
MultiCastSharedState>) {
ss = ((MultiCastDataStreamSinkOperatorX*)this)->create_multi_cast_data_streamer();
auto& dests = dests_id();
for (auto& dest_id : dests) {
DCHECK(!shared_states.contains(dest_id));
shared_states.insert({dest_id, ss});
}
}
if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, AndSharedState>) {
auto& dests = dests_id();
for (auto& dest_id : dests) {
dependency.push_back(std::make_shared<AndDependency>(dest_id, _node_id, ctx));
dependency.back()->set_shared_state(ss.get());
}
} else if constexpr (!std::is_same_v<typename LocalStateType::SharedStateType,
FakeSharedState>) {
auto& dests = dests_id();
for (auto& dest_id : dests) {
dependency.push_back(std::make_shared<Dependency>(dest_id, _node_id,
_name + "_DEPENDENCY", true, ctx));
dependency.back()->set_shared_state(ss.get());
}
LOG(FATAL) << "should not reach here!";
return nullptr;
} else {
dependency.push_back(nullptr);
}
}
template <typename LocalStateType>
DependencySPtr OperatorX<LocalStateType>::get_dependency(
QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) {
std::shared_ptr<BasicSharedState> ss = nullptr;
if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, SetSharedState>) {
std::shared_ptr<BasicSharedState> ss = nullptr;
ss.reset(new typename LocalStateType::SharedStateType());
shared_states.insert({operator_id(), ss});
} else if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
UnionSharedState>) {
ss.reset(new typename LocalStateType::SharedStateType(
((UnionSourceOperatorX*)this)->get_child_count()));
shared_states.insert({operator_id(), ss});
ss->id = operator_id();
for (auto& dest : dests_id()) {
ss->related_op_ids.insert(dest);
}
return ss;
}
DependencySPtr dep = nullptr;
if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, AndSharedState>) {
dep = std::make_shared<AndDependency>(_operator_id, _node_id, ctx);
} else if constexpr (std::is_same_v<typename LocalStateType::SharedStateType,
FakeSharedState>) {
dep = std::make_shared<FakeDependency>(_operator_id, _node_id, ctx);
} else {
dep = std::make_shared<Dependency>(_operator_id, _node_id, _op_name + "_DEPENDENCY", ctx);
dep->set_shared_state(ss.get());
}
return dep;
}
template <typename LocalStateType>
@ -373,25 +317,22 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
_runtime_profile->set_is_sink(false);
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>;
_dependency = info.dependency.get();
if constexpr (!is_fake_shared) {
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
auto& deps = info.upstream_dependencies;
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) {
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()].first.get());
_shared_state = _dependency->shared_state()->template cast<SharedStateArg>();
_shared_state = info.le_state_map[_parent->operator_id()].first.get();
_shared_state->source_dep = info.dependency;
} else if constexpr (!std::is_same_v<SharedStateArg, EmptySharedState> &&
!std::is_same_v<SharedStateArg, AndSharedState>) {
_dependency->set_shared_state(info.shared_state);
_shared_state = _dependency->shared_state()->template cast<SharedStateArg>();
_dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
} else if (info.shared_state) {
// For UnionSourceOperator without children, there is no shared state.
_shared_state = info.shared_state->template cast<SharedStateArg>();
_shared_state->source_dep = info.dependency;
if (!deps.empty()) {
_shared_state->sink_dep = deps.front();
}
_dependency = _shared_state->create_source_dependency(
_parent->operator_id(), _parent->node_id(), _parent->get_name(),
state->get_query_ctx());
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
}
}
@ -445,20 +386,19 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
_wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency");
constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>;
if constexpr (!is_fake_shared) {
auto& deps = info.dependencies;
_dependency = deps.front().get();
if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) {
_dependency = info.le_state_map[_parent->dests_id().front()].second.get();
}
if (_dependency) {
_shared_state = (SharedState*)_dependency->shared_state();
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
} else {
_shared_state = info.shared_state->template cast<SharedState>();
_dependency = _shared_state->create_sink_dependency(
_parent->dests_id().front(), _parent->node_id(), _parent->get_name(),
state->get_query_ctx());
}
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
} else {
auto& deps = info.dependencies;
deps.front() = std::make_shared<FakeDependency>(0, 0, state->get_query_ctx());
_dependency = deps.front().get();
_dependency = nullptr;
}
_rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1);
_open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1);
@ -656,7 +596,6 @@ template class PipelineXSinkLocalState<UnionSharedState>;
template class PipelineXSinkLocalState<PartitionSortNodeSharedState>;
template class PipelineXSinkLocalState<MultiCastSharedState>;
template class PipelineXSinkLocalState<SetSharedState>;
template class PipelineXSinkLocalState<AndSharedState>;
template class PipelineXSinkLocalState<LocalExchangeSharedState>;
template class PipelineXSinkLocalState<BasicSharedState>;
@ -671,8 +610,6 @@ template class PipelineXLocalState<MultiCastSharedState>;
template class PipelineXLocalState<PartitionSortNodeSharedState>;
template class PipelineXLocalState<SetSharedState>;
template class PipelineXLocalState<LocalExchangeSharedState>;
template class PipelineXLocalState<EmptySharedState>;
template class PipelineXLocalState<AndSharedState>;
template class PipelineXLocalState<BasicSharedState>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;

View File

@ -31,13 +31,10 @@ namespace doris::pipeline {
struct LocalStateInfo {
RuntimeProfile* parent_profile = nullptr;
const std::vector<TScanRangeParams> scan_ranges;
std::vector<DependencySPtr>& upstream_dependencies;
BasicSharedState* shared_state;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
le_state_map;
const int task_idx;
DependencySPtr dependency;
};
// This struct is used only for initializing local sink state.
@ -45,7 +42,7 @@ struct LocalSinkStateInfo {
const int task_idx;
RuntimeProfile* parent_profile = nullptr;
const int sender_id;
std::vector<DependencySPtr>& dependencies;
BasicSharedState* shared_state;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
le_state_map;
const TDataSink& tsink;
@ -100,7 +97,7 @@ public:
[[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0;
virtual Dependency* dependency() { return nullptr; }
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in Scan
virtual Dependency* finishdependency() { return nullptr; }
@ -184,8 +181,6 @@ public:
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
}
[[nodiscard]] std::string get_name() const override { return _op_name; }
[[nodiscard]] virtual DependencySPtr get_dependency(
QueryContext* ctx, std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution() && !is_source()
? DataDistribution(ExchangeType::PASSTHROUGH)
@ -348,10 +343,6 @@ public:
[[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
return state->get_local_state(operator_id())->template cast<LocalState>();
}
DependencySPtr get_dependency(
QueryContext* ctx,
std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) override;
};
template <typename SharedStateArg = FakeSharedState>
@ -372,7 +363,9 @@ public:
[[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
Dependency* dependency() override { return _dependency; }
std::vector<Dependency*> dependencies() const override {
return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
}
protected:
Dependency* _dependency = nullptr;
@ -422,7 +415,7 @@ public:
RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
virtual Dependency* dependency() { return nullptr; }
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in exchange sink , AsyncWriterSink
virtual Dependency* finishdependency() { return nullptr; }
@ -513,9 +506,7 @@ public:
return reinterpret_cast<const TARGET&>(*this);
}
virtual void get_dependency(std::vector<DependencySPtr>& dependency,
std::map<int, std::shared_ptr<BasicSharedState>>& shared_states,
QueryContext* ctx) = 0;
[[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const {
return _child_x && _child_x->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
@ -612,9 +603,7 @@ public:
~DataSinkOperatorX() override = default;
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
void get_dependency(std::vector<DependencySPtr>& dependency,
std::map<int, std::shared_ptr<BasicSharedState>>& shared_states,
QueryContext* ctx) override;
std::shared_ptr<BasicSharedState> create_shared_state() const override;
using LocalState = LocalStateType;
[[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
@ -640,7 +629,9 @@ public:
virtual std::string name_suffix() { return " (id=" + std::to_string(_parent->node_id()) + ")"; }
Dependency* dependency() override { return _dependency; }
std::vector<Dependency*> dependencies() const override {
return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
}
protected:
Dependency* _dependency = nullptr;
@ -717,7 +708,9 @@ public:
Status sink(RuntimeState* state, vectorized::Block* block, bool eos);
Dependency* dependency() override { return _async_writer_dependency.get(); }
std::vector<Dependency*> dependencies() const override {
return {_async_writer_dependency.get()};
}
Status close(RuntimeState* state, Status exec_status) override;
Dependency* finishdependency() override { return _finish_dependency.get(); }

View File

@ -610,9 +610,13 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
auto& deps = _dag[_pipeline->id()];
for (auto& dep : deps) {
if (pipeline_id_to_task.contains(dep)) {
task->add_upstream_dependency(
pipeline_id_to_task[dep]->get_downstream_dependency(),
pipeline_id_to_task[dep]->get_shared_states());
auto ss = pipeline_id_to_task[dep]->get_sink_shared_state();
if (ss) {
task->inject_shared_state(ss);
} else {
pipeline_id_to_task[dep]->inject_shared_state(
task->get_source_shared_state());
}
}
}
}
@ -781,7 +785,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
"LOCAL_EXCHANGE_SINK_DEPENDENCY", true,
_runtime_state->get_query_ctx());
sink_dep->set_shared_state(shared_state.get());
shared_state->sink_dependency = sink_dep;
shared_state->sink_deps.push_back(sink_dep);
_op_id_to_le_state.insert({local_exchange_id, {shared_state, sink_dep}});
// 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to
@ -804,6 +808,9 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
}
operator_xs.insert(operator_xs.begin(), source_op);
shared_state->create_source_dependencies(source_op->operator_id(), source_op->node_id(),
_query_ctx.get());
// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
std::vector<PipelineId> edges_with_source;

View File

@ -191,8 +191,6 @@ private:
#pragma clang diagnostic pop
#endif
std::atomic_bool _canceled = false;
// `_dag` manage dependencies between pipelines by pipeline ID. the indices will be blocked by members
std::map<PipelineId, std::vector<PipelineId>> _dag;

View File

@ -60,10 +60,10 @@ PipelineXTask::PipelineXTask(
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()) {
_pipeline_task_watcher.start();
_sink->get_dependency(_downstream_dependency, _shared_states, state->get_query_ctx());
for (auto& op : _operators) {
_source_dependency.insert(
{op->operator_id(), op->get_dependency(state->get_query_ctx(), _shared_states)});
auto shared_state = _sink->create_shared_state();
if (shared_state) {
_sink_shared_state = shared_state;
}
pipeline->incr_created_tasks();
}
@ -82,7 +82,7 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const
LocalSinkStateInfo info {_task_idx,
_task_profile.get(),
local_params.sender_id,
get_downstream_dependency(),
get_sink_shared_state().get(),
_le_state_map,
tsink};
RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
@ -97,14 +97,8 @@ Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
auto& op = _operators[op_idx];
auto& deps = get_upstream_dependency(op->operator_id());
LocalStateInfo info {parent_profile,
scan_ranges,
deps,
get_shared_state(op->operator_id()),
_le_state_map,
_task_idx,
_source_dependency[op->operator_id()]};
LocalStateInfo info {parent_profile, scan_ranges, get_op_shared_state(op->operator_id()),
_le_state_map, _task_idx};
RETURN_IF_ERROR(op->setup_local_state(_state, info));
parent_profile = _state->get_local_state(op->operator_id())->profile();
query_ctx->register_query_statistics(
@ -126,9 +120,9 @@ Status PipelineXTask::_extract_dependencies() {
return result.error();
}
auto* local_state = result.value();
auto* dep = local_state->dependency();
DCHECK(dep != nullptr);
_read_dependencies.push_back(dep);
const auto& deps = local_state->dependencies();
std::copy(deps.begin(), deps.end(),
std::inserter(_read_dependencies, _read_dependencies.end()));
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
@ -136,9 +130,9 @@ Status PipelineXTask::_extract_dependencies() {
}
{
auto* local_state = _state->get_sink_local_state();
auto* dep = local_state->dependency();
DCHECK(dep != nullptr);
_write_dependencies = dep;
_write_dependencies = local_state->dependencies();
DCHECK(std::all_of(_write_dependencies.begin(), _write_dependencies.end(),
[](auto* dep) { return dep->is_write_dependency(); }));
auto* fin_dep = local_state->finishdependency();
if (fin_dep) {
_finish_dependencies.push_back(fin_dep);
@ -302,10 +296,8 @@ void PipelineXTask::finalize() {
PipelineTask::finalize();
std::unique_lock<std::mutex> lc(_release_lock);
_finished = true;
std::vector<DependencySPtr> {}.swap(_downstream_dependency);
_upstream_dependency.clear();
_source_dependency.clear();
_shared_states.clear();
_sink_shared_state.reset();
_op_shared_states.clear();
_le_state_map.clear();
}
@ -372,8 +364,10 @@ std::string PipelineXTask::debug_string() {
}
fmt::format_to(debug_string_buffer, "Write Dependency Information: \n");
fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1));
i++;
for (size_t j = 0; j < _write_dependencies.size(); j++, i++) {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_write_dependencies[j]->debug_string(i + 1));
}
if (_filter_dependency) {
fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n");

View File

@ -94,41 +94,36 @@ public:
bool is_pending_finish() override { return _finish_blocked_dependency() != nullptr; }
std::vector<DependencySPtr>& get_downstream_dependency() { return _downstream_dependency; }
std::map<int, std::shared_ptr<BasicSharedState>>& get_shared_states() { return _shared_states; }
std::shared_ptr<BasicSharedState> get_source_shared_state() {
return _op_shared_states.contains(_source->operator_id())
? _op_shared_states[_source->operator_id()]
: nullptr;
}
void add_upstream_dependency(std::vector<DependencySPtr>& multi_upstream_dependency,
std::map<int, std::shared_ptr<BasicSharedState>>& shared_states) {
for (auto dep : multi_upstream_dependency) {
int dst_id = dep->id();
if (!_upstream_dependency.contains(dst_id)) {
_upstream_dependency.insert({dst_id, {dep}});
} else {
_upstream_dependency[dst_id].push_back(dep);
}
if (shared_states.contains(dst_id) && !_shared_states.contains(dst_id)) {
// Shared state is created by upstream task's sink operator and shared by source operator of this task.
_shared_states.insert({dst_id, shared_states[dst_id]});
} else if (_shared_states.contains(dst_id) && !shared_states.contains(dst_id)) {
// Shared state is created by this task's source operator and shared by upstream task's sink operator.
shared_states.insert({dst_id, _shared_states[dst_id]});
void inject_shared_state(std::shared_ptr<BasicSharedState> shared_state) {
if (!shared_state) {
return;
}
// Shared state is created by upstream task's sink operator and shared by source operator of this task.
for (auto& op : _operators) {
if (shared_state->related_op_ids.contains(op->operator_id())) {
_op_shared_states.insert({op->operator_id(), shared_state});
return;
}
}
if (shared_state->related_op_ids.contains(_sink->dests_id().front())) {
DCHECK(_sink_shared_state == nullptr);
_sink_shared_state = shared_state;
}
}
std::vector<DependencySPtr>& get_upstream_dependency(int id) {
if (_upstream_dependency.find(id) == _upstream_dependency.end()) {
_upstream_dependency.insert({id, {}});
}
return _upstream_dependency[id];
}
std::shared_ptr<BasicSharedState> get_sink_shared_state() { return _sink_shared_state; }
BasicSharedState* get_shared_state(int id) {
if (!_shared_states.contains(id)) {
BasicSharedState* get_op_shared_state(int id) {
if (!_op_shared_states.contains(id)) {
return nullptr;
}
return _shared_states[id].get();
return _op_shared_states[id].get();
}
bool is_pipelineX() const override { return true; }
@ -161,10 +156,12 @@ public:
private:
Dependency* _write_blocked_dependency() {
_blocked_dep = _write_dependencies->is_blocked_by(this);
if (_blocked_dep != nullptr) {
static_cast<Dependency*>(_blocked_dep)->start_watcher();
return _blocked_dep;
for (auto* op_dep : _write_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
if (_blocked_dep != nullptr) {
_blocked_dep->start_watcher();
return _blocked_dep;
}
}
return nullptr;
}
@ -203,18 +200,13 @@ private:
DataSinkOperatorXPtr _sink;
std::vector<Dependency*> _read_dependencies;
Dependency* _write_dependencies;
std::vector<Dependency*> _write_dependencies;
std::vector<Dependency*> _finish_dependencies;
RuntimeFilterDependency* _filter_dependency;
// Write dependencies of upstream pipeline tasks.
DependencyMap _upstream_dependency;
// Read dependencies of this pipeline task.
std::map<int, DependencySPtr> _source_dependency;
// Write dependencies of this pipeline tasks.
std::vector<DependencySPtr> _downstream_dependency;
// All shared states of this pipeline task.
std::map<int, std::shared_ptr<BasicSharedState>> _shared_states;
std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
std::shared_ptr<BasicSharedState> _sink_shared_state;
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
_le_state_map;
int _task_idx;