From bcdb4813741ad4b942203bafaedbe2228e0b0cb2 Mon Sep 17 00:00:00 2001
From: yiguolei <676222867@qq.com>
Date: Tue, 22 Aug 2023 16:00:34 +0800
Subject: [PATCH] [refactor](fragment) refactor non pipeline fragment executor
 (#23281)

---------

Co-authored-by: yiguolei
---
 be/src/pipeline/pipeline_task.cpp             |   2 +-
 .../pipeline/pipeline_x/pipeline_x_task.cpp   |   2 +-
 be/src/pipeline/task_scheduler.cpp            |   2 +-
 be/src/runtime/fragment_mgr.cpp               | 354 +++---------------
 be/src/runtime/fragment_mgr.h                 |  29 +-
 be/src/runtime/plan_fragment_executor.cpp     | 149 ++++++--
 be/src/runtime/plan_fragment_executor.h       |  58 ++-
 be/src/runtime/query_context.h                |  27 +-
 be/test/runtime/fragment_mgr_test.cpp         | 148 --------
 9 files changed, 258 insertions(+), 513 deletions(-)
 delete mode 100644 be/test/runtime/fragment_mgr_test.cpp

diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index e8603bdb17..dbf155d83c 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -385,7 +385,7 @@ void PipelineTask::set_state(PipelineTaskState state) {
 
 std::string PipelineTask::debug_string() {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
+    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
     fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
                    print_id(fragment_context()->get_fragment_instance_id()));
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 250bc610a9..497f100b02 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -240,7 +240,7 @@ Status PipelineXTask::close() {
 
 std::string PipelineXTask::debug_string() {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
+    fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
     fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
                    PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index e405549659..b792f0f4c6 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -121,7 +121,7 @@ void BlockedTaskScheduler::_schedule() {
         } else if (task->fragment_context()->is_canceled()) {
             _make_task_run(local_blocked_tasks, iter, ready_tasks);
         } else if (task->query_context()->is_timeout(now)) {
-            LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id)
+            LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id())
                          << ", instance_id="
                          << print_id(task->fragment_context()->get_fragment_instance_id());
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 85393f2bc1..97965500b4 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -41,6 +41,8 @@
 #include
 #include
+
+#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
 // IWYU pragma: no_include
 #include  // IWYU pragma: keep
 #include
@@ -56,7 +58,6 @@
 #include "io/fs/stream_load_pipe.h"
 #include "opentelemetry/trace/scope.h"
 #include "pipeline/pipeline_fragment_context.h"
-#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
@@ -110,208 +111,6 @@ std::string to_load_error_http_path(const std::string& file_name) {
 using apache::thrift::TException;
 using apache::thrift::transport::TTransportException;
 
-class FragmentExecState {
-public:
-    using report_status_callback_impl = std::function<void(const ReportStatusRequest)>;
-    // Constructor by using QueryContext
-    FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
-                      ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
-                      const report_status_callback_impl& report_status_cb_impl);
-
-    Status prepare(const TExecPlanFragmentParams& params);
-
-    Status execute();
-
-    Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = "");
-
-    bool is_canceled() { return _cancelled; }
-
-    TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
-
-    TUniqueId query_id() const { return _query_id; }
-
-    PlanFragmentExecutor* executor() { return &_executor; }
-
-    const vectorized::VecDateTimeValue& start_time() const { return _start_time; }
-
-    void set_merge_controller_handler(
-            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
-        _merge_controller_handler = handler;
-    }
-
-    // Update status of this fragment execute
-    Status update_status(const Status& status) {
-        std::lock_guard<std::mutex> l(_status_lock);
-        if (!status.ok() && _exec_status.ok()) {
-            _exec_status = status;
-            LOG(WARNING) << "query_id=" << print_id(_query_id)
-                         << ", instance_id=" << print_id(_fragment_instance_id)
-                         << " meet error status " << status;
-        }
-        return _exec_status;
-    }
-
-    void set_group(const TResourceInfo& info) {
-        _set_rsc_info = true;
-        _user = info.user;
-        _group = info.group;
-    }
-
-    bool is_timeout(const vectorized::VecDateTimeValue& now) const {
-        if (_timeout_second <= 0) {
-            return false;
-        }
-        if (now.second_diff(_start_time) > _timeout_second) {
-            return true;
-        }
-        return false;
-    }
-
-    int get_timeout_second() const { return _timeout_second; }
-
-    std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
-
-    void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
-
-private:
-    void coordinator_callback(const Status& status, RuntimeProfile* profile,
-                              RuntimeProfile* load_channel_profile, bool done);
-
-    // Id of this query
-    TUniqueId _query_id;
-    // Id of this instance
-    TUniqueId _fragment_instance_id;
-    // Used to report to coordinator which backend is over
-    int _backend_num;
-    TNetworkAddress _coord_addr;
-
-    // This context is shared by all fragments of this host in a query.
-    // _query_ctx should be the last one to be destructed, because _executor's
-    // destruct method will call close and it will depend on query context,
-    // for example runtime profile.
-    std::shared_ptr<QueryContext> _query_ctx;
-    PlanFragmentExecutor _executor;
-    vectorized::VecDateTimeValue _start_time;
-
-    std::mutex _status_lock;
-    Status _exec_status;
-
-    bool _set_rsc_info = false;
-    std::string _user;
-    std::string _group;
-
-    int _timeout_second;
-    std::atomic<bool> _cancelled {false};
-
-    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
-
-    // If set the true, this plan fragment will be executed only after FE send execution start rpc.
-    bool _need_wait_execution_trigger = false;
-    report_status_callback_impl _report_status_cb_impl;
-};
-
-FragmentExecState::FragmentExecState(const TUniqueId& query_id,
-                                     const TUniqueId& fragment_instance_id, int backend_num,
-                                     ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
-                                     const report_status_callback_impl& report_status_cb_impl)
-        : _query_id(query_id),
-          _fragment_instance_id(fragment_instance_id),
-          _backend_num(backend_num),
-          _query_ctx(std::move(query_ctx)),
-          _executor(exec_env, std::bind(std::mem_fn(&FragmentExecState::coordinator_callback),
-                                        this, std::placeholders::_1, std::placeholders::_2,
-                                        std::placeholders::_3, std::placeholders::_4)),
-          _set_rsc_info(false),
-          _timeout_second(-1),
-          _report_status_cb_impl(report_status_cb_impl) {
-    _start_time = vectorized::VecDateTimeValue::local_time();
-    _coord_addr = _query_ctx->coord_addr;
-}
-
-Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
-    if (params.__isset.query_options) {
-        _timeout_second = params.query_options.execution_timeout;
-    }
-
-    if (_query_ctx == nullptr) {
-        if (params.__isset.resource_info) {
-            set_group(params.resource_info);
-        }
-    }
-
-    if (_query_ctx == nullptr) {
-        return _executor.prepare(params);
-    } else {
-        return _executor.prepare(params, _query_ctx.get());
-    }
-}
-
-Status FragmentExecState::execute() {
-    if (_need_wait_execution_trigger) {
-        // if _need_wait_execution_trigger is true, which means this instance
-        // is prepared but need to wait for the signal to do the rest execution.
-        if (!_query_ctx->wait_for_start()) {
-            return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
-        }
-    }
-#ifndef BE_TEST
-    if (_executor.runtime_state()->is_cancelled()) {
-        return Status::Cancelled("cancelled before execution");
-    }
-#endif
-    int64_t duration_ns = 0;
-    {
-        SCOPED_RAW_TIMER(&duration_ns);
-        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
-        Status st = _executor.open();
-        WARN_IF_ERROR(st,
-                      strings::Substitute("Got error while opening fragment $0, query id: $1",
-                                          print_id(_fragment_instance_id), print_id(_query_id)));
-        if (!st.ok()) {
-            cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                   fmt::format("PlanFragmentExecutor open failed, reason: {}", st.to_string()));
-        }
-        _executor.close();
-    }
-    DorisMetrics::instance()->fragment_requests_total->increment(1);
-    DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000);
-    return Status::OK();
-}
-
-Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
-    if (!_cancelled) {
-        std::lock_guard<std::mutex> l(_status_lock);
-        if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
-            _executor.set_is_report_on_cancel(false);
-        }
-        _executor.cancel(reason, msg);
-#ifndef BE_TEST
-        // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
-        // For stream load the fragment's query_id == load id, it is set in FE.
-        auto stream_load_ctx = _query_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
-        if (stream_load_ctx != nullptr) {
-            stream_load_ctx->pipe->cancel(msg);
-        }
-#endif
-        _cancelled = true;
-    }
-    return Status::OK();
-}
-
-// There can only be one of these callbacks in-flight at any moment, because
-// it is only invoked from the executor's reporting thread.
-// Also, the reported status will always reflect the most recent execution status,
-// including the final status when execution finishes.
-void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
-                                             RuntimeProfile* load_channel_profile, bool done) {
-    _report_status_cb_impl(
-            {status, profile, load_channel_profile, done, _coord_addr, _query_id, -1,
-             _fragment_instance_id, _backend_num, _executor.runtime_state(),
-             std::bind(&FragmentExecState::update_status, this, std::placeholders::_1),
-             std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1,
-                       std::placeholders::_2)});
-    DCHECK(status.ok() || done); // if !status.ok() => done
-}
-
 FragmentMgr::FragmentMgr(ExecEnv* exec_env)
         : _exec_env(exec_env), _stop_background_threads_latch(1) {
     _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
@@ -519,24 +318,25 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
 
 static void empty_function(RuntimeState*, Status*) {}
 
-void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
+void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
                                const FinishCallback& cb) {
     std::string func_name {"PlanFragmentExecutor::_exec_actual"};
 #ifndef BE_TEST
-    SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state());
+    SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
 #endif
 
     LOG_INFO(func_name)
