diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f3e8df44e1..040fc5ae21 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1158,6 +1158,9 @@ DEFINE_mBool(enable_column_type_check, "true");
 // 128 MB
 DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");
 
+// Default is 0; any value <= 0 disables the debug log. Set it to e.g. 300 (seconds) to enable.
+DEFINE_mInt64(enable_debug_log_timeout_secs, "0");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index cfb8ed02f3..9a287a0c24 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1232,6 +1232,8 @@ DECLARE_mInt32(variant_max_merged_tablet_schema_size);
 
 DECLARE_mInt64(local_exchange_buffer_mem_limit);
 
+DECLARE_mInt64(enable_debug_log_timeout_secs);
+
 DECLARE_mBool(enable_column_type_check);
 
 #ifdef BE_TEST
diff --git a/be/src/common/status.h b/be/src/common/status.h
index c0d9215287..d7f4481c1d 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -434,14 +434,6 @@ public:
 
     bool ok() const { return _code == ErrorCode::OK; }
 
-    // Convert into TStatus. Call this if 'status_container' contains an optional
-    // TStatus field named 'status'. This also sets __isset.status.
-    template <typename T>
-    void set_t_status(T* status_container) const {
-        to_thrift(&status_container->status);
-        status_container->__isset.status = true;
-    }
-
     // Convert into TStatus.
     void to_thrift(TStatus* status) const;
     TStatus to_thrift() const;
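With `Status::set_t_status` removed, call sites build the `TStatus` explicitly and hand it to the thrift-generated setter, which fills the field and its `__isset` flag in one step. A minimal sketch of the mechanical rewrite (the `report` function is invented for illustration; `TReportExecStatusParams`, `__set_status`, and `to_thrift` come from the patch):

```cpp
// Hypothetical call site showing the migration pattern used below in
// fragment_mgr.cpp and backend_service.cpp.
void report(TReportExecStatusParams* params, const Status& exec_status) {
    // Before: exec_status.set_t_status(params);
    // After: the generated __set_status() assigns params->status and flips
    // params->__isset.status, so Status no longer needs to poke thrift internals.
    params->__set_status(exec_status.to_thrift());
}
```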
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 69f7771767..aaa2f6ee07 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -239,6 +239,10 @@ public:
 
     size_t children_count() const { return _children.size(); }
 
+    // When the fragment finishes normally, call this method to do some finish work,
+    // such as sending the last buffer to the remote end.
+    virtual Status try_close(RuntimeState* state) { return Status::OK(); }
+
 protected:
     friend class DataSink;
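For context, a hypothetical override showing how the new hook is meant to be used. Only the `try_close` signature comes from the patch; the buffering node, its members, and the stand-in types are invented so the sketch is self-contained:

```cpp
#include <string>
#include <vector>

// Stand-ins for the Doris types referenced by the hook.
struct Status {
    bool ok = true;
    static Status OK() { return {}; }
};
struct RuntimeState {};
struct ExecNode {
    virtual ~ExecNode() = default;
    // Default is a no-op, exactly like the hook added in exec_node.h.
    virtual Status try_close(RuntimeState*) { return Status::OK(); }
};

// Hypothetical node that buffers rows before sending them to a remote peer.
// On a normal fragment finish, try_close() flushes the tail buffer early so
// the receiver does not have to wait for close(); errors surface as a Status.
struct BufferedSendNode : ExecNode {
    std::vector<std::string> buffer;  // invented member

    Status try_close(RuntimeState* state) override {
        if (!buffer.empty()) {
            Status st = send_buffer(state);  // invented helper
            if (!st.ok) {
                return st;
            }
            buffer.clear();
        }
        return Status::OK();
    }

    Status send_buffer(RuntimeState*) { return Status::OK(); }
};
```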
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index de3214b046..37be7a0181 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -53,6 +53,7 @@ bool ScanOperator::can_read() {
             return false;
         }
     } else {
+        // If the scanner meets any error, done == true
        if (_node->_eos || _node->_scanner_ctx->done()) {
             // _eos: need eos
             // _scanner_ctx->done(): need finish
@@ -64,14 +65,6 @@ bool ScanOperator::can_read() {
     }
 }
 
-bool ScanOperator::is_pending_finish() const {
-    return _node->_scanner_ctx && !_node->_scanner_ctx->no_schedule();
-}
-
-Status ScanOperator::try_close(RuntimeState* state) {
-    return _node->try_close(state);
-}
-
 bool ScanOperator::runtime_filters_are_ready_or_timeout() {
     return _node->runtime_filters_are_ready_or_timeout();
 }
@@ -81,9 +74,8 @@ std::string ScanOperator::debug_string() const {
     fmt::format_to(debug_string_buffer, "{}, scanner_ctx is null: {} ",
                    SourceOperator::debug_string(), _node->_scanner_ctx == nullptr);
     if (_node->_scanner_ctx) {
-        fmt::format_to(debug_string_buffer, ", num_running_scanners = {}, num_scheduling_ctx = {} ",
-                       _node->_scanner_ctx->get_num_running_scanners(),
-                       _node->_scanner_ctx->get_num_scheduling_ctx());
+        fmt::format_to(debug_string_buffer, ", scanner_ctx = {}",
+                       _node->_scanner_ctx->debug_string());
     }
     return fmt::to_string(debug_string_buffer);
 }
@@ -101,9 +93,6 @@ std::string ScanOperator::debug_string() const {
 template <typename Derived>
 ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
         : ScanLocalStateBase(state, parent) {
-    _finish_dependency = std::make_shared<Dependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
-            state->get_query_ctx());
     _filter_dependency = std::make_shared<RuntimeFilterDependency>(
             parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
             state->get_query_ctx());
@@ -175,7 +164,6 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     auto status = _eos ? Status::OK() : _prepare_scanners();
     if (_scanner_ctx) {
-        _finish_dependency->block();
         DCHECK(!_eos && _num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
         RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
@@ -569,15 +557,14 @@ std::string ScanLocalState<Derived>::debug_string(int indentation_level) const {
                    _eos.load());
     if (_scanner_ctx) {
         fmt::format_to(debug_string_buffer, "");
-        fmt::format_to(
-                debug_string_buffer,
-                ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
-                "_num_running_scanners={}, "
-                "_num_scheduling_ctx = {}, _num_unfinished_scanners = {}, status = {}, error = {})",
-                _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
-                _scanner_ctx->get_num_running_scanners(), _scanner_ctx->get_num_scheduling_ctx(),
-                _scanner_ctx->get_num_unfinished_scanners(), _scanner_ctx->status().to_string(),
-                _scanner_ctx->status_error());
+        fmt::format_to(debug_string_buffer,
+                       ", Scanner Context: (_is_finished = {}, _should_stop = {}, "
+                       "_num_running_scanners = {}, _num_unfinished_scanners = {}, status = {})",
+                       _scanner_ctx->is_finished(), _scanner_ctx->should_stop(),
+                       _scanner_ctx->get_num_running_scanners(),
+                       _scanner_ctx->get_num_unfinished_scanners(),
+                       _scanner_ctx->status().to_string());
     }
 
     return fmt::to_string(debug_string_buffer);
@@ -1225,23 +1212,27 @@ template <typename Derived>
 Status ScanLocalState<Derived>::_prepare_scanners() {
     std::list<VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
+    // Init scanner wrapper
+    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
+        _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
+    }
     if (scanners.empty()) {
         _eos = true;
         _scan_dependency->set_ready();
     } else {
         COUNTER_SET(_num_scanners, static_cast<int32_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(scanners));
+        RETURN_IF_ERROR(_start_scanners(_scanners));
     }
     return Status::OK();
 }
 
 template <typename Derived>
 Status ScanLocalState<Derived>::_start_scanners(
-        const std::list<VScannerSPtr>& scanners) {
+        const std::list<std::shared_ptr<ScannerDelegate>>& scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipXScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), scanners, p.limit(),
-            state()->scan_queue_mem_limit(), _scan_dependency, _finish_dependency);
+            state()->scan_queue_mem_limit(), _scan_dependency);
     return Status::OK();
 }
@@ -1317,9 +1308,6 @@ Status ScanLocalState<Derived>::_init_profile() {
     _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);
 
-    _wait_for_finish_dependency_timer =
-            ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
-
     return Status::OK();
 }
@@ -1427,17 +1415,6 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename LocalStateType>
-Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
-    auto& local_state = get_local_state(state);
-    if (local_state._scanner_ctx) {
-        // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
-        // TODO: there is a lock in `set_should_stop` may cause some slight impact
-        local_state._scanner_ctx->set_should_stop();
-    }
-    return Status::OK();
-}
-
 template <typename Derived>
 Status ScanLocalState<Derived>::close(RuntimeState* state) {
     if (_closed) {
@@ -1449,10 +1426,9 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     if (_scanner_ctx) {
-        _scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
+        _scanner_ctx->stop_scanners(state);
     }
     COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
-    COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
     COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
 
     return PipelineXLocalState<>::close(state);
@@ -1509,7 +1485,7 @@ Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
     if (eos) {
         source_state = SourceState::FINISHED;
         // reach limit, stop the scanners.
-        local_state._scanner_ctx->set_should_stop();
+        local_state._scanner_ctx->stop_scanners(state);
     }
 
     return Status::OK();
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index 3ad1ac1536..fa7e44a8fb 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -32,6 +32,9 @@ namespace doris {
 class ExecNode;
 } // namespace doris
 
+namespace doris::vectorized {
+class ScannerDelegate;
+}
 namespace doris::pipeline {
 
 class PipScannerContext;
@@ -49,13 +52,9 @@ public:
     bool can_read() override; // for source
 
-    bool is_pending_finish() const override;
-
     bool runtime_filters_are_ready_or_timeout() override;
 
     std::string debug_string() const override;
-
-    Status try_close(RuntimeState* state) override;
 };
 
 class ScanDependency final : public Dependency {
@@ -174,7 +173,6 @@ protected:
     RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
     // time of prefilter input block from scanner
     RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
-    RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
     RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
 };
 
@@ -215,7 +213,6 @@ class ScanLocalState : public ScanLocalStateBase {
     int64_t get_push_down_count() override;
 
     RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
-    Dependency* finishdependency() override { return _finish_dependency.get(); }
 
 protected:
     template <typename LocalStateType>
@@ -351,7 +348,7 @@ protected:
     Status _prepare_scanners();
 
     // Submit the scanner to the thread pool and start execution
-    Status _start_scanners(const std::list<VScannerSPtr>& scanners);
+    Status _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners);
 
     // For some conjuncts there is a chance to eliminate the cast operator
    // Eg. Variant's sub column could eliminate cast in storage layer if
@@ -414,14 +411,13 @@ protected:
 
     std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
 
-    std::shared_ptr<Dependency> _finish_dependency;
+    // ScanLocalState owns the scanners; the scanner context only holds weak pointers to them
+    std::list<std::shared_ptr<ScannerDelegate>> _scanners;
 };
 
 template <typename LocalStateType>
 class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
-    Status try_close(RuntimeState* state) override;
-
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status prepare(RuntimeState* state) override { return OperatorXBase::prepare(state); }
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 700741f3f5..30022792e8 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -23,6 +23,7 @@
 #include 
 #include 
 
+#include "common/config.h"
 #include "common/status.h"
 #include "exec/operator.h"
 #include "pipeline.h"
@@ -264,6 +265,32 @@ public:
     bool is_running() { return _running.load(); }
     void set_running(bool running) { _running = running; }
 
+    bool is_exceed_debug_timeout() {
+        if (_has_exceed_timeout) {
+            return true;
+        }
+        // enable_debug_log_timeout_secs <= 0 disables the log (checked by the caller)
+        if (_pipeline_task_watcher.elapsed_time() >
+            config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) {
+            _has_exceed_timeout = true;
+            return true;
+        }
+        return false;
+    }
+
+    void log_detail_if_need() {
+        if (config::enable_debug_log_timeout_secs < 1) {
+            return;
+        }
+        if (is_exceed_debug_timeout()) {
+            LOG(INFO) << "query id|instance id " << print_id(_state->query_id()) << "|"
+                      << print_id(_state->fragment_instance_id())
+                      << " current pipeline task has run longer than "
+                      << config::enable_debug_log_timeout_secs << " seconds. Task state "
+                      << get_state_name(get_state()) << debug_string();
+        }
+    }
+
 protected:
     void _finish_p_dependency() {
         for (const auto& p : _pipeline->_parents) {
@@ -278,7 +305,7 @@ protected:
     uint32_t _index;
     PipelinePtr _pipeline;
     bool _dependency_finish = false;
-
+    bool _has_exceed_timeout = false;
     bool _prepared;
     bool _opened;
     RuntimeState* _state = nullptr;
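The mechanism above is a latch-on-first-exceed gate: the elapsed-time check runs until the task first crosses the threshold, after which the cheap boolean short-circuits it and the scheduler logs on every pass. A standalone sketch of the same logic (the class name and clock choice are ours, not Doris's):

```cpp
#include <chrono>
#include <iostream>
#include <string>

class SlowTaskLogger {
public:
    explicit SlowTaskLogger(long timeout_secs) : _timeout_secs(timeout_secs) {}

    // Called from every scheduler pass. Values <= 0 disable logging entirely,
    // mirroring log_detail_if_need()'s early return.
    void log_if_slow(const std::string& task_id) {
        if (_timeout_secs <= 0) {
            return;
        }
        if (!_exceeded) {
            auto elapsed = std::chrono::steady_clock::now() - _start;
            if (elapsed > std::chrono::seconds(_timeout_secs)) {
                _exceeded = true;  // latch: skip the clock read on later passes
            }
        }
        if (_exceeded) {
            // Like the patch, this logs on every pass once the task is late,
            // which is acceptable for a debug-only switch.
            std::cerr << "task " << task_id << " alive longer than "
                      << _timeout_secs << "s\n";
        }
    }

private:
    const long _timeout_secs;
    bool _exceeded = false;
    std::chrono::steady_clock::time_point _start = std::chrono::steady_clock::now();
};
```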
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 91980aeec7..d253c6a589 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -119,6 +119,7 @@ void BlockedTaskScheduler::_schedule() {
         while (iter != local_blocked_tasks.end()) {
             auto* task = *iter;
             auto state = task->get_state();
+            task->log_detail_if_need();
             if (state == PipelineTaskState::PENDING_FINISH) {
                 // should cancel or should finish
                 if (task->is_pending_finish()) {
@@ -236,6 +237,7 @@ void TaskScheduler::_do_work(size_t index) {
             static_cast<void>(_task_queue->push_back(task, index));
             continue;
         }
+        task->log_detail_if_need();
         task->set_running(true);
         task->set_task_queue(_task_queue.get());
         auto* fragment_ctx = task->fragment_context();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 6d8b99c62f..0d32c16186 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -213,7 +213,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
         params.__set_backend_num(req.backend_num);
         params.__set_fragment_instance_id(req.fragment_instance_id);
         params.__set_fragment_id(req.fragment_id);
-        exec_status.set_t_status(&params);
+        params.__set_status(exec_status.to_thrift());
         params.__set_done(req.done);
         params.__set_query_type(req.runtime_state->query_type());
         params.__set_finished_scan_ranges(req.runtime_state->num_finished_range());
diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp
index 432f38697c..fb98d60dce 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -405,7 +405,7 @@ void BackendService::exec_plan_fragment(TExecPlanFragmentResult& return_val,
                                         const TExecPlanFragmentParams& params) {
     LOG(INFO) << "exec_plan_fragment() instance_id=" << print_id(params.params.fragment_instance_id)
               << " coord=" << params.coord << " backend#=" << params.backend_num;
-    start_plan_fragment_execution(params).set_t_status(&return_val);
+    return_val.__set_status(start_plan_fragment_execution(params).to_thrift());
 }
 
 Status BackendService::start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params) {
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 9a717ec08b..71db343840 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -30,9 +30,9 @@ public:
     PipScannerContext(RuntimeState* state, vectorized::VScanNode* parent,
                       const TupleDescriptor* output_tuple_desc,
                       const RowDescriptor* output_row_descriptor,
-                      const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
-                      int64_t max_bytes_in_blocks_queue, const std::vector<int>& col_distribute_ids,
-                      const int num_parallel_instances)
+                      const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
+                      int64_t limit_, int64_t max_bytes_in_blocks_queue,
+                      const std::vector<int>& col_distribute_ids, const int num_parallel_instances)
             : vectorized::ScannerContext(state, parent, output_tuple_desc, output_row_descriptor,
                                          scanners, limit_, max_bytes_in_blocks_queue,
                                          num_parallel_instances),
@@ -55,6 +55,9 @@ public:
         std::vector<vectorized::BlockUPtr> merge_blocks;
         {
             std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
+            // The pipeline may be woken up by scanner done. If there is still data in
+            // the queue, read that data first and only then check whether the scanner
+            // is done; if so, return eos to indicate that the scan operator finished.
             if (_blocks_queues[id].empty()) {
                 *eos = done();
                 return Status::OK();
@@ -97,9 +100,6 @@ public:
         return Status::OK();
     }
 
-    // We should make those method lock free.
-    bool done() override { return _is_finished || _should_stop; }
-
     void append_blocks_to_queue(std::vector<vectorized::BlockUPtr>& blocks) override {
         const int queue_size = _blocks_queues.size();
         const int block_size = blocks.size();
@@ -277,13 +277,12 @@ public:
     PipXScannerContext(RuntimeState* state, ScanLocalStateBase* local_state,
                        const TupleDescriptor* output_tuple_desc,
                        const RowDescriptor* output_row_descriptor,
-                       const std::list<vectorized::VScannerSPtr>& scanners, int64_t limit_,
-                       int64_t max_bytes_in_blocks_queue,
-                       std::shared_ptr<ScanDependency> dependency,
-                       std::shared_ptr<Dependency> finish_dependency)
+                       const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
+                       int64_t limit_, int64_t max_bytes_in_blocks_queue,
+                       std::shared_ptr<ScanDependency> dependency)
             : vectorized::ScannerContext(state, output_tuple_desc, output_row_descriptor, scanners,
                                          limit_, max_bytes_in_blocks_queue, 1, local_state,
-                                         dependency, finish_dependency) {}
+                                         dependency) {}
     Status get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block, bool* eos,
                                 int id, bool wait = false) override {
         std::unique_lock<std::mutex> l(_transfer_lock);
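The comment added to `get_block_from_queue` encodes a drain-before-eos rule: a consumer must not report eos while buffered blocks remain, even if the producer has already finished. A generic, self-contained sketch of that ordering (names are illustrative, not Doris APIs):

```cpp
#include <deque>
#include <mutex>
#include <optional>

template <typename T>
class DrainableQueue {
public:
    void push(T v) {
        std::lock_guard<std::mutex> l(_m);
        _q.push_back(std::move(v));
    }
    void mark_done() {
        std::lock_guard<std::mutex> l(_m);
        _done = true;
    }
    // Returns an item if any is buffered; sets *eos only once the queue is
    // empty AND the producer is done, mirroring the ordering in the patch.
    std::optional<T> pop(bool* eos) {
        std::lock_guard<std::mutex> l(_m);
        if (_q.empty()) {
            *eos = _done;
            return std::nullopt;
        }
        T v = std::move(_q.front());
        _q.pop_front();
        *eos = false;
        return v;
    }

private:
    std::mutex _m;
    std::deque<T> _q;
    bool _done = false;
};
```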
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 089f52cf61..c1d064c601 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -47,11 +47,11 @@ using namespace std::chrono_literals;
 
 ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
-                               const std::list<VScannerSPtr>& scanners, int64_t limit_,
-                               int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
+                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+                               int64_t limit_, int64_t max_bytes_in_blocks_queue,
+                               const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state,
-                               std::shared_ptr<pipeline::ScanDependency> dependency,
-                               std::shared_ptr<pipeline::Dependency> finish_dependency)
+                               std::shared_ptr<pipeline::ScanDependency> dependency)
         : HasTaskExecutionCtx(state),
           _state(state),
           _parent(nullptr),
@@ -66,11 +66,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners),
-          _scanners_ref(scanners.begin(), scanners.end()),
+          _scanners(scanners.begin(), scanners.end()),
+          _all_scanners(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances),
-          _dependency(dependency),
-          _finish_dependency(finish_dependency) {
+          _dependency(dependency) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -98,8 +97,9 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc,
 ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
                                const doris::TupleDescriptor* output_tuple_desc,
                                const RowDescriptor* output_row_descriptor,
-                               const std::list<VScannerSPtr>& scanners, int64_t limit_,
-                               int64_t max_bytes_in_blocks_queue, const int num_parallel_instances,
+                               const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+                               int64_t limit_, int64_t max_bytes_in_blocks_queue,
+                               const int num_parallel_instances,
                                pipeline::ScanLocalStateBase* local_state)
         : HasTaskExecutionCtx(state),
           _state(state),
@@ -115,8 +115,8 @@ ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent,
           _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) *
                               num_parallel_instances),
           _scanner_scheduler(state->exec_env()->scanner_scheduler()),
-          _scanners(scanners),
-          _scanners_ref(scanners.begin(), scanners.end()),
+          _scanners(scanners.begin(), scanners.end()),
+          _all_scanners(scanners.begin(), scanners.end()),
           _num_parallel_instances(num_parallel_instances) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
@@ -192,10 +192,6 @@ Status ScannerContext::init() {
     }
 #endif
 
-    // 4. This ctx will be submitted to the scanner scheduler right after init.
-    // So set _num_scheduling_ctx to 1 here.
-    _num_scheduling_ctx = 1;
-
     _num_unfinished_scanners = _scanners.size();
 
     if (_parent) {
@@ -288,11 +284,9 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::BlockUPtr* block,
         bool is_scheduled = false;
         if (!done() && to_be_schedule && _num_running_scanners == 0) {
             is_scheduled = true;
-            auto state = _scanner_scheduler->submit(shared_from_this());
-            if (state.ok()) {
-                _num_scheduling_ctx++;
-            } else {
-                set_status_on_error(state, false);
+            auto submit_status = _scanner_scheduler->submit(shared_from_this());
+            if (!submit_status.ok()) {
+                set_status_on_error(submit_status, false);
             }
         }
@@ -384,60 +378,36 @@ Status ScannerContext::validate_block_schema(Block* block) {
     return Status::OK();
 }
 
-void ScannerContext::set_should_stop() {
-    std::lock_guard<std::mutex> l(_transfer_lock);
-    _should_stop = true;
-    _set_scanner_done();
-    for (const VScannerWPtr& scanner : _scanners_ref) {
-        if (VScannerSPtr sc = scanner.lock()) {
-            sc->try_stop();
-        }
-    }
-    _blocks_queue_added_cv.notify_one();
-    set_ready_to_finish();
-}
-
 void ScannerContext::inc_num_running_scanners(int32_t inc) {
     std::lock_guard<std::mutex> l(_transfer_lock);
     _num_running_scanners += inc;
 }
 
-void ScannerContext::dec_num_scheduling_ctx() {
-    std::lock_guard<std::mutex> l(_transfer_lock);
-    _num_scheduling_ctx--;
-    set_ready_to_finish();
-    if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
-        _ctx_finish_cv.notify_one();
-    }
-}
-
-void ScannerContext::set_ready_to_finish() {
-    // `_should_stop == true` means this task has already ended and wait for pending finish now.
-    if (_finish_dependency && done() && _num_running_scanners == 0 && _num_scheduling_ctx == 0) {
-        _finish_dependency->set_ready();
-    }
-}
-
-bool ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
+void ScannerContext::set_status_on_error(const Status& status, bool need_lock) {
     std::unique_lock<std::mutex> l(_transfer_lock, std::defer_lock);
     if (need_lock) {
         l.lock();
     }
     if (this->status().ok()) {
         _process_status = status;
-        _status_error = true;
         _blocks_queue_added_cv.notify_one();
         _should_stop = true;
         _state->get_query_ctx()->set_exec_status(_process_status);
         _set_scanner_done();
-        return true;
     }
-    return false;
 }
 
-template <typename Parent>
-Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState* state) {
-    std::unique_lock<std::mutex> l(_scanners_lock);
+void ScannerContext::stop_scanners(RuntimeState* state) {
+    std::unique_lock<std::mutex> l(_transfer_lock);
+    _should_stop = true;
+    _set_scanner_done();
+    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
+        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
+            sc->_scanner->try_stop();
+        }
+    }
+    _blocks_queue.clear();
+    // TODO yiguolei, call mark close to scanners
     if (state->enable_profile()) {
         std::stringstream scanner_statistics;
         std::stringstream scanner_rows_read;
@@ -445,76 +415,38 @@ Status ScannerContext::_close_and_clear_scanners(Parent* parent, RuntimeState* state) {
         scanner_statistics << "[";
         scanner_rows_read << "[";
         scanner_wait_worker_time << "[";
-        for (auto finished_scanner_time : _finished_scanner_runtime) {
-            scanner_statistics << PrettyPrinter::print(finished_scanner_time, TUnit::TIME_NS)
-                               << ", ";
-        }
-        for (auto finished_scanner_rows : _finished_scanner_rows_read) {
-            scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, TUnit::UNIT) << ", ";
-        }
-        for (auto finished_scanner_wait_time : _finished_scanner_wait_worker_time) {
-            scanner_wait_worker_time
-                    << PrettyPrinter::print(finished_scanner_wait_time, TUnit::TIME_NS) << ", ";
-        }
-        // Only unfinished scanners here
-        for (auto& scanner : _scanners) {
-            // Scanners are in ObjPool in ScanNode,
-            // so no need to delete them here.
+        // A scanner can be in one of 3 states:
+        // state 1: in the scanner context, not scheduled yet
+        // state 2: in the scanner worker pool's queue, scheduled but not running
+        // state 3: running
+        for (auto& scanner_ref : _all_scanners) {
+            auto scanner = scanner_ref.lock();
+            if (scanner == nullptr) {
+                continue;
+            }
             // Add per scanner running time before close them
-            scanner_statistics << PrettyPrinter::print(scanner->get_time_cost_ns(), TUnit::TIME_NS)
+            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
+                                                       TUnit::TIME_NS)
                                << ", ";
-            scanner_rows_read << PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
+            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
+                                                      TUnit::UNIT)
                               << ", ";
             scanner_wait_worker_time
-                    << PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
+                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                             TUnit::TIME_NS)
                     << ", ";
+            // Since this list contains all scanners and some of them may still be
+            // running, we cannot close the scanners here.
         }
         scanner_statistics << "]";
         scanner_rows_read << "]";
         scanner_wait_worker_time << "]";
-        parent->scanner_profile()->add_info_string("PerScannerRunningTime",
-                                                   scanner_statistics.str());
-        parent->scanner_profile()->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
-        parent->scanner_profile()->add_info_string("PerScannerWaitTime",
-                                                   scanner_wait_worker_time.str());
+        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
+        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
+        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
     }
-    // Only unfinished scanners here
-    for (auto& scanner : _scanners) {
-        static_cast<void>(scanner->close(state));
-        // Scanners are in ObjPool in ScanNode,
-        // so no need to delete them here.
-    }
-    _scanners.clear();
-    return Status::OK();
-}
 
-template <typename Parent>
-void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) {
-    std::unique_lock<std::mutex> l(_transfer_lock);
-    do {
-        if (_num_running_scanners == 0 && _num_scheduling_ctx == 0) {
-            break;
-        } else {
-            DCHECK(!state->enable_pipeline_exec())
-                    << " _num_running_scanners: " << _num_running_scanners
-                    << " _num_scheduling_ctx: " << _num_scheduling_ctx;
-            while (!(_num_running_scanners == 0 && _num_scheduling_ctx == 0)) {
-                _ctx_finish_cv.wait(l);
-            }
-            break;
-        }
-    } while (false);
-    // Must wait all running scanners stop running.
-    // So that we can make sure to close all scanners.
-    static_cast<void>(_close_and_clear_scanners(parent, state));
-
-    _blocks_queue.clear();
-}
-
-bool ScannerContext::no_schedule() {
-    std::unique_lock<std::mutex> l(_transfer_lock);
-    return _num_running_scanners == 0 && _num_scheduling_ctx == 0;
+    _blocks_queue_added_cv.notify_one();
 }
 
 void ScannerContext::_set_scanner_done() {
@@ -527,12 +459,13 @@ std::string ScannerContext::debug_string() {
     return fmt::format(
             "id: {}, scanners: {}, blocks in queue: {},"
             " status: {}, _should_stop: {}, _is_finished: {}, free blocks: {},"
-            " limit: {}, _num_running_scanners: {}, _num_scheduling_ctx: {}, _max_thread_num: {},"
-            " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}",
+            " limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
+            " _block_per_scanner: {}, _cur_bytes_in_queue: {}, MAX_BYTE_OF_QUEUE: {}, "
+            "num_ctx_scheduled: {}, query_id: {}",
             ctx_id, _scanners.size(), _blocks_queue.size(), status().ok(), _should_stop,
-            _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners,
-            _num_scheduling_ctx, _max_thread_num, _block_per_scanner, _cur_bytes_in_queue,
-            _max_bytes_in_queue);
+            _is_finished, _free_blocks.size_approx(), limit, _num_running_scanners, _max_thread_num,
+            _block_per_scanner, _cur_bytes_in_queue, _max_bytes_in_queue, num_ctx_scheduled(),
+            print_id(_query_id));
 }
 
 void ScannerContext::reschedule_scanner_ctx() {
@@ -540,84 +473,71 @@ void ScannerContext::reschedule_scanner_ctx() {
     if (done()) {
         return;
     }
-    auto state = _scanner_scheduler->submit(shared_from_this());
+    auto submit_status = _scanner_scheduler->submit(shared_from_this());
     //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times?
-    if (state.ok()) {
-        _num_scheduling_ctx++;
-    } else {
-        set_status_on_error(state, false);
+    if (!submit_status.ok()) {
+        set_status_on_error(submit_status, false);
     }
 }
 
-void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) {
-    {
-        std::unique_lock<std::mutex> l(_scanners_lock);
-        _scanners.push_front(scanner);
-    }
+void ScannerContext::push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner) {
     std::lock_guard<std::mutex> l(_transfer_lock);
-
-    // In pipeline engine, doris will close scanners when `no_schedule`.
-    // We have to decrease _num_running_scanners before schedule, otherwise
-    // schedule does not woring due to _num_running_scanners.
-    _num_running_scanners--;
-    set_ready_to_finish();
-
-    if (!done() && should_be_scheduled()) {
-        auto state = _scanner_scheduler->submit(shared_from_this());
-        if (state.ok()) {
-            _num_scheduling_ctx++;
+    // Use the transfer lock to avoid the scanner being scheduled concurrently. For example,
+    // after calling "_scanners.push_front(scanner)", another ctx in the scheduler may
+    // schedule that scanner right away, and in that schedule run the scanner may be marked
+    // as closed before we reach the following if() block.
+    {
+        --_num_running_scanners;
+        if (scanner->_scanner->need_to_close()) {
+            --_num_unfinished_scanners;
+            if (_num_unfinished_scanners == 0) {
+                _dispose_coloate_blocks_not_in_queue();
+                _is_finished = true;
+                _set_scanner_done();
+                _blocks_queue_added_cv.notify_one();
+                return;
+            }
         } else {
-            set_status_on_error(state, false);
+            _scanners.push_front(scanner);
         }
     }
-    // Notice that after calling "_scanners.push_front(scanner)", there may be other ctx in scheduler
-    // to schedule that scanner right away, and in that schedule run, the scanner may be marked as closed
-    // before we call the following if() block.
-    // So we need "scanner->set_counted_down()" to avoid "_num_unfinished_scanners" being decreased twice by
-    // same scanner.
-    if (scanner->need_to_close() && scanner->set_counted_down() &&
-        (--_num_unfinished_scanners) == 0) {
-        _dispose_coloate_blocks_not_in_queue();
-        _is_finished = true;
-        _set_scanner_done();
-        _blocks_queue_added_cv.notify_one();
+    if (should_be_scheduled()) {
+        auto submit_status = _scanner_scheduler->submit(shared_from_this());
+        if (!submit_status.ok()) {
+            set_status_on_error(submit_status, false);
+        }
     }
-    _ctx_finish_cv.notify_one();
 }
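The rewritten `push_back_scanner_and_reschedule` above is a "last one out turns off the lights" pattern: every finishing scanner decrements a counter under the same lock that guards rescheduling, and only the one that drops it to zero publishes the finished state. Sketched in isolation (names are ours, not the Doris API):

```cpp
#include <condition_variable>
#include <mutex>

class FinishCounter {
public:
    explicit FinishCounter(int workers) : _unfinished(workers) {}

    // Returns true only for the single caller that finished last.
    bool worker_done() {
        std::lock_guard<std::mutex> l(_m);
        if (--_unfinished == 0) {
            _finished = true;
            _cv.notify_all();  // wake consumers blocked on the queue
            return true;
        }
        return false;
    }

    void wait_finished() {
        std::unique_lock<std::mutex> l(_m);
        _cv.wait(l, [&] { return _finished; });
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    int _unfinished;
    bool _finished = false;
};
```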
-void ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run) {
+// This method is called in the scanner scheduler, and the task context is held
+void ScannerContext::get_next_batch_of_scanners(
+        std::list<std::weak_ptr<ScannerDelegate>>* current_run) {
+    std::lock_guard<std::mutex> l(_transfer_lock);
+    // Update the sched counter for profile
+    Defer defer {[&]() { _scanner_sched_counter->update(current_run->size()); }};
     // 1. Calculate how many scanners should be scheduled at this run.
-    int thread_slot_num = 0;
-    {
-        // If there are enough space in blocks queue,
-        // the scanner number depends on the _free_blocks numbers
-        thread_slot_num = get_available_thread_slot_num();
-    }
+    // If there is enough space in the blocks queue,
+    // the scanner number depends on the _free_blocks numbers
+    int thread_slot_num = get_available_thread_slot_num();
 
     // 2. get #thread_slot_num scanners from ctx->scanners
     // and put them into "this_run".
-    {
-        std::unique_lock<std::mutex> l(_scanners_lock);
-        for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
-            VScannerSPtr scanner = _scanners.front();
-            _scanners.pop_front();
-            if (scanner->need_to_close()) {
-                _finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
-                _finished_scanner_rows_read.push_back(scanner->get_rows_read());
-                _finished_scanner_wait_worker_time.push_back(
-                        scanner->get_scanner_wait_worker_timer());
-                static_cast<void>(scanner->close(_state));
-            } else {
-                current_run->push_back(scanner);
-                i++;
-            }
+    for (int i = 0; i < thread_slot_num && !_scanners.empty();) {
+        std::weak_ptr<ScannerDelegate> scanner_ref = _scanners.front();
+        std::shared_ptr<ScannerDelegate> scanner = scanner_ref.lock();
+        _scanners.pop_front();
+        if (scanner == nullptr) {
+            continue;
+        }
+        if (scanner->_scanner->need_to_close()) {
+            static_cast<void>(scanner->_scanner->close(_state));
+        } else {
+            current_run->push_back(scanner_ref);
+            i++;
         }
     }
 }
 
-template void ScannerContext::clear_and_join(pipeline::ScanLocalStateBase* parent,
-                                             RuntimeState* state);
-template void ScannerContext::clear_and_join(VScanNode* parent, RuntimeState* state);
-
 } // namespace doris::vectorized
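The ownership split this file now relies on: the scan node holds the scanners via `shared_ptr`, while the context keeps only `weak_ptr`s and must `lock()` before every use, skipping entries whose owner has already released them. A self-contained miniature of the pattern (`Scanner` stands in for `ScannerDelegate`; the rest is illustrative):

```cpp
#include <iostream>
#include <list>
#include <memory>
#include <vector>

struct Scanner {
    void try_stop() { std::cout << "stopped\n"; }
};

struct Owner {
    std::list<std::shared_ptr<Scanner>> scanners;  // owning side (scan node)
};

struct Context {
    std::vector<std::weak_ptr<Scanner>> all_scanners;  // observing side
    void stop_all() {
        for (auto& ref : all_scanners) {
            if (auto sc = ref.lock()) {  // still alive: safe to touch
                sc->try_stop();
            }  // expired: the owner already destroyed it, nothing to do
        }
    }
};

int main() {
    Owner owner;
    owner.scanners.push_back(std::make_shared<Scanner>());
    Context ctx;
    for (auto& s : owner.scanners) ctx.all_scanners.emplace_back(s);
    ctx.stop_all();          // prints "stopped"
    owner.scanners.clear();  // owner releases the scanner
    ctx.stop_all();          // prints nothing: the weak_ptr has expired
}
```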
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 8e840a4746..c4df896bac 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -53,6 +53,7 @@ class TaskGroup;
 namespace vectorized {
 
 class VScanner;
+class ScannerDelegate;
 class VScanNode;
 class ScannerScheduler;
 class SimplifiedScanScheduler;
@@ -72,7 +73,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
 public:
     ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
-                   const std::list<VScannerSPtr>& scanners, int64_t limit_,
+                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1,
                    pipeline::ScanLocalStateBase* local_state = nullptr);
 
@@ -94,9 +95,9 @@ public:
 
     // When a scanner complete a scan, this method will be called
     // to return the scanner to the list for next scheduling.
-    void push_back_scanner_and_reschedule(VScannerSPtr scanner);
+    void push_back_scanner_and_reschedule(std::shared_ptr<ScannerDelegate> scanner);
 
-    bool set_status_on_error(const Status& status, bool need_lock = true);
+    void set_status_on_error(const Status& status, bool need_lock = true);
 
     Status status() {
         if (_process_status.is<ErrorCode::END_OF_FILE>()) {
@@ -105,42 +106,27 @@ public:
         return _process_status;
     }
 
-    // Called by ScanNode.
-    // Used to notify the scheduler that this ScannerContext can stop working.
-    void set_should_stop();
-
     // Return true if this ScannerContext need no more process
-    virtual bool done() { return _is_finished || _should_stop; }
+    bool done() const { return _is_finished || _should_stop; }
 
     bool is_finished() { return _is_finished.load(); }
 
     bool should_stop() { return _should_stop.load(); }
 
-    bool status_error() { return _status_error.load(); }
-
     void inc_num_running_scanners(int32_t scanner_inc);
 
-    void set_ready_to_finish();
-
     int get_num_running_scanners() const { return _num_running_scanners; }
 
     int get_num_unfinished_scanners() const { return _num_unfinished_scanners; }
 
-    void dec_num_scheduling_ctx();
-
-    int get_num_scheduling_ctx() const { return _num_scheduling_ctx; }
-
-    void get_next_batch_of_scanners(std::list<VScannerSPtr>* current_run);
-
-    template <typename Parent>
-    void clear_and_join(Parent* parent, RuntimeState* state);
-
-    bool no_schedule();
+    void get_next_batch_of_scanners(std::list<std::weak_ptr<ScannerDelegate>>* current_run);
 
     virtual std::string debug_string();
 
     RuntimeState* state() { return _state; }
 
     void incr_num_ctx_scheduling(int64_t num) { _scanner_ctx_sched_counter->update(num); }
+
+    int64_t num_ctx_scheduled() { return _scanner_ctx_sched_counter->value(); }
+
     void incr_ctx_scheduling_time(int64_t num) { _scanner_ctx_sched_time->update(num); }
-    void incr_num_scanner_scheduling(int64_t num) { _scanner_sched_counter->update(num); }
 
     std::string parent_name();
@@ -172,6 +158,7 @@ public:
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
 
     virtual void reschedule_scanner_ctx();
+    void stop_scanners(RuntimeState* state);
 
     // the unique id of this context
     std::string ctx_id;
@@ -181,18 +168,13 @@ public:
 
     bool _should_reset_thread_name = true;
 
-private:
-    template <typename Parent>
-    Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
-
 protected:
     ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc,
                    const RowDescriptor* output_row_descriptor,
-                   const std::list<VScannerSPtr>& scanners_, int64_t limit_,
+                   const std::list<std::shared_ptr<ScannerDelegate>>& scanners_, int64_t limit_,
                    int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances,
                    pipeline::ScanLocalStateBase* local_state,
-                   std::shared_ptr<pipeline::ScanDependency> dependency,
-                   std::shared_ptr<pipeline::Dependency> finish_dependency);
+                   std::shared_ptr<pipeline::ScanDependency> dependency);
 
     virtual void _dispose_coloate_blocks_not_in_queue() {}
 
     void _set_scanner_done();
@@ -232,7 +214,6 @@ protected:
     // Always be set by ScannerScheduler.
     // True means all scanners are finished to scan.
     Status _process_status;
-    std::atomic_bool _status_error = false;
     std::atomic_bool _should_stop = false;
     std::atomic_bool _is_finished = false;
@@ -276,9 +257,11 @@ protected:
     // and then if the scanner is not finished, will be pushed back to this list.
     // Not need to protect by lock, because only one scheduler thread will access to it.
     std::mutex _scanners_lock;
-    std::list<VScannerSPtr> _scanners;
+    // The scanners' ownership belongs to VScanNode or ScanOperator; the scanner context only
+    // holds weak pointers, so it must check whether a scanner has been destructed before using it.
+    std::list<std::weak_ptr<ScannerDelegate>> _scanners;
     // weak pointer for _scanners, used in stop function
-    std::vector<VScannerWPtr> _scanners_ref;
+    std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
     std::vector<int64_t> _finished_scanner_runtime;
     std::vector<int64_t> _finished_scanner_rows_read;
     std::vector<int64_t> _finished_scanner_wait_worker_time;
@@ -295,7 +278,6 @@ protected:
     RuntimeProfile::Counter* _scanner_wait_batch_timer = nullptr;
 
     std::shared_ptr<pipeline::ScanDependency> _dependency = nullptr;
-    std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;
 };
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 29b53b3935..4223bd7abb 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -174,28 +176,23 @@ void ScannerScheduler::_schedule_thread(int queue_id) {
 
 void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
-        // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
-        //              << " maybe finished";
+        LOG(WARNING) << "could not lock task execution context, query " << ctx->debug_string()
+                     << " maybe finished";
         return;
     }
     MonotonicStopWatch watch;
     watch.reset();
     watch.start();
     ctx->incr_num_ctx_scheduling(1);
-    size_t size = 0;
-    Defer defer {[&]() {
-        ctx->incr_num_scanner_scheduling(size);
-        ctx->dec_num_scheduling_ctx();
-    }};
 
     if (ctx->done()) {
         return;
     }
 
-    std::list<VScannerSPtr> this_run;
+    std::list<std::weak_ptr<ScannerDelegate>> this_run;
     ctx->get_next_batch_of_scanners(&this_run);
-    size = this_run.size();
-    if (!size) {
+    if (this_run.empty()) {
         // There will be 2 cases when this_run is empty:
         // 1. The blocks queue reaches limit.
         //    The consumer will continue scheduling the ctx.
@@ -214,9 +211,14 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
     if (ctx->thread_token != nullptr) {
         // TODO llj tg how to treat this?
         while (iter != this_run.end()) {
-            (*iter)->start_wait_worker_timer();
-            auto s = ctx->thread_token->submit_func(
-                    [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); });
+            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
+            if (scanner_delegate == nullptr) {
+                // the owner already released this scanner; drop it and advance the iterator
+                this_run.erase(iter++);
+                continue;
+            }
+            scanner_delegate->_scanner->start_wait_worker_timer();
+            auto s = ctx->thread_token->submit_func([this, scanner_ref = *iter, ctx]() {
+                this->_scanner_scan(this, ctx, scanner_ref);
+            });
             if (s.ok()) {
                 this_run.erase(iter++);
             } else {
@@ -226,28 +228,32 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
         }
     } else {
         while (iter != this_run.end()) {
-            (*iter)->start_wait_worker_timer();
-            TabletStorageType type = (*iter)->get_storage_type();
+            std::shared_ptr<ScannerDelegate> scanner_delegate = (*iter).lock();
+            if (scanner_delegate == nullptr) {
+                // same as above: skip expired scanners without spinning on the iterator
+                this_run.erase(iter++);
+                continue;
+            }
+            scanner_delegate->_scanner->start_wait_worker_timer();
+            TabletStorageType type = scanner_delegate->_scanner->get_storage_type();
             bool ret = false;
             if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
                 if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
-                    auto work_func = [this, scanner = *iter, ctx] {
-                        this->_scanner_scan(this, ctx, scanner);
+                    auto work_func = [this, scanner_ref = *iter, ctx]() {
+                        this->_scanner_scan(this, ctx, scanner_ref);
                     };
                     SimplifiedScanTask simple_scan_task = {work_func, ctx};
                     ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
                 } else {
                     PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner = *iter, ctx] {
-                        this->_scanner_scan(this, ctx, scanner);
+                    task.work_function = [this, scanner_ref = *iter, ctx]() {
+                        this->_scanner_scan(this, ctx, scanner_ref);
                     };
                     task.priority = nice;
                     ret = _local_scan_thread_pool->offer(task);
                 }
             } else {
                 PriorityThreadPool::Task task;
-                task.work_function = [this, scanner = *iter, ctx] {
-                    this->_scanner_scan(this, ctx, scanner);
+                task.work_function = [this, scanner_ref = *iter, ctx]() {
+                    this->_scanner_scan(this, ctx, scanner_ref);
                 };
                 task.priority = nice;
                 ret = _remote_scan_thread_pool->offer(task);
@@ -265,13 +271,22 @@ void ScannerScheduler::_schedule_scanners(std::shared_ptr<ScannerContext> ctx) {
 }
 
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
-                                     std::shared_ptr<ScannerContext> ctx, VScannerSPtr scanner) {
+                                     std::shared_ptr<ScannerContext> ctx,
+                                     std::weak_ptr<ScannerDelegate> scanner_ref) {
     auto task_lock = ctx->task_exec_ctx();
     if (task_lock == nullptr) {
         // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id)
         //              << " maybe finished";
         return;
     }
+    // The delegate releases the scanner if this is the last reference; the task lock is
+    // held here so the scanner can still call the scan node's methods in its destructor.
+    std::shared_ptr<ScannerDelegate> scanner_delegate = scanner_ref.lock();
+    if (scanner_delegate == nullptr) {
+        return;
+    }
+    auto& scanner = scanner_delegate->_scanner;
     SCOPED_ATTACH_TASK(scanner->runtime_state());
     // for cpu hard limit, thread name should not be reset
     if (ctx->_should_reset_thread_name) {
@@ -404,7 +419,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
     if (eos || should_stop) {
         scanner->mark_to_need_to_close();
     }
-    ctx->push_back_scanner_and_reschedule(scanner);
+    ctx->push_back_scanner_and_reschedule(scanner_delegate);
 }
 
 void ScannerScheduler::_register_metrics() {
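The lambdas above capture a `weak_ptr` deliberately: a task sitting in a thread-pool queue can outlive the query that created it, and locking inside the task turns the stale case into a cheap no-op instead of a use-after-free. A minimal self-contained sketch, with `std::thread` standing in for the pool:

```cpp
#include <iostream>
#include <memory>
#include <thread>

struct Scanner {
    void scan() { std::cout << "scanning\n"; }
};

int main() {
    auto scanner = std::make_shared<Scanner>();
    std::weak_ptr<Scanner> ref = scanner;  // what the pool task captures

    std::thread worker([ref]() {
        if (auto sc = ref.lock()) {  // promote before every use
            sc->scan();
        }  // else: the owner already tore the query down; just return
    });

    worker.join();
    return 0;
}
```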
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index eb4d1380e3..9fedd27dbd 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -36,7 +36,7 @@ class BlockingQueue;
 } // namespace doris
 
 namespace doris::vectorized {
-
+class ScannerDelegate;
 class ScannerContext;
 
 // Responsible for the scheduling and execution of all Scanners of a BE node.
@@ -79,7 +79,7 @@ private:
     void _schedule_scanners(std::shared_ptr<ScannerContext> ctx);
     // execution thread function
     void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr<ScannerContext> ctx,
-                       VScannerSPtr scanner);
+                       std::weak_ptr<ScannerDelegate> scanner);
 
     void _register_metrics();
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index c45468e051..88f44edfdb 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -197,9 +197,13 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
         if (_scanner_ctx) {
             DCHECK(!_eos && _num_scanners->value() > 0);
             RETURN_IF_ERROR(_scanner_ctx->init());
             RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx));
         }
         if (_shared_scan_opt) {
+            LOG(INFO) << "instance shared scan enabled, instance id "
+                      << print_id(state->fragment_instance_id());
             _shared_scanner_controller->set_scanner_context(id(), _eos ? nullptr : _scanner_ctx);
         }
@@ -273,7 +277,7 @@ Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     reached_limit(block, eos);
     if (*eos) {
         // reach limit, stop the scanners.
-        _scanner_ctx->set_should_stop();
+        _scanner_ctx->stop_scanners(state);
     }
 
     return Status::OK();
@@ -318,8 +322,8 @@ Status VScanNode::_init_profile() {
     return Status::OK();
 }
 
-Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
-                                  const int query_parallel_instance_num) {
+void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+                                const int query_parallel_instance_num) {
     if (_is_pipeline_scan) {
         int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1;
         _scanner_ctx = pipeline::PipScannerContext::create_shared(
@@ -330,41 +334,29 @@ Status VScanNode::_start_scanners(const std::list<VScannerSPtr>& scanners,
                 _output_row_descriptor.get(), scanners, limit(), _state->scan_queue_mem_limit());
     }
-    return Status::OK();
 }
 
 Status VScanNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
+
     RETURN_IF_ERROR(ExecNode::close(state));
     return Status::OK();
 }
 
 void VScanNode::release_resource(RuntimeState* state) {
     if (_scanner_ctx) {
-        if (!state->enable_pipeline_exec()) {
+        if (!state->enable_pipeline_exec() || _should_create_scanner) {
             // stop and wait the scanner scheduler to be done
             // _scanner_ctx may not be created for some short circuit case.
-            _scanner_ctx->set_should_stop();
-            _scanner_ctx->clear_and_join(this, state);
-        } else if (_should_create_scanner) {
-            _scanner_ctx->clear_and_join(this, state);
+            _scanner_ctx->stop_scanners(state);
         }
     }
-
+    _scanners.clear();
     ExecNode::release_resource(state);
 }
 
-Status VScanNode::try_close(RuntimeState* state) {
-    if (_scanner_ctx) {
-        // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore
-        // TODO: there is a lock in `set_should_stop` may cause some slight impact
-        _scanner_ctx->set_should_stop();
-    }
-    return Status::OK();
-}
-
 Status VScanNode::_normalize_conjuncts() {
     // The conjuncts is always on output tuple, so use _output_tuple_desc;
     std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots();
@@ -1330,11 +1322,15 @@ VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* pred,
 Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) {
     std::list<VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
+    // Init scanner wrapper
+    for (auto it = scanners.begin(); it != scanners.end(); ++it) {
+        _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it));
+    }
     if (scanners.empty()) {
         _eos = true;
     } else {
         COUNTER_SET(_num_scanners, static_cast<int32_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num));
+        _start_scanners(_scanners, query_parallel_instance_num);
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h
index 7cd8b15991..1d54df4f4b 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -87,6 +87,21 @@ struct FilterPredicates {
     std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
 };
 
+// We want to close the scanner automatically, so we use a delegate class
+// and call the close method in the delegate class's destructor.
+class ScannerDelegate {
+public:
+    VScannerSPtr _scanner;
+    ScannerDelegate(VScannerSPtr& scanner_ptr) : _scanner(scanner_ptr) {}
+    ~ScannerDelegate() {
+        Status st = _scanner->close(_scanner->runtime_state());
+        if (!st.ok()) {
+            LOG(WARNING) << "close scanner failed, st = " << st;
+        }
+    }
+    ScannerDelegate(ScannerDelegate&&) = delete;
+};
+
 class VScanNode : public ExecNode, public RuntimeFilterConsumer {
 public:
     VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -156,8 +171,6 @@ public:
     Status alloc_resource(RuntimeState* state) override;
     void release_resource(RuntimeState* state) override;
 
-    Status try_close(RuntimeState* state);
-
     bool should_run_serial() const {
         return _should_run_serial || _state->enable_scan_node_run_serial();
     }
@@ -276,8 +289,11 @@ protected:
     int _max_scan_key_num;
     int _max_pushdown_conditions_per_column;
 
-    // Each scan node will generates a ScannerContext to manage all Scanners.
-    // See comments of ScannerContext for more details
+    // The scan node owns the scanners; the scanner context only holds weak pointers to them.
+    std::list<std::shared_ptr<ScannerDelegate>> _scanners;
+
+    // Each scan node generates a ScannerContext to do the scheduling work;
+    // the ScannerContext is then submitted to the scanner scheduler.
     std::shared_ptr<ScannerContext> _scanner_ctx = nullptr;
 
     // indicate this scan node has no more data to return
@@ -451,8 +467,8 @@ private:
                                 const std::string& fn_name, int slot_ref_child = -1);
 
     // Submit the scanner to the thread pool and start execution
-    Status _start_scanners(const std::list<VScannerSPtr>& scanners,
-                           const int query_parallel_instance_num);
+    void _start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners,
+                         const int query_parallel_instance_num);
 };
 
 } // namespace doris::vectorized
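`ScannerDelegate` is the core trick of the patch: tie a fallible `close()` to destruction so that `shared_ptr` reference counting decides when the last owner triggers cleanup. The same idea in isolation (names are illustrative, not the Doris API; destructors must not throw, so a failed close can only be logged):

```cpp
#include <iostream>
#include <memory>

struct Resource {
    bool close() { return true; }  // pretend this can fail
};

class ResourceDelegate {
public:
    explicit ResourceDelegate(std::shared_ptr<Resource> r) : _r(std::move(r)) {}
    ~ResourceDelegate() {
        // Runs exactly once, when the last shared_ptr<ResourceDelegate> dies,
        // no matter whether the owner or a scheduler task dropped it last.
        if (!_r->close()) {
            std::cerr << "close failed\n";
        }
    }
    // Non-copyable/movable: exactly one delegate per resource lifetime.
    ResourceDelegate(const ResourceDelegate&) = delete;
    ResourceDelegate& operator=(const ResourceDelegate&) = delete;

    std::shared_ptr<Resource> _r;
};
```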
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index b00c3f348f..9fdaddcea2 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -148,16 +148,6 @@ public:
 
     void set_status_on_failure(const Status& st) { _status = st; }
 
-    // return false if _is_counted_down is already true,
-    // otherwise, set _is_counted_down to true and return true.
-    bool set_counted_down() {
-        if (_is_counted_down) {
-            return false;
-        }
-        _is_counted_down = true;
-        return true;
-    }
-
 protected:
     void _discard_conjuncts() {
         for (auto& conjunct : _conjuncts) {
@@ -221,8 +211,6 @@ protected:
     int64_t _scan_cpu_timer = 0;
 
     bool _is_load = false;
-    // set to true after decrease the "_num_unfinished_scanners" in scanner context
-    bool _is_counted_down = false;
 
     bool _is_init = true;
@@ -233,6 +221,5 @@ protected:
 };
 
 using VScannerSPtr = std::shared_ptr<VScanner>;
-using VScannerWPtr = std::weak_ptr<VScanner>;
 
 } // namespace doris::vectorized
diff --git a/regression-test/pipeline/p0/conf/be.conf b/regression-test/pipeline/p0/conf/be.conf
index 2a25830a08..0aca39d4f1 100644
--- a/regression-test/pipeline/p0/conf/be.conf
+++ b/regression-test/pipeline/p0/conf/be.conf
@@ -77,3 +77,5 @@ enable_feature_binlog=true
 max_sys_mem_available_low_water_mark_bytes=69206016
 user_files_secure_path=/
 enable_debug_points=true
+# debug scanner context dead loop
+enable_debug_log_timeout_secs=300