[pipelineX](bug) Fix meta scan operator (#24963)

This commit is contained in:
Gabriel
2023-09-27 20:34:47 +08:00
committed by GitHub
parent 671b5f0a0a
commit 1fb9022d07
7 changed files with 182 additions and 140 deletions

View File

@ -43,6 +43,10 @@ void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& sc
_scan_ranges = scan_ranges;
}
Status MetaScanLocalState::_process_conjuncts() {
return Status::OK();
}
MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ScanOperatorX<MetaScanLocalState>(pool, tnode, descs),

View File

@ -50,6 +50,7 @@ private:
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override;
Status _process_conjuncts() override;
std::vector<TScanRangeParams> _scan_ranges;
};

View File

@ -172,9 +172,8 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
auto status = _eos_dependency->read_blocked_by() == nullptr
? Status::OK()
: _prepare_scanners(state->query_parallel_instance_num());
auto status =
_eos_dependency->read_blocked_by() == nullptr ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
DCHECK(_eos_dependency->read_blocked_by() != nullptr && _num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
@ -1163,21 +1162,21 @@ Status ScanLocalState<Derived>::_normalize_match_predicate(
}
template <typename Derived>
Status ScanLocalState<Derived>::_prepare_scanners(const int query_parallel_instance_num) {
Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
if (scanners.empty()) {
_eos_dependency->set_ready_for_read();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num));
RETURN_IF_ERROR(_start_scanners(scanners));
}
return Status::OK();
}
template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(const std::list<vectorized::VScannerSPtr>& scanners,
const int query_parallel_instance_num) {
Status ScanLocalState<Derived>::_start_scanners(
const std::list<vectorized::VScannerSPtr>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this, p._output_tuple_desc, scanners,
p.limit(), state()->scan_queue_mem_limit(),

View File

@ -134,7 +134,7 @@ public:
[[nodiscard]] virtual int runtime_filter_num() const = 0;
Status virtual clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) = 0;
virtual TPushAggOp::type get_push_down_agg_type() = 0;
@ -351,11 +351,10 @@ protected:
const ChangeFixedValueRangeFunc& func, const std::string& fn_name,
int slot_ref_child = -1);
Status _prepare_scanners(const int query_parallel_instance_num);
Status _prepare_scanners();
// Submit the scanner to the thread pool and start execution
Status _start_scanners(const std::list<vectorized::VScannerSPtr>& scanners,
const int query_parallel_instance_num);
Status _start_scanners(const std::list<vectorized::VScannerSPtr>& scanners);
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in this vector
// so that it will be destroyed uniformly at the end of the query.

View File

@ -337,6 +337,96 @@ Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
return Status::OK();
}
template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalStateInfo& info) {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
" (id=" + std::to_string(_parent->id()) + ")"));
_runtime_profile->set_metadata(_parent->id());
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
_dependency = (DependencyType*)info.dependency;
if (_dependency) {
_shared_state = (typename DependencyType::SharedState*)_dependency->shared_state();
_wait_for_dependency_timer = ADD_TIMER(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time");
}
}
_conjuncts.resize(_parent->_conjuncts.size());
_projections.resize(_parent->_projections.size());
for (size_t i = 0; i < _conjuncts.size(); i++) {
RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, _conjuncts[i]));
}
for (size_t i = 0; i < _projections.size(); i++) {
RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i]));
}
_rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT);
_blocks_returned_counter = ADD_COUNTER(_runtime_profile, "BlocksReturned", TUnit::UNIT);
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
_open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
_close_timer = ADD_TIMER(_runtime_profile, "CloseTime");
_rows_returned_rate = profile()->add_derived_counter(
doris::ExecNode::ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
profile()->total_time_counter()),
"");
_mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
_memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
_peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}
template <typename DependencyType>
Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
if (_dependency) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->read_watcher_elapse_time());
}
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
profile()->add_to_span(_span);
_closed = true;
return Status::OK();
}
template <typename DependencyType>
Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
LocalSinkStateInfo& info) {
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(
_parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")"));
if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
_dependency = (DependencyType*)info.dependency;
if (_dependency) {
_shared_state = (typename DependencyType::SharedState*)_dependency->shared_state();
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time");
}
}
_rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT);
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseTime");
info.parent_profile->add_child(_profile, true, nullptr);
_mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
return Status::OK();
}
template <typename DependencyType>
Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
if (_dependency) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->write_watcher_elapse_time());
}
_closed = true;
return Status::OK();
}
template <typename LocalStateType>
Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) {
@ -377,6 +467,70 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
return Status::OK();
}
template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
_output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(
_parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i]));
}
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
_async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id());
_writer->set_dependency(_async_writer_dependency.get());
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time");
return Status::OK();
}
template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
_writer->start_writer(state, _profile);
return Status::OK();
}
template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
return _writer->sink(block, source_state == SourceState::FINISHED);
}
template <typename Writer, typename Parent>
WriteDependency* AsyncWriterSink<Writer, Parent>::write_blocked_by() {
return _writer->write_blocked_by();
}
template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) {
if (_closed) {
return Status::OK();
}
COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->write_watcher_elapse_time());
if (_writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
}
RETURN_IF_ERROR(_writer->close(exec_status));
}
return PipelineXSinkLocalState<>::close(state, exec_status);
}
template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status exec_status) {
if (state->is_cancelled() || !exec_status.ok()) {
_writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
}
return Status::OK();
}
template <typename Writer, typename Parent>
bool AsyncWriterSink<Writer, Parent>::is_pending_finish() {
return _writer->is_pending_finish();
}
#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
@ -445,4 +599,6 @@ template class PipelineXLocalState<MultiCastDependency>;
template class PipelineXSinkLocalState<MultiCastDependency>;
template class PipelineXLocalState<PartitionSortDependency>;
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
} // namespace doris::pipeline