-            .tag("query_id", exec_state->query_id())
-            .tag("instance_id", exec_state->fragment_instance_id())
+            .tag("query_id", fragment_executor->query_id())
+            .tag("instance_id", fragment_executor->fragment_instance_id())
             .tag("pthread_id", (uintptr_t)pthread_self());
 
-    Status st = exec_state->execute();
+    Status st = fragment_executor->execute();
     if (!st.ok()) {
-        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed");
+        fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                                  "fragment_executor execute failed");
     }
 
-    std::shared_ptr<QueryContext> query_ctx = exec_state->get_query_ctx();
+    std::shared_ptr<QueryContext> query_ctx = fragment_executor->get_query_ctx();
     bool all_done = false;
     if (query_ctx != nullptr) {
         // decrease the number of unfinished fragments
@@ -546,15 +346,15 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
     // remove exec state after this fragment finished
     {
         std::lock_guard<std::mutex> lock(_lock);
-        _fragment_map.erase(exec_state->fragment_instance_id());
+        _fragment_map.erase(fragment_executor->fragment_instance_id());
 
         if (all_done && query_ctx) {
-            _query_ctx_map.erase(query_ctx->query_id);
+            _query_ctx_map.erase(query_ctx->query_id());
         }
     }
 
     // Callback after remove from this id
-    auto status = exec_state->executor()->status();
-    cb(exec_state->executor()->runtime_state(), &status);
+    auto status = fragment_executor->status();
+    cb(fragment_executor->runtime_state(), &status);
 }
 
 Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
