diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 95934d9599..2c50928ec4 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -206,9 +206,9 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink ? params.send_query_statistics_with_every_batch : false; // TODO: figure out good buffer size based on size of output row - sink->reset(new vectorized::VDataStreamSender(state, pool, local_params.sender_id, row_desc, - thrift_sink.stream_sink, params.destinations, - send_query_statistics_with_every_batch)); + *sink = std::make_unique( + state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink, + params.destinations, send_query_statistics_with_every_batch); // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink)); break; } diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index e910eaffa8..c9a1283ede 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -262,8 +262,7 @@ ColumnMapping* BlockChanger::get_mutable_column_mapping(size_t column_index) { Status BlockChanger::change_block(vectorized::Block* ref_block, vectorized::Block* new_block) const { - ObjectPool pool; - RuntimeState* state = pool.add(RuntimeState::create_unique().release()); + std::unique_ptr state = RuntimeState::create_unique(); state->set_desc_tbl(&_desc_tbl); state->set_be_exec_version(_fe_compatible_version); RowDescriptor row_desc = @@ -272,8 +271,8 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, if (_where_expr != nullptr) { vectorized::VExprContextSPtr ctx = nullptr; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_where_expr, ctx)); - RETURN_IF_ERROR(ctx->prepare(state, row_desc)); - RETURN_IF_ERROR(ctx->open(state)); + RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc)); + RETURN_IF_ERROR(ctx->open(state.get())); RETURN_IF_ERROR( vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns())); @@ 
-288,8 +287,8 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, if (_schema_mapping[idx].expr != nullptr) { vectorized::VExprContextSPtr ctx; RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx)); - RETURN_IF_ERROR(ctx->prepare(state, row_desc)); - RETURN_IF_ERROR(ctx->open(state)); + RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc)); + RETURN_IF_ERROR(ctx->open(state.get())); int result_column_id = -1; RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id)); diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index dcc1b3d85e..a75366e989 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -99,8 +99,8 @@ std::string ScanOperator::debug_string() const { } template -ScanLocalState::ScanLocalState(RuntimeState* state_, OperatorXBase* parent_) - : ScanLocalStateBase(state_, parent_) {} +ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) + : ScanLocalStateBase(state, parent) {} template bool ScanLocalState::ready_to_read() { @@ -171,7 +171,7 @@ Status ScanLocalState::open(RuntimeState* state) { _finish_dependency->block(); DCHECK(!_eos && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); - RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); + RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); } _opened = true; return status; @@ -1420,7 +1420,7 @@ Status ScanOperatorX::open(RuntimeState* state) { template Status ScanOperatorX::try_close(RuntimeState* state) { auto& local_state = get_local_state(state); - if (local_state._scanner_ctx.get()) { + if (local_state._scanner_ctx) { // mark this scanner ctx as should_stop to make sure scanners will not be scheduled anymore // TODO: there is a lock in `set_should_stop` may cause some slight impact local_state._scanner_ctx->set_should_stop(); diff --git 
a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 1709c9c703..7957983366 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -189,7 +189,9 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, PipelinePtr PipelineFragmentContext::add_pipeline() { // _prepared、_submitted, _canceled should do not add pipeline PipelineId id = _next_pipeline_id++; - auto pipeline = std::make_shared(id, _num_instances, weak_from_this()); + auto pipeline = std::make_shared( + id, _num_instances, + std::dynamic_pointer_cast(shared_from_this())); _pipelines.emplace_back(pipeline); return pipeline; } @@ -197,7 +199,9 @@ PipelinePtr PipelineFragmentContext::add_pipeline() { PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { // _prepared、_submitted, _canceled should do not add pipeline PipelineId id = _next_pipeline_id++; - auto pipeline = std::make_shared(id, _num_instances, weak_from_this()); + auto pipeline = std::make_shared( + id, _num_instances, + std::dynamic_pointer_cast(shared_from_this())); if (idx >= 0) { _pipelines.insert(_pipelines.begin() + idx, pipeline); } else { @@ -213,7 +217,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re return Status::InternalError("Already prepared"); } const auto& local_params = request.local_params[idx]; - _runtime_profile.reset(new RuntimeProfile("PipelineContext")); + _runtime_profile = std::make_unique("PipelineContext"); _start_timer = ADD_TIMER(_runtime_profile, "StartTime"); COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time()); _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime"); @@ -231,6 +235,8 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re if (local_params.__isset.runtime_filter_params) { _runtime_state->set_runtime_filter_params(local_params.runtime_filter_params); } + + 
_runtime_state->set_task_execution_context(shared_from_this()); _runtime_state->set_query_ctx(_query_ctx.get()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); @@ -885,7 +891,8 @@ void PipelineFragmentContext::_close_fragment_instance() { _fragment_watcher.elapsed_time()); static_cast(send_report(true)); // all submitted tasks done - _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); + _exec_env->fragment_mgr()->remove_pipeline_context( + std::dynamic_pointer_cast(shared_from_this())); } void PipelineFragmentContext::close_a_pipeline() { @@ -935,7 +942,7 @@ Status PipelineFragmentContext::send_report(bool done) { std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, std::placeholders::_2), _dml_query_statistics()}, - shared_from_this()); + std::dynamic_pointer_cast(shared_from_this())); } bool PipelineFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) { diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 0800da22ad..99aac70b1e 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -36,6 +36,7 @@ #include "pipeline/pipeline_task.h" #include "runtime/query_context.h" #include "runtime/runtime_state.h" +#include "runtime/task_execution_context.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" @@ -50,7 +51,7 @@ class TPipelineFragmentParams; namespace pipeline { -class PipelineFragmentContext : public std::enable_shared_from_this { +class PipelineFragmentContext : public TaskExecutionContext { public: // Callback to report execution status of plan fragment. 
// 'profile' is the cumulative profile, 'done' indicates whether the execution diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index c166195729..0ac4bc2bd8 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -475,10 +475,11 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto fragment_instance_id = local_params.fragment_instance_id; _fragment_instance_ids.push_back(fragment_instance_id); std::unique_ptr runtime_filter_mgr; - auto set_runtime_state = [&](std::unique_ptr& runtime_state) { + auto init_runtime_state = [&](std::unique_ptr& runtime_state) { runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); runtime_state->set_query_ctx(_query_ctx.get()); + runtime_state->set_task_execution_context(shared_from_this()); runtime_state->set_be_number(local_params.backend_num); if (request.__isset.backend_id) { @@ -567,7 +568,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( request.fragment_id, request.query_options, _query_ctx->query_globals, _exec_env)); auto& task_runtime_state = _task_runtime_states.back(); - set_runtime_state(task_runtime_state); + init_runtime_state(task_runtime_state); auto cur_task_id = _total_tasks++; auto task = std::make_unique( pipeline, cur_task_id, get_task_runtime_state(cur_task_id), this, @@ -1270,7 +1271,8 @@ void PipelineXFragmentContext::_close_fragment_instance() { _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); static_cast(send_report(true)); // all submitted tasks done - _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); + _exec_env->fragment_mgr()->remove_pipeline_context( + std::dynamic_pointer_cast(shared_from_this())); } Status PipelineXFragmentContext::send_report(bool done) { @@ -1307,7 +1309,7 @@ Status PipelineXFragmentContext::send_report(bool done) { 
std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, std::placeholders::_2), nullptr}, - shared_from_this()); + std::dynamic_pointer_cast(shared_from_this())); } bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index b18d5b82d6..81672ac465 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -210,7 +210,6 @@ private: void clear() { _build_side_pipelines.clear(); } } _pipeline_parent_map; - std::map _instance_id_to_runtime_state; std::mutex _state_map_lock; int _operator_id = 0; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 71c31920ac..d387680310 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -48,12 +48,13 @@ namespace doris::pipeline { -BlockedTaskScheduler::BlockedTaskScheduler() : _started(false), _shutdown(false) {} +BlockedTaskScheduler::BlockedTaskScheduler(std::string name) + : _name(name), _started(false), _shutdown(false) {} -Status BlockedTaskScheduler::start(std::string sche_name) { +Status BlockedTaskScheduler::start() { LOG(INFO) << "BlockedTaskScheduler start"; RETURN_IF_ERROR(Thread::create( - "BlockedTaskScheduler", sche_name, [this]() { this->_schedule(); }, &_thread)); + "BlockedTaskScheduler", _name, [this]() { this->_schedule(); }, &_thread)); while (!this->_started.load()) { std::this_thread::sleep_for(std::chrono::milliseconds(5)); } @@ -349,7 +350,7 @@ void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { // close_a_pipeline may delete fragment context and will core in some defer // code, because the defer code will access fragment context it self. 
- std::shared_ptr lock_for_context = + std::shared_ptr lock_for_context = task->fragment_context()->shared_from_this(); auto status = task->try_close(exec_status); auto cancel = [&]() { diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 070f2b6a5a..af12376c5b 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -46,16 +46,17 @@ namespace doris::pipeline { class BlockedTaskScheduler { public: - explicit BlockedTaskScheduler(); + explicit BlockedTaskScheduler(std::string name); ~BlockedTaskScheduler() = default; - Status start(std::string sche_name); + Status start(); void shutdown(); Status add_blocked_task(PipelineTask* task); private: std::mutex _task_mutex; + std::string _name; std::condition_variable _task_cond; std::list _blocked_tasks; diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 94ba0720fb..ce534c7ce0 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -175,6 +175,7 @@ public: ThreadPool* s3_file_upload_thread_pool() { return _s3_file_upload_thread_pool.get(); } ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); } ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); } + ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get(); } void set_serial_download_cache_thread_token() { _serial_download_cache_thread_token = @@ -334,6 +335,8 @@ private: std::unique_ptr _send_report_thread_pool; // Pool used by join node to build hash table std::unique_ptr _join_node_thread_pool; + // Pool to use a new thread to release object + std::unique_ptr _lazy_release_obj_pool; // ThreadPoolToken -> buffer std::unordered_map> _download_cache_buf_map; FragmentMgr* _fragment_mgr = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index f4d26903b5..422826790d 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -193,6 +193,11 
@@ Status ExecEnv::_init(const std::vector& store_paths, .set_max_threads(std::numeric_limits::max()) .set_max_queue_size(config::fragment_pool_queue_size) .build(&_join_node_thread_pool)); + static_cast(ThreadPoolBuilder("LazyReleaseMemoryThreadPool") + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1000000) + .build(&_lazy_release_obj_pool)); init_file_cache_factory(); RETURN_IF_ERROR(init_pipeline_task_scheduler()); _task_group_manager = new taskgroup::TaskGroupManager(); @@ -282,21 +287,22 @@ Status ExecEnv::init_pipeline_task_scheduler() { // TODO pipeline task group combie two blocked schedulers. auto t_queue = std::make_shared(executors_size); - _without_group_block_scheduler = std::make_shared(); + _without_group_block_scheduler = + std::make_shared("PipeNoGSchePool"); _without_group_task_scheduler = new pipeline::TaskScheduler( - this, _without_group_block_scheduler, t_queue, "WithoutGroupTaskSchePool", nullptr); + this, _without_group_block_scheduler, t_queue, "PipeNoGSchePool", nullptr); RETURN_IF_ERROR(_without_group_task_scheduler->start()); - RETURN_IF_ERROR(_without_group_block_scheduler->start("WithoutGroupBlockSche")); + RETURN_IF_ERROR(_without_group_block_scheduler->start()); auto tg_queue = std::make_shared(executors_size); - _with_group_block_scheduler = std::make_shared(); - _with_group_task_scheduler = new pipeline::TaskScheduler( - this, _with_group_block_scheduler, tg_queue, "WithGroupTaskSchePool", nullptr); + _with_group_block_scheduler = std::make_shared("PipeGSchePool"); + _with_group_task_scheduler = new pipeline::TaskScheduler(this, _with_group_block_scheduler, + tg_queue, "PipeGSchePool", nullptr); RETURN_IF_ERROR(_with_group_task_scheduler->start()); - RETURN_IF_ERROR(_with_group_block_scheduler->start("WithGroupBlockSche")); + RETURN_IF_ERROR(_with_group_block_scheduler->start()); - _global_block_scheduler = std::make_shared(); - RETURN_IF_ERROR(_global_block_scheduler->start("GlobalBlockSche")); + 
_global_block_scheduler = std::make_shared("PipeGBlockSche"); + RETURN_IF_ERROR(_global_block_scheduler->start()); _runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue(); _runtime_filter_timer_queue->run(); return Status::OK(); @@ -563,6 +569,7 @@ void ExecEnv::destroy() { SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool); SAFE_SHUTDOWN(_s3_file_upload_thread_pool); SAFE_SHUTDOWN(_join_node_thread_pool); + SAFE_SHUTDOWN(_lazy_release_obj_pool); SAFE_SHUTDOWN(_send_report_thread_pool); SAFE_SHUTDOWN(_send_batch_thread_pool); SAFE_SHUTDOWN(_serial_download_cache_thread_token); @@ -623,6 +630,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_runtime_filter_timer_queue); // TODO(zhiqiang): Maybe we should call shutdown before release thread pool? _join_node_thread_pool.reset(nullptr); + _lazy_release_obj_pool.reset(nullptr); _send_report_thread_pool.reset(nullptr); _buffered_reader_prefetch_thread_pool.reset(nullptr); _s3_file_upload_thread_pool.reset(nullptr); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index af51cdfd4c..dcbf6346fd 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -128,6 +128,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { _runtime_state = RuntimeState::create_unique(params, request.query_options, query_globals, _exec_env); _runtime_state->set_query_ctx(_query_ctx.get()); + _runtime_state->set_task_execution_context(shared_from_this()); _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker); SCOPED_ATTACH_TASK(_runtime_state.get()); @@ -218,9 +219,10 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { RuntimeProfile* sink_profile = nullptr; // set up sink, if required if (request.fragment.__isset.output_sink) { - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink( - obj_pool(), request.fragment.output_sink, 
request.fragment.output_exprs, params, - row_desc(), runtime_state(), &_sink, *_desc_tbl)); + RETURN_IF_ERROR_OR_CATCH_EXCEPTION( + DataSink::create_data_sink(_runtime_state->obj_pool(), request.fragment.output_sink, + request.fragment.output_exprs, params, row_desc(), + runtime_state(), &_sink, *_desc_tbl)); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state())); sink_profile = _sink->profile(); if (sink_profile != nullptr) { diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h index 6d374c78f9..41817e5308 100644 --- a/be/src/runtime/plan_fragment_executor.h +++ b/be/src/runtime/plan_fragment_executor.h @@ -73,7 +73,7 @@ class Block; // // Aside from Cancel(), which may be called asynchronously, this class is not // thread-safe. -class PlanFragmentExecutor { +class PlanFragmentExecutor : public TaskExecutionContext { public: using report_status_callback = std::function; // report_status_cb, if !empty(), is used to report the accumulated profile diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 18b713650f..a9886da5cd 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -22,6 +22,14 @@ namespace doris { +class DelayReleaseToken : public Runnable { +public: + DelayReleaseToken(std::unique_ptr&& token) { token_ = std::move(token); } + ~DelayReleaseToken() override = default; + void run() override {} + std::unique_ptr token_; +}; + QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options) : fragment_num(total_fragment_num), @@ -55,6 +63,14 @@ QueryContext::~QueryContext() { } LOG_INFO("Query {} deconstructed, {}", print_id(_query_id), mem_tracker_msg); + // Not release the the thread token in query context's dector method, because the query + // conext may be dectored in the thread token it self. It is very dangerous and may core. 
+ // And also thread token need shutdown, it may take some time, may cause the thread that + // release the token hang, the thread maybe a pipeline task scheduler thread. + if (_thread_token) { + static_cast(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit( + std::make_shared(std::move(_thread_token)))); + } } void QueryContext::set_ready_to_execute(bool is_cancelled) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index a5bf011e32..cec230dc34 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -39,6 +39,7 @@ #include "common/factory_creator.h" #include "common/status.h" #include "gutil/integral_types.h" +#include "runtime/task_execution_context.h" #include "util/debug_util.h" #include "util/runtime_profile.h" @@ -531,13 +532,24 @@ public: auto& pipeline_id_to_profile() { return _pipeline_id_to_profile; } + void set_task_execution_context(std::shared_ptr context) { + _task_execution_context = context; + } + + std::weak_ptr get_task_execution_context() { + return _task_execution_context; + } + private: Status create_error_log_file(); - static const int DEFAULT_BATCH_SIZE = 2048; + static const int DEFAULT_BATCH_SIZE = 4062; std::shared_ptr _query_mem_tracker; + // Hold execution context for other threads + std::weak_ptr _task_execution_context; + // put runtime state before _obj_pool, so that it will be deconstructed after // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing. RuntimeProfile _profile; diff --git a/be/src/runtime/task_execution_context.h b/be/src/runtime/task_execution_context.h new file mode 100644 index 0000000000..08564840f5 --- /dev/null +++ b/be/src/runtime/task_execution_context.h @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +namespace doris { + +// This class act as a super class of all context like things +class TaskExecutionContext : public std::enable_shared_from_this { +public: + TaskExecutionContext() = default; + virtual ~TaskExecutionContext() = default; +}; + +} // namespace doris diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp index edf3f6f767..7c2068e571 100644 --- a/be/src/vec/exec/scan/scan_task_queue.cpp +++ b/be/src/vec/exec/scan/scan_task_queue.cpp @@ -26,7 +26,8 @@ namespace taskgroup { static void empty_function() {} ScanTask::ScanTask() : ScanTask(empty_function, nullptr, nullptr, 1) {} -ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, +ScanTask::ScanTask(WorkFunction scan_func, + std::shared_ptr scanner_context, TGSTEntityPtr scan_entity, int priority) : scan_func(std::move(scan_func)), scanner_context(scanner_context), diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h index 6ee339c785..18ef18872e 100644 --- a/be/src/vec/exec/scan/scan_task_queue.h +++ b/be/src/vec/exec/scan/scan_task_queue.h @@ -32,7 +32,7 @@ using WorkFunction = std::function; // Like PriorityThreadPool::Task struct ScanTask { ScanTask(); - ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, + ScanTask(WorkFunction scan_func, 
std::shared_ptr scanner_context, TGSTEntityPtr scan_entity, int priority); bool operator<(const ScanTask& o) const { return priority < o.priority; } ScanTask& operator++() { @@ -41,7 +41,7 @@ struct ScanTask { } WorkFunction scan_func; - vectorized::ScannerContext* scanner_context = nullptr; + std::shared_ptr scanner_context = nullptr; TGSTEntityPtr scan_entity; int priority; }; diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index c35917749a..8a6f6de6e6 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -45,27 +45,30 @@ namespace doris::vectorized { using namespace std::chrono_literals; -ScannerContext::ScannerContext(RuntimeState* state_, const TupleDescriptor* output_tuple_desc, - const std::list& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances, +ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* output_tuple_desc, + const std::list& scanners, int64_t limit_, + int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state, std::shared_ptr dependency, std::shared_ptr finish_dependency) - : _state(state_), + : _state(state), _parent(nullptr), _local_state(local_state), _output_tuple_desc(output_tuple_desc), _process_status(Status::OK()), - _batch_size(state_->batch_size()), + _batch_size(state->batch_size()), limit(limit_), - _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue_, (int64_t)1024) * + _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * num_parallel_instances), - _scanner_scheduler(state_->exec_env()->scanner_scheduler()), - _scanners(scanners_), - _scanners_ref(scanners_.begin(), scanners_.end()), + _scanner_scheduler(state->exec_env()->scanner_scheduler()), + _scanners(scanners), + _scanners_ref(scanners.begin(), scanners.end()), _num_parallel_instances(num_parallel_instances), _dependency(dependency), 
_finish_dependency(finish_dependency) { + // Use the task exec context as a lock between scanner threads and fragment exection threads + _task_exec_ctx = _state->get_task_execution_context(); + _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); if (_scanners.empty()) { _is_finished = true; @@ -87,24 +90,27 @@ ScannerContext::ScannerContext(RuntimeState* state_, const TupleDescriptor* outp } } -ScannerContext::ScannerContext(doris::RuntimeState* state_, doris::vectorized::VScanNode* parent, +ScannerContext::ScannerContext(doris::RuntimeState* state, doris::vectorized::VScanNode* parent, const doris::TupleDescriptor* output_tuple_desc, - const std::list& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances, + const std::list& scanners, int64_t limit_, + int64_t max_bytes_in_blocks_queue, const int num_parallel_instances, pipeline::ScanLocalStateBase* local_state) - : _state(state_), + : _state(state), _parent(parent), _local_state(local_state), _output_tuple_desc(output_tuple_desc), _process_status(Status::OK()), - _batch_size(state_->batch_size()), + _batch_size(state->batch_size()), limit(limit_), - _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue_, (int64_t)1024) * + _max_bytes_in_queue(std::max(max_bytes_in_blocks_queue, (int64_t)1024) * num_parallel_instances), - _scanner_scheduler(state_->exec_env()->scanner_scheduler()), - _scanners(scanners_), - _scanners_ref(scanners_.begin(), scanners_.end()), + _scanner_scheduler(state->exec_env()->scanner_scheduler()), + _scanners(scanners), + _scanners_ref(scanners.begin(), scanners.end()), _num_parallel_instances(num_parallel_instances) { + // Use the task exec context as a lock between scanner threads and fragment exection threads + _task_exec_ctx = _state->get_task_execution_context(); + _query_id = _state->get_query_ctx()->query_id(); ctx_id = UniqueId::gen_uid().to_string(); if (_scanners.empty()) { _is_finished = true; @@ 
-269,7 +275,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo bool is_scheduled = false; if (to_be_schedule && _num_running_scanners == 0) { is_scheduled = true; - auto state = _scanner_scheduler->submit(this); + auto state = _scanner_scheduler->submit(shared_from_this()); if (state.ok()) { _num_scheduling_ctx++; } else { @@ -486,10 +492,6 @@ void ScannerContext::clear_and_join(Parent* parent, RuntimeState* state) { break; } } while (false); - - for (const auto& tid : _btids) { - bthread_join(tid, nullptr); - } // Must wait all running scanners stop running. // So that we can make sure to close all scanners. static_cast(_close_and_clear_scanners(parent, state)); @@ -522,7 +524,7 @@ std::string ScannerContext::debug_string() { void ScannerContext::reschedule_scanner_ctx() { std::lock_guard l(_transfer_lock); - auto state = _scanner_scheduler->submit(this); + auto state = _scanner_scheduler->submit(shared_from_this()); //todo(wb) rethinking is it better to mark current scan_context failed when submit failed many times? if (state.ok()) { _num_scheduling_ctx++; @@ -545,7 +547,7 @@ void ScannerContext::push_back_scanner_and_reschedule(VScannerSPtr scanner) { set_ready_to_finish(); if (should_be_scheduled()) { - auto state = _scanner_scheduler->submit(this); + auto state = _scanner_scheduler->submit(shared_from_this()); if (state.ok()) { _num_scheduling_ctx++; } else { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index ec0c017ec2..a64b544471 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -65,14 +65,13 @@ class SimplifiedScanScheduler; // ScannerContext is also the scheduling unit of ScannerScheduler. // ScannerScheduler schedules a ScannerContext at a time, // and submits the Scanners to the scanner thread pool for data scanning. 
-class ScannerContext { +class ScannerContext : public std::enable_shared_from_this { ENABLE_FACTORY_CREATOR(ScannerContext); public: - ScannerContext(RuntimeState* state_, VScanNode* parent, - const TupleDescriptor* output_tuple_desc, - const std::list& scanners_, int64_t limit_, - int64_t max_bytes_in_blocks_queue_, const int num_parallel_instances = 1, + ScannerContext(RuntimeState* state, VScanNode* parent, const TupleDescriptor* output_tuple_desc, + const std::list& scanners, int64_t limit_, + int64_t max_bytes_in_blocks_queue, const int num_parallel_instances = 1, pipeline::ScanLocalStateBase* local_state = nullptr); virtual ~ScannerContext() = default; @@ -175,12 +174,14 @@ public: // the unique id of this context std::string ctx_id; + TUniqueId _query_id; int32_t queue_idx = -1; ThreadPoolToken* thread_token = nullptr; - std::vector _btids; bool _should_reset_thread_name = true; + std::weak_ptr get_task_execution_context() { return _task_exec_ctx; } + private: template Status _close_and_clear_scanners(Parent* parent, RuntimeState* state); @@ -197,6 +198,7 @@ protected: void _set_scanner_done(); RuntimeState* _state = nullptr; + std::weak_ptr _task_exec_ctx; VScanNode* _parent = nullptr; pipeline::ScanLocalStateBase* _local_state = nullptr; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 2666387956..2e4db75a24 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -111,9 +111,9 @@ Status ScannerScheduler::init(ExecEnv* env) { .set_max_threads(QUEUE_NUM) .build(&_scheduler_pool)); - _pending_queues = new BlockingQueue*[QUEUE_NUM]; + _pending_queues = new BlockingQueue>*[QUEUE_NUM]; for (int i = 0; i < QUEUE_NUM; i++) { - _pending_queues[i] = new BlockingQueue(INT32_MAX); + _pending_queues[i] = new BlockingQueue>(INT32_MAX); static_cast(_scheduler_pool->submit_func([this, i] { this->_schedule_thread(i); })); } @@ -154,7 +154,7 @@ Status 
ScannerScheduler::init(ExecEnv* env) { return Status::OK(); } -Status ScannerScheduler::submit(ScannerContext* ctx) { +Status ScannerScheduler::submit(std::shared_ptr ctx) { if (ctx->done()) { return Status::EndOfFile("ScannerContext is done"); } @@ -171,9 +171,9 @@ std::unique_ptr ScannerScheduler::new_limited_scan_pool_token( } void ScannerScheduler::_schedule_thread(int queue_id) { - BlockingQueue* queue = _pending_queues[queue_id]; + BlockingQueue>* queue = _pending_queues[queue_id]; while (!_is_closed) { - ScannerContext* ctx; + std::shared_ptr ctx; bool ok = queue->blocking_get(&ctx); if (!ok) { // maybe closed @@ -186,14 +186,13 @@ void ScannerScheduler::_schedule_thread(int queue_id) { } } -[[maybe_unused]] static void* run_scanner_bthread(void* arg) { - auto* f = reinterpret_cast*>(arg); - (*f)(); - delete f; - return nullptr; -} - -void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { +void ScannerScheduler::_schedule_scanners(std::shared_ptr ctx) { + auto task_lock = ctx->get_task_execution_context().lock(); + if (task_lock == nullptr) { + // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id) + // << " maybe finished"; + return; + } MonotonicStopWatch watch; watch.reset(); watch.start(); @@ -227,72 +226,74 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) { // TODO(cmy): How to handle this "nice"? int nice = 1; auto iter = this_run.begin(); - auto submit_to_thread_pool = [&] { - if (ctx->thread_token != nullptr) { - // TODO llj tg how to treat this? - while (iter != this_run.end()) { - (*iter)->start_wait_worker_timer(); - auto s = ctx->thread_token->submit_func( - [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); }); - if (s.ok()) { - this_run.erase(iter++); - } else { - ctx->set_status_on_error(s); - break; - } + if (ctx->thread_token != nullptr) { + // TODO llj tg how to treat this? 
+ while (iter != this_run.end()) { + (*iter)->start_wait_worker_timer(); + auto s = ctx->thread_token->submit_func( + [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); }); + if (s.ok()) { + this_run.erase(iter++); + } else { + ctx->set_status_on_error(s); + break; } - } else { - while (iter != this_run.end()) { - (*iter)->start_wait_worker_timer(); - TabletStorageType type = (*iter)->get_storage_type(); - bool ret = false; - if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { - auto work_func = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); - }; - SimplifiedScanTask simple_scan_task = {work_func, ctx}; - ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); - } else if (ctx->get_task_group() && config::enable_workload_group_for_scan) { - auto work_func = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); - }; - taskgroup::ScanTask scan_task = { - work_func, ctx, ctx->get_task_group()->local_scan_task_entity(), - nice}; - ret = _task_group_local_scan_queue->push_back(scan_task); - } else { - PriorityThreadPool::Task task; - task.work_function = [this, scanner = *iter, ctx] { - this->_scanner_scan(this, ctx, scanner); - }; - task.priority = nice; - ret = _local_scan_thread_pool->offer(task); - } + } + } else { + while (iter != this_run.end()) { + (*iter)->start_wait_worker_timer(); + TabletStorageType type = (*iter)->get_storage_type(); + bool ret = false; + if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { + if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { + auto work_func = [this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }; + SimplifiedScanTask simple_scan_task = {work_func, ctx}; + ret = scan_sche->get_scan_queue()->try_put(simple_scan_task); + } else if (ctx->get_task_group() && config::enable_workload_group_for_scan) { + auto work_func = [this, scanner = *iter, ctx] { + 
this->_scanner_scan(this, ctx, scanner); + }; + taskgroup::ScanTask scan_task = { + work_func, ctx, ctx->get_task_group()->local_scan_task_entity(), nice}; + ret = _task_group_local_scan_queue->push_back(scan_task); } else { PriorityThreadPool::Task task; task.work_function = [this, scanner = *iter, ctx] { this->_scanner_scan(this, ctx, scanner); }; task.priority = nice; - ret = _remote_scan_thread_pool->offer(task); - } - if (ret) { - this_run.erase(iter++); - } else { - ctx->set_status_on_error( - Status::InternalError("failed to submit scanner to scanner pool")); - break; + ret = _local_scan_thread_pool->offer(task); } + } else { + PriorityThreadPool::Task task; + task.work_function = [this, scanner = *iter, ctx] { + this->_scanner_scan(this, ctx, scanner); + }; + task.priority = nice; + ret = _remote_scan_thread_pool->offer(task); + } + if (ret) { + this_run.erase(iter++); + } else { + ctx->set_status_on_error( + Status::InternalError("failed to submit scanner to scanner pool")); + break; } } - }; - submit_to_thread_pool(); + } ctx->incr_ctx_scheduling_time(watch.elapsed_time()); } -void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, - VScannerSPtr scanner) { +void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, + std::shared_ptr<ScannerContext> ctx, VScannerSPtr scanner) { + auto task_lock = ctx->get_task_execution_context().lock(); + if (task_lock == nullptr) { + // LOG(WARNING) << "could not lock task execution context, query " << print_id(_query_id) + // << " maybe finished"; + return; + } SCOPED_ATTACH_TASK(scanner->runtime_state()); // for cpu hard limit, thread name should not be reset if (ctx->_should_reset_thread_name) { diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index c561dba227..91d341613d 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -66,7 +66,7 @@ public: [[nodiscard]] Status init(ExecEnv* env); - 
[[nodiscard]] Status submit(ScannerContext* ctx); + [[nodiscard]] Status submit(std::shared_ptr ctx); void stop(); @@ -82,9 +82,10 @@ private: // scheduling thread function void _schedule_thread(int queue_id); // schedule scanners in a certain ScannerContext - void _schedule_scanners(ScannerContext* ctx); + void _schedule_scanners(std::shared_ptr ctx); // execution thread function - void _scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx, VScannerSPtr scanner); + void _scanner_scan(ScannerScheduler* scheduler, std::shared_ptr ctx, + VScannerSPtr scanner); void _task_group_scanner_scan(ScannerScheduler* scheduler, taskgroup::ScanTaskTaskGroupQueue* scan_queue); @@ -102,7 +103,7 @@ private: // and put it to the _scheduling_map. // If any scanner finish, it will take ctx from and put it to pending queue again. std::atomic_uint _queue_idx = {0}; - BlockingQueue** _pending_queues = nullptr; + BlockingQueue>** _pending_queues = nullptr; // scheduling thread pool std::unique_ptr _scheduler_pool; @@ -126,13 +127,13 @@ private: struct SimplifiedScanTask { SimplifiedScanTask() = default; SimplifiedScanTask(std::function scan_func, - vectorized::ScannerContext* scanner_context) { + std::shared_ptr scanner_context) { this->scan_func = scan_func; this->scanner_context = scanner_context; } std::function scan_func; - vectorized::ScannerContext* scanner_context = nullptr; + std::shared_ptr scanner_context = nullptr; }; // used for cpu hard limit diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp index deed723e26..5176d7900b 100644 --- a/be/src/vec/exec/scan/vscan_node.cpp +++ b/be/src/vec/exec/scan/vscan_node.cpp @@ -197,8 +197,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) { if (_scanner_ctx) { DCHECK(!_eos && _num_scanners->value() > 0); RETURN_IF_ERROR(_scanner_ctx->init()); - RETURN_IF_ERROR( - _state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); + 
RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); } if (_shared_scan_opt) { _shared_scanner_controller->set_scanner_context(id(), @@ -218,7 +217,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) { : Status::OK()); if (_scanner_ctx) { RETURN_IF_ERROR(_scanner_ctx->init()); - RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get())); + RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); } }