View File

@ -319,60 +319,9 @@ public:
: PipelineXLocalStateBase(state, parent) {}
~PipelineXLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override {
_runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
" (id=" + std::to_string(_parent->id()) + ")"));
_runtime_profile->set_metadata(_parent->id());
info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
_dependency = (DependencyType*)info.dependency;
if (_dependency) {
_shared_state = (typename DependencyType::SharedState*)_dependency->shared_state();
_wait_for_dependency_timer = ADD_TIMER(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time");
}
}
Status init(RuntimeState* state, LocalStateInfo& info) override;
_conjuncts.resize(_parent->_conjuncts.size());
_projections.resize(_parent->_projections.size());
for (size_t i = 0; i < _conjuncts.size(); i++) {
RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, _conjuncts[i]));
}
for (size_t i = 0; i < _projections.size(); i++) {
RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i]));
}
_rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT);
_blocks_returned_counter = ADD_COUNTER(_runtime_profile, "BlocksReturned", TUnit::UNIT);
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
_open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
_close_timer = ADD_TIMER(_runtime_profile, "CloseTime");
_rows_returned_rate = profile()->add_derived_counter(
doris::ExecNode::ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
profile()->total_time_counter()),
"");
_mem_tracker =
std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name());
_memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
_peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}
Status close(RuntimeState* state) override {
if (_closed) {
return Status::OK();
}
if (_dependency) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->read_watcher_elapse_time());
}
if (_rows_returned_counter != nullptr) {
COUNTER_SET(_rows_returned_counter, _num_rows_returned);
}
profile()->add_to_span(_span);
_closed = true;
return Status::OK();
}
Status close(RuntimeState* state) override;
[[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
@ -596,40 +545,13 @@ public:
: PipelineXSinkLocalStateBase(parent, state) {}
~PipelineXSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
// create profile
_profile = state->obj_pool()->add(new RuntimeProfile(
_parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")"));
if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
_dependency = (DependencyType*)info.dependency;
if (_dependency) {
_shared_state = (typename DependencyType::SharedState*)_dependency->shared_state();
_wait_for_dependency_timer =
ADD_TIMER(_profile, "WaitForDependency[" + _dependency->name() + "]Time");
}
}
_rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT);
_open_timer = ADD_TIMER(_profile, "OpenTime");
_close_timer = ADD_TIMER(_profile, "CloseTime");
info.parent_profile->add_child(_profile, true, nullptr);
_mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
return Status::OK();
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override { return Status::OK(); }
Status try_close(RuntimeState* state, Status exec_status) override { return Status::OK(); }
Status close(RuntimeState* state, Status exec_status) override {
if (_closed) {
return Status::OK();
}
if (_dependency) {
COUNTER_SET(_wait_for_dependency_timer, _dependency->write_watcher_elapse_time());
}
_closed = true;
return Status::OK();
}
Status close(RuntimeState* state, Status exec_status) override;
[[nodiscard]] std::string debug_string(int indentation_level) const override;
typename DependencyType::SharedState*& get_shared_state() { return _shared_state; }
@ -687,58 +609,19 @@ public:
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<>(parent, state), _async_writer_dependency(nullptr) {}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
_output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
RETURN_IF_ERROR(_parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(
state, _output_vexpr_ctxs[i]));
}
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
_writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
_async_writer_dependency = AsyncWriterDependency::create_shared(_parent->id());
_writer->set_dependency(_async_writer_dependency.get());
Status open(RuntimeState* state) override;
_wait_for_dependency_timer = ADD_TIMER(
_profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time");
return Status::OK();
}
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state);
Status open(RuntimeState* state) override {
RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
_writer->start_writer(state, _profile);
return Status::OK();
}
WriteDependency* write_blocked_by();
Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) {
return _writer->sink(block, source_state == SourceState::FINISHED);
}
Status close(RuntimeState* state, Status exec_status) override;
WriteDependency* write_blocked_by() { return _writer->write_blocked_by(); }
Status try_close(RuntimeState* state, Status exec_status) override;
Status close(RuntimeState* state, Status exec_status) override {
if (_closed) {
return Status::OK();
}
COUNTER_SET(_wait_for_dependency_timer,
_async_writer_dependency->write_watcher_elapse_time());
if (_writer->need_normal_close()) {
if (exec_status.ok() && !state->is_cancelled()) {
RETURN_IF_ERROR(_writer->commit_trans());
}
RETURN_IF_ERROR(_writer->close(exec_status));
}
return PipelineXSinkLocalState<>::close(state, exec_status);
}
Status try_close(RuntimeState* state, Status exec_status) override {
if (state->is_cancelled() || !exec_status.ok()) {
_writer->force_close(!exec_status.ok() ? exec_status : Status::Cancelled("Cancelled"));
}
return Status::OK();
}
bool is_pending_finish() { return _writer->is_pending_finish(); }
bool is_pending_finish();
protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;

View File

@ -1654,7 +1654,7 @@ public class SessionVariable implements Serializable, Writable {
int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
int autoInstance = (size + 1) / 2;
return Math.min(autoInstance, maxInstanceNum);
} else if (enablePipelineEngine) {
} else if (getEnablePipelineEngine()) {
return parallelPipelineTaskNum;
} else {
return parallelExecInstanceNum;