@@ -691,19 +491,17 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
     } else {
         // This may be a first fragment request of the query.
        // Create the query fragments context.
-        query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env,
-                                                params.query_options);
-        query_ctx->query_id = query_id;
+        query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
+                                                params.query_options);
         RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
                                               &(query_ctx->desc_tbl)));
-
+        query_ctx->coord_addr = params.coord;
         // set file scan range params
         if (params.__isset.file_scan_params) {
             query_ctx->file_scan_range_params_map = params.file_scan_params;
         }
 
-        query_ctx->coord_addr = params.coord;
-        LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
+        LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo)
                   << " coord_addr " << query_ctx->coord_addr
                   << " total fragment num on current host: " << params.fragment_num_on_host;
         query_ctx->query_globals = params.query_globals;
@@ -731,15 +529,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         if (params.query_options.query_type == TQueryType::SELECT) {
             query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::QUERY,
-                    fmt::format("Query#Id={}", print_id(query_ctx->query_id)), bytes_limit);
+                    fmt::format("Query#Id={}", print_id(query_ctx->query_id())), bytes_limit);
         } else if (params.query_options.query_type == TQueryType::LOAD) {
             query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::LOAD,
-                    fmt::format("Load#Id={}", print_id(query_ctx->query_id)), bytes_limit);
+                    fmt::format("Load#Id={}", print_id(query_ctx->query_id())), bytes_limit);
         } else { // EXTERNAL
             query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                     MemTrackerLimiter::Type::LOAD,
-                    fmt::format("External#Id={}", print_id(query_ctx->query_id)), bytes_limit);
+                    fmt::format("External#Id={}", print_id(query_ctx->query_id())), bytes_limit);
         }
         if (params.query_options.__isset.is_report_success &&
             params.query_options.is_report_success) {
@@ -756,11 +554,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
                                                          task_group_info);
                 tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
                 query_ctx->set_task_group(tg);
-                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
+                LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
                           << " use task group: " << tg->debug_string();
             }
         } else {
-            VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
+            VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id())
                        << " does not use task group.";
         }
     }
@@ -771,9 +569,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
         std::lock_guard<std::mutex> lock(_lock);
         auto search = _query_ctx_map.find(query_id);
         if (search == _query_ctx_map.end()) {
-            _query_ctx_map.insert(std::make_pair(query_ctx->query_id, query_ctx));
+            _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
             LOG(INFO) << "Register query/load memory tracker, query/load id: "
-                      << print_id(query_ctx->query_id)
+                      << print_id(query_ctx->query_id())
                       << " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
         } else {
             // Already has a query fragments context, use it
@@ -809,7 +607,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
         }
     }
 
-    std::shared_ptr<FragmentExecState> exec_state;
     std::shared_ptr<QueryContext> query_ctx;
     bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
                                    params.query_options.enable_pipeline_engine;
@@ -817,36 +614,37 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
     RETURN_IF_ERROR(
            _get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
     query_ctx->fragment_ids.push_back(fragment_instance_id);
 
-    exec_state.reset(
-            new FragmentExecState(query_ctx->query_id, params.params.fragment_instance_id,
-                                  params.backend_num, _exec_env, query_ctx,
-                                  std::bind(std::mem_fn(&FragmentMgr::coordinator_callback),
-                                            this, std::placeholders::_1)));
+    auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
+            _exec_env, query_ctx, params.params.fragment_instance_id, -1, params.backend_num,
+            std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+                      std::placeholders::_1));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
         // set need_wait_execution_trigger means this instance will not actually being executed
         // until the execPlanFragmentStart RPC trigger to start it.
-        exec_state->set_need_wait_execution_trigger();
+        fragment_executor->set_need_wait_execution_trigger();
     }
 
     int64_t duration_ns = 0;
     DCHECK(!pipeline_engine_enabled);
     {
         SCOPED_RAW_TIMER(&duration_ns);
-        RETURN_IF_ERROR(exec_state->prepare(params));
+        RETURN_IF_ERROR(fragment_executor->prepare(params));
     }
     g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
     std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-    _runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state());
-    exec_state->set_merge_controller_handler(handler);
+    // TODO need check the status, but when I add return_if_error the P0 will not pass
+    _runtimefilter_controller.add_entity(params, &handler, fragment_executor->runtime_state());
+    fragment_executor->set_merge_controller_handler(handler);
 
     {
         std::lock_guard<std::mutex> lock(_lock);
-        _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
+        _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, fragment_executor));
         _cv.notify_all();
     }
 
     auto st = _thread_pool->submit_func(
-            [this, exec_state, cb, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
+            [this, fragment_executor, cb,
+             parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
                 OpentelemetryScope scope {parent_span};
-                _exec_actual(exec_state, cb);
+                _exec_actual(fragment_executor, cb);
             });
     if (!st.ok()) {
         {
@@ -854,8 +652,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
             std::lock_guard<std::mutex> lock(_lock);
             _fragment_map.erase(params.params.fragment_instance_id);
         }
-        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
-                           "push plan fragment to thread pool failed");
+        fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                                  "push plan fragment to thread pool failed");
         return Status::InternalError(
                 strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2",
err = $1, BE: $2", print_id(params.params.fragment_instance_id), st.to_string(), @@ -888,7 +686,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, int64_t duration_ns = 0; std::shared_ptr context = std::make_shared( - query_ctx->query_id, params.fragment_id, query_ctx, _exec_env, cb, + query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb, std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this, std::placeholders::_1)); { @@ -920,23 +718,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, START_AND_SCOPE_SPAN(tracer, span, "exec_instance"); span->SetAttribute("instance_id", print_id(fragment_instance_id)); - std::shared_ptr exec_state(new FragmentExecState( - query_ctx->query_id, fragment_instance_id, params.local_params[i].backend_num, - _exec_env, query_ctx, - std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this, - std::placeholders::_1))); - if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) { - // set need_wait_execution_trigger means this instance will not actually being executed - // until the execPlanFragmentStart RPC trigger to start it. - exec_state->set_need_wait_execution_trigger(); - } - if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) { query_ctx->set_ready_to_execute_only(); } _setup_shared_hashtable_for_broadcast_join(params, params.local_params[i], - exec_state->executor()->runtime_state(), query_ctx.get()); } { @@ -971,27 +757,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, START_AND_SCOPE_SPAN(tracer, span, "exec_instance"); span->SetAttribute("instance_id", print_id(fragment_instance_id)); - std::shared_ptr exec_state(new FragmentExecState( - query_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env, - query_ctx, - std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this, - std::placeholders::_1))); - if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) { - // set need_wait_execution_trigger means this instance will not actually being executed - // until the execPlanFragmentStart RPC trigger to start it. 
-            exec_state->set_need_wait_execution_trigger();
-        }
-
         int64_t duration_ns = 0;
         if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
             query_ctx->set_ready_to_execute_only();
         }
-        _setup_shared_hashtable_for_broadcast_join(
-                params, local_params, exec_state->executor()->runtime_state(), query_ctx.get());
+        _setup_shared_hashtable_for_broadcast_join(params, local_params, query_ctx.get());
 
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
                 std::make_shared<pipeline::PipelineFragmentContext>(
-                        query_ctx->query_id, fragment_instance_id, params.fragment_id,
+                        query_ctx->query_id(), fragment_instance_id, params.fragment_id,
                         local_params.backend_num, query_ctx, _exec_env, cb,
                         std::bind(std::mem_fn(&FragmentMgr::coordinator_callback), this,
                                   std::placeholders::_1));
@@ -1071,17 +845,17 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason,
                          const std::string& msg) {
     bool find_the_fragment = false;
 
-    std::shared_ptr<FragmentExecState> exec_state;
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
     {
         std::lock_guard<std::mutex> lock(_lock);
         auto iter = _fragment_map.find(fragment_id);
         if (iter != _fragment_map.end()) {
-            exec_state = iter->second;
+            fragment_executor = iter->second;
         }
     }
-    if (exec_state) {
+    if (fragment_executor) {
         find_the_fragment = true;
-        exec_state->cancel(reason, msg);
+        fragment_executor->cancel(reason, msg);
     }
 
     std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_fragment_ctx;
@@ -1122,9 +896,9 @@ bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
     auto ctx = _query_ctx_map.find(query_id);
     if (ctx != _query_ctx_map.end()) {
         for (auto it : ctx->second->fragment_ids) {
-            auto exec_state_iter = _fragment_map.find(it);
-            if (exec_state_iter != _fragment_map.end() && exec_state_iter->second) {
-                return exec_state_iter->second->is_canceled();
+            auto fragment_executor_iter = _fragment_map.find(it);
+            if (fragment_executor_iter != _fragment_map.end() && fragment_executor_iter->second) {
+                return fragment_executor_iter->second->is_canceled();
             }
 
             auto pipeline_ctx_iter = _pipeline_map.find(it);
@@ -1305,7 +1079,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
     UniqueId fragment_instance_id = request->fragment_id();
     TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-    std::shared_ptr<FragmentExecState> fragment_state;
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
     std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
 
     RuntimeFilterMgr* runtime_filter_mgr = nullptr;
@@ -1327,10 +1101,10 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
             VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
             return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
         }
-        fragment_state = iter->second;
+        fragment_executor = iter->second;
 
-        DCHECK(fragment_state != nullptr);
-        runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr();
+        DCHECK(fragment_executor != nullptr);
+        runtime_filter_mgr = fragment_executor->runtime_state()->runtime_filter_mgr();
     }
 
     return runtime_filter_mgr->update_filter(request, attach_data);
@@ -1346,7 +1120,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
     UniqueId fragment_instance_id = fragment_instance_ids[0];
     TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-    std::shared_ptr<FragmentExecState> fragment_state;
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
     std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
     RuntimeFilterMgr* runtime_filter_mgr = nullptr;
@@ -1371,14 +1145,11 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
            VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
fragment-id:" << fragment_instance_id; return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); } - fragment_state = iter->second; + fragment_executor = iter->second; - DCHECK(fragment_state != nullptr); - runtime_filter_mgr = fragment_state->executor() - ->runtime_state() - ->get_query_ctx() - ->runtime_filter_mgr(); - pool = &fragment_state->get_query_ctx()->obj_pool; + DCHECK(fragment_executor != nullptr); + runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr(); + pool = &fragment_executor->get_query_ctx()->obj_pool; } UpdateRuntimeFilterParamsV2 params(request, attach_data, pool); @@ -1411,7 +1182,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, auto fragment_instance_id = filter_controller->instance_id(); TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); - std::shared_ptr fragment_state; + std::shared_ptr fragment_executor; std::shared_ptr pip_context; if (is_pipeline) { std::lock_guard lock(_lock); @@ -1432,16 +1203,15 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); } - // hold reference to fragment_state, or else runtime_state can be destroyed + // hold reference to fragment_executor, or else runtime_state can be destroyed // when filter_controller->merge is still in progress - fragment_state = iter->second; + fragment_executor = iter->second; } RETURN_IF_ERROR(filter_controller->merge(request, attach_data, opt_remote_rf)); return Status::OK(); } void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, - RuntimeState* state, QueryContext* query_ctx) { if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join || !params.query_options.enable_share_hash_table_for_broadcast_join) { @@ -1469,7 +1239,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFrag void FragmentMgr::_setup_shared_hashtable_for_broadcast_join( const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params, - RuntimeState* state, QueryContext* query_ctx) { + QueryContext* query_ctx) { if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join || !params.query_options.enable_share_hash_table_for_broadcast_join) { return; diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 9170945f28..cdc01627a0 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -33,6 +33,7 @@ #include "common/status.h" #include "gutil/ref_counted.h" #include "http/rest_monitor_iface.h" +#include "runtime/query_context.h" #include "runtime_filter_mgr.h" #include "util/countdown_latch.h" #include "util/hash_util.hpp" // IWYU pragma: keep @@ -50,7 +51,7 @@ class PipelineXFragmentContext; } // namespace pipeline class QueryContext; class ExecEnv; -class FragmentExecState; +class PlanFragmentExecutor; class ThreadPool; class TExecPlanFragmentParams; class PExecPlanFragmentStartRequest; @@ -66,21 +67,6 @@ class Thread; std::string to_load_error_http_path(const std::string& file_name); -struct ReportStatusRequest { - const Status& status; - RuntimeProfile* profile; - RuntimeProfile* load_channel_profile; - bool done; - TNetworkAddress coord_addr; - TUniqueId query_id; - int fragment_id; - TUniqueId fragment_instance_id; - int backend_num; - RuntimeState* runtime_state; - std::function update_fn; - std::function cancel_fn; -}; - // This class used to manage all the 
 class FragmentMgr : public RestMonitorIface {
 public:
@@ -146,17 +132,18 @@ public:
     ThreadPool* get_thread_pool() { return _thread_pool.get(); }
 
 private:
-    void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);
+    void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
+                      const FinishCallback& cb);
 
     template <typename Param>
     void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
 
     void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
-                                                    RuntimeState* state, QueryContext* query_ctx);
+                                                    QueryContext* query_ctx);
 
     void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
                                                     const TPipelineInstanceParams& local_params,
-                                                    RuntimeState* state, QueryContext* query_ctx);
+                                                    QueryContext* query_ctx);
 
     template <typename Params>
     Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
@@ -169,8 +156,8 @@ private:
 
     std::condition_variable _cv;
 
-    // Make sure that remove this before no data reference FragmentExecState
-    std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
+    // Make sure that remove this before no data reference PlanFragmentExecutor
+    std::unordered_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>> _fragment_map;
 
     std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;
 
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f4eb2dffa5..750ff70833 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -20,6 +20,7 @@
 
 #include "runtime/plan_fragment_executor.h"
 
+#include
 #include
 #include
 #include
@@ -42,6 +43,7 @@
 #include "exec/data_sink.h"
 #include "exec/exec_node.h"
 #include "exec/scan_node.h"
+#include "io/fs/stream_load_pipe.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
@@ -49,6 +51,8 @@
 #include "runtime/query_statistics.h"
 #include "runtime/result_queue_mgr.h"
 #include "runtime/runtime_filter_mgr.h"
+#include "runtime/stream_load/new_load_stream_mgr.h"
+#include "runtime/stream_load/stream_load_context.h"
 #include "runtime/thread_context.h"
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
@@ -72,9 +76,16 @@ namespace doris {
 using namespace ErrorCode;
 
 PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
+                                           std::shared_ptr<QueryContext> query_ctx,
+                                           const TUniqueId& instance_id, int fragment_id,
+                                           int backend_num,
                                            const report_status_callback& report_status_cb)
         : _exec_env(exec_env),
           _plan(nullptr),
+          _query_ctx(query_ctx),
+          _fragment_instance_id(instance_id),
+          _fragment_id(fragment_id),
+          _backend_num(backend_num),
           _report_status_cb(report_status_cb),
           _report_thread_active(false),
           _done(false),
@@ -85,6 +96,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
           _collect_query_statistics_with_every_batch(false),
           _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
     _report_thread_future = _report_thread_promise.get_future();
+    _start_time = vectorized::VecDateTimeValue::local_time();
 }
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
@@ -100,32 +112,31 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
     DCHECK(!_report_thread_active);
 }
 
-Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
-                                     QueryContext* query_ctx) {
+Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
     OpentelemetryTracer tracer = telemetry::get_noop_tracer();
     if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
-        tracer = telemetry::get_tracer(print_id(_query_id));
+        tracer = telemetry::get_tracer(print_id(_query_ctx->query_id()));
     }
     _span = tracer->StartSpan("Plan_fragment_executor");
     OpentelemetryScope scope {_span};
 
-    const TPlanFragmentExecParams& params = request.params;
-    _query_id = params.query_id;
+    if (request.__isset.query_options) {
+        _timeout_second = request.query_options.execution_timeout;
+    }
 
+    const TPlanFragmentExecParams& params = request.params;
     LOG_INFO("PlanFragmentExecutor::prepare")
-            .tag("query_id", _query_id)
-            .tag("instance_id", params.fragment_instance_id)
+            .tag("query_id", print_id(_query_ctx->query_id()))
+            .tag("instance_id", print_id(params.fragment_instance_id))
             .tag("backend_num", request.backend_num)
             .tag("pthread_id", (uintptr_t)pthread_self());
     // VLOG_CRITICAL << "request:\n" << apache::thrift::ThriftDebugString(request);
 
-    const TQueryGlobals& query_globals =
-            query_ctx == nullptr ? request.query_globals : query_ctx->query_globals;
+    const TQueryGlobals& query_globals = _query_ctx->query_globals;
     _runtime_state =
             RuntimeState::create_unique(params, request.query_options, query_globals, _exec_env);
-    _runtime_state->set_query_ctx(query_ctx);
-    _runtime_state->set_query_mem_tracker(query_ctx == nullptr ? _exec_env->orphan_mem_tracker()
-                                                               : query_ctx->query_mem_tracker);
+    _runtime_state->set_query_ctx(_query_ctx.get());
+    _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
     _runtime_state->set_tracer(std::move(tracer));
 
     SCOPED_ATTACH_TASK(_runtime_state.get());
@@ -149,13 +160,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     }
 
     // set up desc tbl
-    DescriptorTbl* desc_tbl = nullptr;
-    if (query_ctx != nullptr) {
-        desc_tbl = query_ctx->desc_tbl;
-    } else {
-        DCHECK(request.__isset.desc_tbl);
-        RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl));
-    }
+    DescriptorTbl* desc_tbl = _query_ctx->desc_tbl;
     _runtime_state->set_desc_tbl(desc_tbl);
 
     // set up plan
@@ -249,7 +254,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
 Status PlanFragmentExecutor::open() {
     int64_t mem_limit = _runtime_state->query_mem_tracker()->limit();
     LOG_INFO("PlanFragmentExecutor::open")
-            .tag("query_id", _query_id)
+            .tag("query_id", _query_ctx->query_id())
             .tag("instance_id", _runtime_state->fragment_instance_id())
             .tag("mem_limit", PrettyPrinter::print(mem_limit, TUnit::BYTES));
 
@@ -370,6 +375,49 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
     return Status::OK();
 }
 
+Status PlanFragmentExecutor::execute() {
+    if (_need_wait_execution_trigger) {
+        // if _need_wait_execution_trigger is true, which means this instance
+        // is prepared but need to wait for the signal to do the rest execution.
+        if (!_query_ctx->wait_for_start()) {
+            cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
+            return Status::OK();
+        }
+    }
+#ifndef BE_TEST
+    if (_runtime_state->is_cancelled()) {
+        return Status::Cancelled("cancelled before execution");
+    }
+#endif
+    int64_t duration_ns = 0;
+    {
+        SCOPED_RAW_TIMER(&duration_ns);
+        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
+        Status st = open();
+        WARN_IF_ERROR(st, strings::Substitute("Got error while opening fragment $0, query id: $1",
+                                              print_id(_fragment_instance_id),
+                                              print_id(_query_ctx->query_id())));
+        if (!st.ok()) {
+            cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
+                   fmt::format("PlanFragmentExecutor open failed, reason: {}", st.to_string()));
+        }
+        close();
+    }
+    DorisMetrics::instance()->fragment_requests_total->increment(1);
+    DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000);
+    return Status::OK();
+}
+
+bool PlanFragmentExecutor::is_timeout(const vectorized::VecDateTimeValue& now) const {
+    if (_timeout_second <= 0) {
+        return false;
+    }
+    if (now.second_diff(_start_time) > _timeout_second) {
+        return true;
+    }
+    return false;
+}
+
 void PlanFragmentExecutor::_collect_query_statistics() {
     _query_statistics->clear();
     Status status = _plan->collect_query_statistics(_query_statistics.get());
@@ -446,7 +494,7 @@ void PlanFragmentExecutor::report_profile() {
 }
 
 void PlanFragmentExecutor::send_report(bool done) {
-    Status status;
+    Status status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
         status = _status;
@@ -465,12 +513,36 @@ void PlanFragmentExecutor::send_report(bool done) {
     if (!_is_report_success && !_is_report_on_cancel) {
         return;
     }
-
+    ReportStatusRequest report_req = {
+            status,
+            _is_report_success ? _runtime_state->runtime_profile() : nullptr,
+            _is_report_success ? _runtime_state->load_channel_profile() : nullptr,
+            done || !status.ok(),
+            _query_ctx->coord_addr,
+            _query_ctx->query_id(),
+            _fragment_id,
+            _fragment_instance_id,
+            _backend_num,
+            _runtime_state.get(),
+            std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1),
+            std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1,
+                      std::placeholders::_2)};
     // This will send a report even if we are cancelled. If the query completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
     // be waiting for a final report and profile.
-    _report_status_cb(status, _is_report_success ? profile() : nullptr,
-                      _is_report_success ? load_channel_profile() : nullptr, done || !status.ok());
+    _report_status_cb(report_req);
 }
 
+// Update status of this fragment execute
+Status PlanFragmentExecutor::update_status(Status status) {
+    std::lock_guard<std::mutex> l(_status_lock);
+    if (!status.ok() && _status.ok()) {
+        _status = status;
+        LOG(WARNING) << "query_id=" << print_id(_query_ctx->query_id())
+                     << ", instance_id=" << print_id(_fragment_instance_id) << " meet error status "
+                     << status;
+    }
+    return _status;
+}
 
 void PlanFragmentExecutor::stop_report_thread() {
@@ -488,24 +560,39 @@ void PlanFragmentExecutor::stop_report_thread() {
 }
 
 void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
+    std::lock_guard<std::mutex> l(_status_lock);
     LOG_INFO("PlanFragmentExecutor::cancel")
-            .tag("query_id", _query_id)
-            .tag("instance_id", _runtime_state->fragment_instance_id())
+            .tag("query_id", print_id(_query_ctx->query_id()))
+            .tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
             .tag("reason", reason)
             .tag("error message", msg);
+    if (_runtime_state->is_cancelled()) {
+        LOG(INFO) << "instance is already cancelled, skip cancel again";
+        return;
+    }
     DCHECK(_prepared);
     _cancel_reason = reason;
+    if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
+        _is_report_on_cancel = false;
+    }
     _cancel_msg = msg;
     _runtime_state->set_is_cancelled(true, msg);
     // To notify wait_for_start()
-    _runtime_state->get_query_ctx()->set_ready_to_execute(true);
+    _query_ctx->set_ready_to_execute(true);
 
     // must close stream_mgr to avoid dead lock in Exchange Node
-    auto env = _runtime_state->exec_env();
-    auto id = _runtime_state->fragment_instance_id();
-    env->vstream_mgr()->cancel(id);
+    _exec_env->vstream_mgr()->cancel(_fragment_instance_id);
     // Cancel the result queue manager used by spark doris connector
-    _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
+    _exec_env->result_queue_mgr()->update_queue_status(_fragment_instance_id, Status::Aborted(msg));
+#ifndef BE_TEST
+    // Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
+    // For stream load the fragment's query_id == load id, it is set in FE.
+    auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_ctx->query_id());
+    if (stream_load_ctx != nullptr) {
+        stream_load_ctx->pipe->cancel(msg);
+    }
+#endif
+    return;
 }
 
 const RowDescriptor& PlanFragmentExecutor::row_desc() {
diff --git a/be/src/runtime/plan_fragment_executor.h b/be/src/runtime/plan_fragment_executor.h
index c95ddc75c1..47cd1ff9a8 100644
--- a/be/src/runtime/plan_fragment_executor.h
+++ b/be/src/runtime/plan_fragment_executor.h
@@ -34,6 +34,7 @@
 #include
 
 #include "common/status.h"
+#include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 
@@ -47,6 +48,7 @@ class DescriptorTbl;
 class ExecEnv;
 class ObjectPool;
 class QueryStatistics;
+struct ReportStatusRequest;
 
 namespace vectorized {
 class Block;
@@ -73,18 +75,12 @@ class Block;
 // thread-safe.
 class PlanFragmentExecutor {
 public:
-    // Callback to report execution status of plan fragment.
-    // 'profile' is the cumulative profile, 'done' indicates whether the execution
-    // is done or still continuing.
-    // Note: this does not take a const RuntimeProfile&, because it might need to call
-    // functions like PrettyPrint() or to_thrift(), neither of which is const
-    // because they take locks.
-    using report_status_callback =
-            std::function<void(const Status&, RuntimeProfile*, RuntimeProfile*, bool)>;
-
+    using report_status_callback = std::function<void(const ReportStatusRequest)>;
     // report_status_cb, if !empty(), is used to report the accumulated profile
     // information periodically during execution (open() or get_next()).
-    PlanFragmentExecutor(ExecEnv* exec_env, const report_status_callback& report_status_cb);
+    PlanFragmentExecutor(ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
+                         const TUniqueId& instance_id, int fragment_id, int backend_num,
+                         const report_status_callback& report_status_cb);
 
     // Closes the underlying plan fragment and frees up all resources allocated
     // in open()/get_next().
@@ -100,7 +96,7 @@ public:
     // number of bytes this query can consume at runtime.
     // The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
     // If query_ctx is not null, some components will be got from query_ctx.
-    Status prepare(const TExecPlanFragmentParams& request, QueryContext* query_ctx = nullptr);
+    Status prepare(const TExecPlanFragmentParams& request);
 
     // Start execution. Call this prior to get_next().
     // If this fragment has a sink, open() will send all rows produced
@@ -113,6 +109,10 @@ public:
     // time when open() returns, and the status-reporting thread will have been stopped.
     Status open();
 
+    Status execute();
+
+    const vectorized::VecDateTimeValue& start_time() const { return _start_time; }
+
     // Closes the underlying plan fragment and frees up all resources allocated
     // in open()/get_next().
     void close();
@@ -133,12 +133,34 @@ public:
 
     DataSink* get_sink() const { return _sink.get(); }
 
-    void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
+    void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
+
+    void set_merge_controller_handler(
+            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
+        _merge_controller_handler = handler;
+    }
+
+    std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
+
+    TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
+
+    TUniqueId query_id() const { return _query_ctx->query_id(); }
+
+    bool is_timeout(const vectorized::VecDateTimeValue& now) const;
+
+    bool is_canceled() { return _runtime_state->is_cancelled(); }
+
+    Status update_status(Status status);
 
 private:
     ExecEnv* _exec_env; // not owned
     ExecNode* _plan;    // lives in _runtime_state->obj_pool()
-    TUniqueId _query_id;
+    std::shared_ptr<QueryContext> _query_ctx;
+    // Id of this instance
+    TUniqueId _fragment_instance_id;
+    int _fragment_id;
+    // Used to report to coordinator which backend is over
+    int _backend_num;
 
     // profile reporting-related
     report_status_callback _report_status_cb;
@@ -196,6 +218,16 @@ private:
 
     RuntimeProfile::Counter* _fragment_cpu_timer;
 
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
+
+    // If set the true, this plan fragment will be executed only after FE send execution start rpc.
+    bool _need_wait_execution_trigger = false;
+
+    // Timeout of this instance, it is inited from query options
+    int _timeout_second = -1;
+
+    vectorized::VecDateTimeValue _start_time;
+
     // It is shared with BufferControlBlock and will be called in two different
     // threads. But their calls are all at different time, there is no problem of
     // multithreaded access.
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 54199dc106..dbb8c9b1c0 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -40,7 +40,20 @@
 #include "vec/runtime/shared_scanner_controller.h"
 
 namespace doris {
-
+struct ReportStatusRequest {
+    const Status& status;
+    RuntimeProfile* profile;
+    RuntimeProfile* load_channel_profile;
+    bool done;
+    TNetworkAddress coord_addr;
+    TUniqueId query_id;
+    int fragment_id;
+    TUniqueId fragment_instance_id;
+    int backend_num;
+    RuntimeState* runtime_state;
+    std::function<Status(Status)> update_fn;
+    std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
+};
 // Save the common components of fragments in a query.
 // Some components like DescriptorTbl may be very large
 // that will slow down each execution of fragments when DeSer them every time.
@@ -49,9 +62,11 @@ class QueryContext {
     ENABLE_FACTORY_CREATOR(QueryContext);
 
 public:
-    QueryContext(int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options)
+    QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
+                 const TQueryOptions& query_options)
             : fragment_num(total_fragment_num),
               timeout_second(-1),
+              _query_id(query_id),
               _exec_env(exec_env),
               _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
               _query_options(query_options) {
@@ -69,7 +84,7 @@ public:
             LOG(INFO) << fmt::format(
                     "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                     "PeakUsed={}",
-                    print_id(query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
+                    print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
                     MemTracker::print_bytes(query_mem_tracker->consumption()),
                     MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
         }
@@ -184,8 +199,9 @@ public:
 
     RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
 
+    TUniqueId query_id() const { return _query_id; }
+
 public:
-    TUniqueId query_id;
     DescriptorTbl* desc_tbl;
     bool set_rsc_info = false;
     std::string user;
@@ -213,6 +229,7 @@ public:
     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
 
 private:
+    TUniqueId _query_id;
     ExecEnv* _exec_env;
     vectorized::VecDateTimeValue _start_time;
 
@@ -225,7 +242,7 @@ private:
     std::mutex _start_lock;
     std::condition_variable _start_cond;
-    // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState.
+    // Only valid when _need_wait_execution_trigger is set to true in PlanFragmentExecutor.
     // And all fragments of this query will start execution when this is set to true.
     std::atomic<bool> _ready_to_execute {false};
     std::atomic<bool> _is_cancelled {false};
diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp
deleted file mode 100644
index d7a0f67d64..0000000000
--- a/be/test/runtime/fragment_mgr_test.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/fragment_mgr.h"
-
-#include
-#include
-#include
-
-// IWYU pragma: no_include
-#include  // IWYU pragma: keep
-#include
-
-#include "common/config.h"
-#include "exec/data_sink.h"
-#include "gtest/gtest_pred_impl.h"
-#include "runtime/exec_env.h"
-#include "runtime/plan_fragment_executor.h"
-#include "runtime/runtime_state.h"
-
-namespace doris {
-
-static Status s_prepare_status;
-static Status s_open_status;
-// Mock used for this unittest
-PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
-                                           const report_status_callback& report_status_cb)
-        : _exec_env(exec_env), _report_status_cb(report_status_cb) {}
-
-PlanFragmentExecutor::~PlanFragmentExecutor() {}
-
-Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
-                                     QueryContext* batch_ctx) {
-    return s_prepare_status;
-}
-
-Status PlanFragmentExecutor::open() {
-    std::this_thread::sleep_for(std::chrono::milliseconds(50));
-    return s_open_status;
-}
-
-void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
-}
-
-void PlanFragmentExecutor::close() {}
-
-class FragmentMgrTest : public testing::Test {
-public:
-    FragmentMgrTest() {}
-
-protected:
-    virtual void SetUp() {
-        s_prepare_status = Status::OK();
-        s_open_status = Status::OK();
-
-        config::fragment_pool_thread_num_min = 32;
-        config::fragment_pool_thread_num_max = 32;
-        config::fragment_pool_queue_size = 1024;
-    }
-    virtual void TearDown() {}
-};
-
-TEST_F(FragmentMgrTest, Normal) {
-    FragmentMgr mgr(nullptr);
-    TExecPlanFragmentParams params;
-    params.params.fragment_instance_id = TUniqueId();
-    params.params.fragment_instance_id.__set_hi(100);
-    params.params.fragment_instance_id.__set_lo(200);
-    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
-    // Duplicated
-    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
-}
-
-TEST_F(FragmentMgrTest, AddNormal) {
-    FragmentMgr mgr(nullptr);
-    for (int i = 0; i < 8; ++i) {
-        TExecPlanFragmentParams params;
-        params.params.fragment_instance_id = TUniqueId();
-        params.params.fragment_instance_id.__set_hi(100 + i);
-        params.params.fragment_instance_id.__set_lo(200);
-        EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
-    }
-}
-
-TEST_F(FragmentMgrTest, CancelNormal) {
-    FragmentMgr mgr(nullptr);
-    TExecPlanFragmentParams params;
-    params.params.fragment_instance_id = TUniqueId();
-    params.params.fragment_instance_id.__set_hi(100);
-    params.params.fragment_instance_id.__set_lo(200);
-    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
-}
-
-TEST_F(FragmentMgrTest, CancelWithoutAdd) {
-    FragmentMgr mgr(nullptr);
-    TExecPlanFragmentParams params;
-    params.params.fragment_instance_id = TUniqueId();
-    params.params.fragment_instance_id.__set_hi(100);
-    params.params.fragment_instance_id.__set_lo(200);
-}
-
-TEST_F(FragmentMgrTest, PrepareFailed) {
-    s_prepare_status = Status::InternalError("Prepare failed.");
-    FragmentMgr mgr(nullptr);
-    TExecPlanFragmentParams params;
-    params.params.fragment_instance_id = TUniqueId();
-    params.params.fragment_instance_id.__set_hi(100);
-    params.params.fragment_instance_id.__set_lo(200);
-    EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
-}
-
-TEST_F(FragmentMgrTest, OfferPoolFailed) {
-    config::fragment_pool_thread_num_min = 1;
-    config::fragment_pool_thread_num_max = 1;
-    config::fragment_pool_queue_size = 0;
-    FragmentMgr mgr(doris::ExecEnv::GetInstance());
-
-    TExecPlanFragmentParams params;
-    params.params.fragment_instance_id = TUniqueId();
-    params.params.fragment_instance_id.__set_hi(100);
-    params.params.fragment_instance_id.__set_lo(200);
-    EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
-
-    // the first plan open will cost 50ms, so the next 3 plans will be aborted.
-    for (int i = 1; i < 4; ++i) {
-        TExecPlanFragmentParams params;
-        params.params.fragment_instance_id = TUniqueId();
-        params.params.fragment_instance_id.__set_hi(100 + i);
-        params.params.fragment_instance_id.__set_lo(200);
-        EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
-    }
-}
-
-} // namespace doris