[refactor](fragment) refactor non pipeline fragment executor (#23281)

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
Author: yiguolei
Date: 2023-08-22 16:00:34 +08:00 (committed by GitHub)
Parent: 820d328ad7
Commit: bcdb481374
9 changed files with 258 additions and 513 deletions
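
At a high level, the commit deletes the FragmentExecState wrapper (the large removed block in the FragmentMgr diff below) and moves its state, i.e. the instance id, backend number, timeout, start time, and cancel/status handling, into PlanFragmentExecutor itself, so FragmentMgr can store the executor directly in its fragment map. A minimal sketch of the resulting shape, using simplified stand-in types rather than the real Doris classes:

#include <memory>
#include <string>
#include <utility>

// Stand-ins for the sketch; the real types live in Doris headers.
struct Status {
    bool ok = true;
};
using TUniqueId = std::string;

// After the refactor the executor carries the per-instance state that the
// deleted FragmentExecState wrapper used to duplicate.
class PlanFragmentExecutor {
public:
    PlanFragmentExecutor(TUniqueId instance_id, int fragment_id, int backend_num)
            : _fragment_instance_id(std::move(instance_id)),
              _fragment_id(fragment_id),
              _backend_num(backend_num) {}

    Status execute() { return {}; }             // absorbed from the wrapper
    void cancel(const std::string& /*msg*/) {}  // absorbed from the wrapper

    TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
    int fragment_id() const { return _fragment_id; }
    int backend_num() const { return _backend_num; }

private:
    TUniqueId _fragment_instance_id;  // moved in from FragmentExecState
    int _fragment_id;                 // -1 on the non-pipeline path, as in the diff
    int _backend_num;
};

int main() {
    auto executor = std::make_shared<PlanFragmentExecutor>("instance-1", -1, 0);
    return executor->execute().ok ? 0 : 1;
}

This is why the diff replaces std::shared_ptr<FragmentExecState> with std::shared_ptr<PlanFragmentExecutor> throughout FragmentMgr.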

View File

@@ -385,7 +385,7 @@ void PipelineTask::set_state(PipelineTaskState state) {
std::string PipelineTask::debug_string() {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(fragment_context()->get_fragment_instance_id()));

View File

@@ -240,7 +240,7 @@ Status PipelineXTask::close() {
std::string PipelineXTask::debug_string() {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id));
fmt::format_to(debug_string_buffer, "QueryId: {}\n", print_id(query_context()->query_id()));
fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));

View File

@@ -121,7 +121,7 @@ void BlockedTaskScheduler::_schedule() {
} else if (task->fragment_context()->is_canceled()) {
_make_task_run(local_blocked_tasks, iter, ready_tasks);
} else if (task->query_context()->is_timeout(now)) {
LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id)
LOG(WARNING) << "Timeout, query_id=" << print_id(task->query_context()->query_id())
<< ", instance_id="
<< print_id(task->fragment_context()->get_fragment_instance_id());
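
The three hunks above are one mechanical change: QueryContext::query_id stops being a public field and becomes a private member behind a query_id() accessor (see the query_context.h hunk near the end of this diff), so every read site gains parentheses. A minimal sketch of the pattern with simplified types:

#include <cstdint>
#include <cstdio>

struct TUniqueId {
    int64_t hi = 0;
    int64_t lo = 0;
};

class QueryContext {
public:
    explicit QueryContext(TUniqueId query_id) : _query_id(query_id) {}

    // Read-only accessor; the id is fixed at construction, which is exactly
    // what the commit enforces by making the field private.
    TUniqueId query_id() const { return _query_id; }

private:
    TUniqueId _query_id;
};

int main() {
    QueryContext ctx({100, 200});
    // Call sites change from ctx.query_id to ctx.query_id().
    std::printf("query_id=%lld-%lld\n", static_cast<long long>(ctx.query_id().hi),
                static_cast<long long>(ctx.query_id().lo));
    return 0;
}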

View File

@@ -41,6 +41,8 @@
#include <thrift/transport/TTransportException.h>
#include <atomic>
#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
@@ -56,7 +58,6 @@
#include "io/fs/stream_load_pipe.h"
#include "opentelemetry/trace/scope.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/pipeline_x_fragment_context.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
@@ -110,208 +111,6 @@ std::string to_load_error_http_path(const std::string& file_name) {
using apache::thrift::TException;
using apache::thrift::transport::TTransportException;
class FragmentExecState {
public:
using report_status_callback_impl = std::function<void(const ReportStatusRequest)>;
// Constructor by using QueryContext
FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
const report_status_callback_impl& report_status_cb_impl);
Status prepare(const TExecPlanFragmentParams& params);
Status execute();
Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = "");
bool is_canceled() { return _cancelled; }
TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
TUniqueId query_id() const { return _query_id; }
PlanFragmentExecutor* executor() { return &_executor; }
const vectorized::VecDateTimeValue& start_time() const { return _start_time; }
void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
}
// Update the execution status of this fragment
Status update_status(const Status& status) {
std::lock_guard<std::mutex> l(_status_lock);
if (!status.ok() && _exec_status.ok()) {
_exec_status = status;
LOG(WARNING) << "query_id=" << print_id(_query_id)
<< ", instance_id=" << print_id(_fragment_instance_id)
<< " meet error status " << status;
}
return _exec_status;
}
void set_group(const TResourceInfo& info) {
_set_rsc_info = true;
_user = info.user;
_group = info.group;
}
bool is_timeout(const vectorized::VecDateTimeValue& now) const {
if (_timeout_second <= 0) {
return false;
}
if (now.second_diff(_start_time) > _timeout_second) {
return true;
}
return false;
}
int get_timeout_second() const { return _timeout_second; }
std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
private:
void coordinator_callback(const Status& status, RuntimeProfile* profile,
RuntimeProfile* load_channel_profile, bool done);
// Id of this query
TUniqueId _query_id;
// Id of this instance
TUniqueId _fragment_instance_id;
// Used to report to the coordinator which backend has finished
int _backend_num;
TNetworkAddress _coord_addr;
// This context is shared by all fragments of this host in a query.
// _query_ctx should be the last one to be destructed, because _executor's
// destructor calls close(), which still depends on the query context
// (for example, the runtime profile).
std::shared_ptr<QueryContext> _query_ctx;
PlanFragmentExecutor _executor;
vectorized::VecDateTimeValue _start_time;
std::mutex _status_lock;
Status _exec_status;
bool _set_rsc_info = false;
std::string _user;
std::string _group;
int _timeout_second;
std::atomic<bool> _cancelled {false};
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
// If set to true, this plan fragment will be executed only after FE sends the execution start RPC.
bool _need_wait_execution_trigger = false;
report_status_callback_impl _report_status_cb_impl;
};
FragmentExecState::FragmentExecState(const TUniqueId& query_id,
const TUniqueId& fragment_instance_id, int backend_num,
ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
const report_status_callback_impl& report_status_cb_impl)
: _query_id(query_id),
_fragment_instance_id(fragment_instance_id),
_backend_num(backend_num),
_query_ctx(std::move(query_ctx)),
_executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
this, std::placeholders::_1, std::placeholders::_2,
std::placeholders::_3, std::placeholders::_4)),
_set_rsc_info(false),
_timeout_second(-1),
_report_status_cb_impl(report_status_cb_impl) {
_start_time = vectorized::VecDateTimeValue::local_time();
_coord_addr = _query_ctx->coord_addr;
}
Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) {
if (params.__isset.query_options) {
_timeout_second = params.query_options.execution_timeout;
}
if (_query_ctx == nullptr) {
if (params.__isset.resource_info) {
set_group(params.resource_info);
}
}
if (_query_ctx == nullptr) {
return _executor.prepare(params);
} else {
return _executor.prepare(params, _query_ctx.get());
}
}
Status FragmentExecState::execute() {
if (_need_wait_execution_trigger) {
// if _need_wait_execution_trigger is true, this instance is prepared but
// needs to wait for the signal before doing the rest of the execution.
if (!_query_ctx->wait_for_start()) {
return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
}
}
#ifndef BE_TEST
if (_executor.runtime_state()->is_cancelled()) {
return Status::Cancelled("cancelled before execution");
}
#endif
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
Status st = _executor.open();
WARN_IF_ERROR(st,
strings::Substitute("Got error while opening fragment $0, query id: $1",
print_id(_fragment_instance_id), print_id(_query_id)));
if (!st.ok()) {
cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
fmt::format("PlanFragmentExecutor open failed, reason: {}", st.to_string()));
}
_executor.close();
}
DorisMetrics::instance()->fragment_requests_total->increment(1);
DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000);
return Status::OK();
}
Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
if (!_cancelled) {
std::lock_guard<std::mutex> l(_status_lock);
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_executor.set_is_report_on_cancel(false);
}
_executor.cancel(reason, msg);
#ifndef BE_TEST
// Get the pipe from the new load stream manager and send cancel to it, or the fragment may hang waiting to read from the pipe
// For stream load, the fragment's query_id == load id; it is set in FE.
auto stream_load_ctx = _query_ctx->exec_env()->new_load_stream_mgr()->get(_query_id);
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(msg);
}
#endif
_cancelled = true;
}
return Status::OK();
}
// There can only be one of these callbacks in-flight at any moment, because
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
RuntimeProfile* load_channel_profile, bool done) {
_report_status_cb_impl(
{status, profile, load_channel_profile, done, _coord_addr, _query_id, -1,
_fragment_instance_id, _backend_num, _executor.runtime_state(),
std::bind(&FragmentExecState::update_status, this, std::placeholders::_1),
std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1,
std::placeholders::_2)});
DCHECK(status.ok() || done); // if !status.ok() => done
}
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
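
Both the deleted wrapper above and the new call sites later in this file wire the coordinator report callback with std::bind<void>(std::mem_fn(...), this, placeholders). A self-contained toy version of that binding, with the equivalent lambda shown for comparison (the names here are illustrative, not the Doris signatures):

#include <functional>
#include <iostream>

struct ReportStatusRequest {
    int fragment_id = 0;
};

class FragmentMgrLike {
public:
    void coordinator_callback(const ReportStatusRequest& req) {
        std::cout << "report for fragment " << req.fragment_id << "\n";
    }

    // The style used throughout the diff: bind the member function to `this`
    // and forward the single argument.
    std::function<void(const ReportStatusRequest&)> bound_callback() {
        return std::bind<void>(std::mem_fn(&FragmentMgrLike::coordinator_callback),
                               this, std::placeholders::_1);
    }

    // The equivalent lambda form.
    std::function<void(const ReportStatusRequest&)> lambda_callback() {
        return [this](const ReportStatusRequest& req) { coordinator_callback(req); };
    }
};

int main() {
    FragmentMgrLike mgr;
    mgr.bound_callback()(ReportStatusRequest{1});
    mgr.lambda_callback()(ReportStatusRequest{2});
    return 0;
}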
@@ -519,24 +318,25 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
static void empty_function(RuntimeState*, Status*) {}
void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
const FinishCallback& cb) {
std::string func_name {"PlanFragmentExecutor::_exec_actual"};
#ifndef BE_TEST
SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state());
SCOPED_ATTACH_TASK(fragment_executor->runtime_state());
#endif
LOG_INFO(func_name)
.tag("query_id", exec_state->query_id())
.tag("instance_id", exec_state->fragment_instance_id())
.tag("query_id", fragment_executor->query_id())
.tag("instance_id", fragment_executor->fragment_instance_id())
.tag("pthread_id", (uintptr_t)pthread_self());
Status st = exec_state->execute();
Status st = fragment_executor->execute();
if (!st.ok()) {
exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed");
fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"fragment_executor execute failed");
}
std::shared_ptr<QueryContext> query_ctx = exec_state->get_query_ctx();
std::shared_ptr<QueryContext> query_ctx = fragment_executor->get_query_ctx();
bool all_done = false;
if (query_ctx != nullptr) {
// decrease the number of unfinished fragments
@@ -546,15 +346,15 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
// remove exec state after this fragment finished
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(exec_state->fragment_instance_id());
_fragment_map.erase(fragment_executor->fragment_instance_id());
if (all_done && query_ctx) {
_query_ctx_map.erase(query_ctx->query_id);
_query_ctx_map.erase(query_ctx->query_id());
}
}
// Callback after removing this id
auto status = exec_state->executor()->status();
cb(exec_state->executor()->runtime_state(), &status);
auto status = fragment_executor->status();
cb(fragment_executor->runtime_state(), &status);
}
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
@@ -691,19 +491,17 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
} else {
// This may be the first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create_shared(params.fragment_num_on_host, _exec_env,
query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env,
params.query_options);
query_ctx->query_id = query_id;
RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl,
&(query_ctx->desc_tbl)));
query_ctx->coord_addr = params.coord;
// set file scan range params
if (params.__isset.file_scan_params) {
query_ctx->file_scan_range_params_map = params.file_scan_params;
}
query_ctx->coord_addr = params.coord;
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id.hi, query_ctx->query_id.lo)
LOG(INFO) << "query_id: " << UniqueId(query_ctx->query_id().hi, query_ctx->query_id().lo)
<< " coord_addr " << query_ctx->coord_addr
<< " total fragment num on current host: " << params.fragment_num_on_host;
query_ctx->query_globals = params.query_globals;
@@ -731,15 +529,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
if (params.query_options.query_type == TQueryType::SELECT) {
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::QUERY,
fmt::format("Query#Id={}", print_id(query_ctx->query_id)), bytes_limit);
fmt::format("Query#Id={}", print_id(query_ctx->query_id())), bytes_limit);
} else if (params.query_options.query_type == TQueryType::LOAD) {
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("Load#Id={}", print_id(query_ctx->query_id)), bytes_limit);
fmt::format("Load#Id={}", print_id(query_ctx->query_id())), bytes_limit);
} else { // EXTERNAL
query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
MemTrackerLimiter::Type::LOAD,
fmt::format("External#Id={}", print_id(query_ctx->query_id)), bytes_limit);
fmt::format("External#Id={}", print_id(query_ctx->query_id())), bytes_limit);
}
if (params.query_options.__isset.is_report_success &&
params.query_options.is_report_success) {
@@ -756,11 +554,11 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
task_group_info);
tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
query_ctx->set_task_group(tg);
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id)
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< " use task group: " << tg->debug_string();
}
} else {
VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id)
VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id())
<< " does not use task group.";
}
}
@@ -771,9 +569,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
std::lock_guard<std::mutex> lock(_lock);
auto search = _query_ctx_map.find(query_id);
if (search == _query_ctx_map.end()) {
_query_ctx_map.insert(std::make_pair(query_ctx->query_id, query_ctx));
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id)
<< print_id(query_ctx->query_id())
<< " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
} else {
// Already has a query fragments context, use it
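
The hunk above is the tail of _get_query_ctx's get-or-create logic: under _lock, it either reuses the QueryContext already registered for this query id or inserts the newly built one. The same shape in miniature, with std::map and std::string standing in for the real key and context types:

#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>

class QueryContext {
public:
    explicit QueryContext(std::string query_id) : _query_id(std::move(query_id)) {}
    const std::string& query_id() const { return _query_id; }

private:
    std::string _query_id;
};

class FragmentMgrLike {
public:
    std::shared_ptr<QueryContext> get_or_create(const std::string& query_id) {
        std::lock_guard<std::mutex> lock(_lock);
        auto search = _query_ctx_map.find(query_id);
        if (search != _query_ctx_map.end()) {
            return search->second;  // already has a query context: reuse it
        }
        auto query_ctx = std::make_shared<QueryContext>(query_id);
        _query_ctx_map.emplace(query_ctx->query_id(), query_ctx);
        return query_ctx;
    }

private:
    std::mutex _lock;
    std::map<std::string, std::shared_ptr<QueryContext>> _query_ctx_map;
};

int main() {
    FragmentMgrLike mgr;
    auto a = mgr.get_or_create("q1");
    auto b = mgr.get_or_create("q1");
    std::cout << (a == b ? "reused" : "duplicated") << "\n";  // prints "reused"
    return 0;
}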
@@ -809,7 +607,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
}
}
std::shared_ptr<FragmentExecState> exec_state;
std::shared_ptr<QueryContext> query_ctx;
bool pipeline_engine_enabled = params.query_options.__isset.enable_pipeline_engine &&
params.query_options.enable_pipeline_engine;
@@ -817,36 +614,37 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
_get_query_ctx(params, params.params.query_id, pipeline_engine_enabled, query_ctx));
query_ctx->fragment_ids.push_back(fragment_instance_id);
exec_state.reset(
new FragmentExecState(query_ctx->query_id, params.params.fragment_instance_id,
params.backend_num, _exec_env, query_ctx,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback),
this, std::placeholders::_1)));
auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
_exec_env, query_ctx, params.params.fragment_instance_id, -1, params.backend_num,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// setting need_wait_execution_trigger means this instance will not actually be
// executed until the execPlanFragmentStart RPC triggers it to start.
exec_state->set_need_wait_execution_trigger();
fragment_executor->set_need_wait_execution_trigger();
}
int64_t duration_ns = 0;
DCHECK(!pipeline_engine_enabled);
{
SCOPED_RAW_TIMER(&duration_ns);
RETURN_IF_ERROR(exec_state->prepare(params));
RETURN_IF_ERROR(fragment_executor->prepare(params));
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
_runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state());
exec_state->set_merge_controller_handler(handler);
// TODO: the status should be checked here, but adding RETURN_IF_ERROR makes the P0 tests fail
_runtimefilter_controller.add_entity(params, &handler, fragment_executor->runtime_state());
fragment_executor->set_merge_controller_handler(handler);
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id, fragment_executor));
_cv.notify_all();
}
auto st = _thread_pool->submit_func(
[this, exec_state, cb, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
[this, fragment_executor, cb,
parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
OpentelemetryScope scope {parent_span};
_exec_actual(exec_state, cb);
_exec_actual(fragment_executor, cb);
});
if (!st.ok()) {
{
@@ -854,8 +652,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(params.params.fragment_instance_id);
}
exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
return Status::InternalError(
strings::Substitute("push plan fragment $0 to thread pool failed. err = $1, BE: $2",
print_id(params.params.fragment_instance_id), st.to_string(),
@@ -888,7 +686,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
int64_t duration_ns = 0;
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineXFragmentContext>(
query_ctx->query_id, params.fragment_id, query_ctx, _exec_env, cb,
query_ctx->query_id(), params.fragment_id, query_ctx, _exec_env, cb,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1));
{
@@ -920,23 +718,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
span->SetAttribute("instance_id", print_id(fragment_instance_id));
std::shared_ptr<FragmentExecState> exec_state(new FragmentExecState(
query_ctx->query_id, fragment_instance_id, params.local_params[i].backend_num,
_exec_env, query_ctx,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1)));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// setting need_wait_execution_trigger means this instance will not actually be
// executed until the execPlanFragmentStart RPC triggers it to start.
exec_state->set_need_wait_execution_trigger();
}
if (!params.__isset.need_wait_execution_trigger ||
!params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}
_setup_shared_hashtable_for_broadcast_join(params, params.local_params[i],
exec_state->executor()->runtime_state(),
query_ctx.get());
}
{
@@ -971,27 +757,15 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
span->SetAttribute("instance_id", print_id(fragment_instance_id));
std::shared_ptr<FragmentExecState> exec_state(new FragmentExecState(
query_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
query_ctx,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1)));
if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
// setting need_wait_execution_trigger means this instance will not actually be
// executed until the execPlanFragmentStart RPC triggers it to start.
exec_state->set_need_wait_execution_trigger();
}
int64_t duration_ns = 0;
if (!params.__isset.need_wait_execution_trigger ||
!params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}
_setup_shared_hashtable_for_broadcast_join(
params, local_params, exec_state->executor()->runtime_state(), query_ctx.get());
_setup_shared_hashtable_for_broadcast_join(params, local_params, query_ctx.get());
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
query_ctx->query_id, fragment_instance_id, params.fragment_id,
query_ctx->query_id(), fragment_instance_id, params.fragment_id,
local_params.backend_num, query_ctx, _exec_env, cb,
std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
std::placeholders::_1));
@@ -1071,17 +845,17 @@ void FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancel
const std::string& msg) {
bool find_the_fragment = false;
std::shared_ptr<FragmentExecState> exec_state;
std::shared_ptr<PlanFragmentExecutor> fragment_executor;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_id);
if (iter != _fragment_map.end()) {
exec_state = iter->second;
fragment_executor = iter->second;
}
}
if (exec_state) {
if (fragment_executor) {
find_the_fragment = true;
exec_state->cancel(reason, msg);
fragment_executor->cancel(reason, msg);
}
std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_fragment_ctx;
@@ -1122,9 +896,9 @@ bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
auto ctx = _query_ctx_map.find(query_id);
if (ctx != _query_ctx_map.end()) {
for (auto it : ctx->second->fragment_ids) {
auto exec_state_iter = _fragment_map.find(it);
if (exec_state_iter != _fragment_map.end() && exec_state_iter->second) {
return exec_state_iter->second->is_canceled();
auto fragment_executor_iter = _fragment_map.find(it);
if (fragment_executor_iter != _fragment_map.end() && fragment_executor_iter->second) {
return fragment_executor_iter->second->is_canceled();
}
auto pipeline_ctx_iter = _pipeline_map.find(it);
@@ -1305,7 +1079,7 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
UniqueId fragment_instance_id = request->fragment_id();
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
std::shared_ptr<FragmentExecState> fragment_state;
std::shared_ptr<PlanFragmentExecutor> fragment_executor;
std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
RuntimeFilterMgr* runtime_filter_mgr = nullptr;
@@ -1327,10 +1101,10 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
}
fragment_state = iter->second;
fragment_executor = iter->second;
DCHECK(fragment_state != nullptr);
runtime_filter_mgr = fragment_state->executor()->runtime_state()->runtime_filter_mgr();
DCHECK(fragment_executor != nullptr);
runtime_filter_mgr = fragment_executor->runtime_state()->runtime_filter_mgr();
}
return runtime_filter_mgr->update_filter(request, attach_data);
@@ -1346,7 +1120,7 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
UniqueId fragment_instance_id = fragment_instance_ids[0];
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
std::shared_ptr<FragmentExecState> fragment_state;
std::shared_ptr<PlanFragmentExecutor> fragment_executor;
std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
RuntimeFilterMgr* runtime_filter_mgr = nullptr;
@@ -1371,14 +1145,11 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
}
fragment_state = iter->second;
fragment_executor = iter->second;
DCHECK(fragment_state != nullptr);
runtime_filter_mgr = fragment_state->executor()
->runtime_state()
->get_query_ctx()
->runtime_filter_mgr();
pool = &fragment_state->get_query_ctx()->obj_pool;
DCHECK(fragment_executor != nullptr);
runtime_filter_mgr = fragment_executor->get_query_ctx()->runtime_filter_mgr();
pool = &fragment_executor->get_query_ctx()->obj_pool;
}
UpdateRuntimeFilterParamsV2 params(request, attach_data, pool);
@@ -1411,7 +1182,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
auto fragment_instance_id = filter_controller->instance_id();
TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
std::shared_ptr<FragmentExecState> fragment_state;
std::shared_ptr<PlanFragmentExecutor> fragment_executor;
std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
if (is_pipeline) {
std::lock_guard<std::mutex> lock(_lock);
@@ -1432,16 +1203,15 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
}
// hold reference to fragment_state, or else runtime_state can be destroyed
// hold reference to fragment_executor, or else runtime_state can be destroyed
// when filter_controller->merge is still in progress
fragment_state = iter->second;
fragment_executor = iter->second;
}
RETURN_IF_ERROR(filter_controller->merge(request, attach_data, opt_remote_rf));
return Status::OK();
}
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
RuntimeState* state,
QueryContext* query_ctx) {
if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
!params.query_options.enable_share_hash_table_for_broadcast_join) {
@@ -1469,7 +1239,7 @@ void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const TExecPlanFrag
void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(
const TPipelineFragmentParams& params, const TPipelineInstanceParams& local_params,
RuntimeState* state, QueryContext* query_ctx) {
QueryContext* query_ctx) {
if (!params.query_options.__isset.enable_share_hash_table_for_broadcast_join ||
!params.query_options.enable_share_hash_table_for_broadcast_join) {
return;
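
One pattern worth noting in the FragmentMgr changes above: exec_plan_fragment registers the executor in _fragment_map before handing it to the thread pool, and if submission fails it rolls the registration back (erase under the lock, then cancel) so the map never keeps a fragment that will never run. A toy sketch of that submit-or-rollback shape; submit() here is a stand-in for the real ThreadPool::submit_func:

#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct ExecutorLike {
    void cancel(const std::string& msg) { std::cout << "cancelled: " << msg << "\n"; }
};

class FragmentMgrLike {
public:
    bool exec_plan_fragment(const std::string& instance_id, bool pool_accepts) {
        auto fragment_executor = std::make_shared<ExecutorLike>();
        {
            std::lock_guard<std::mutex> lock(_lock);
            _fragment_map.emplace(instance_id, fragment_executor);
        }
        if (!submit(pool_accepts)) {
            {
                std::lock_guard<std::mutex> lock(_lock);
                _fragment_map.erase(instance_id);  // roll back the registration
            }
            fragment_executor->cancel("push plan fragment to thread pool failed");
            return false;
        }
        return true;
    }

private:
    bool submit(bool accepts) { return accepts; }  // fake pool for the sketch

    std::mutex _lock;
    std::map<std::string, std::shared_ptr<ExecutorLike>> _fragment_map;
};

int main() {
    FragmentMgrLike mgr;
    mgr.exec_plan_fragment("f1", /*pool_accepts=*/false);
    return 0;
}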

View File

@@ -33,6 +33,7 @@
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "http/rest_monitor_iface.h"
#include "runtime/query_context.h"
#include "runtime_filter_mgr.h"
#include "util/countdown_latch.h"
#include "util/hash_util.hpp" // IWYU pragma: keep
@@ -50,7 +51,7 @@ class PipelineXFragmentContext;
} // namespace pipeline
class QueryContext;
class ExecEnv;
class FragmentExecState;
class PlanFragmentExecutor;
class ThreadPool;
class TExecPlanFragmentParams;
class PExecPlanFragmentStartRequest;
@@ -66,21 +67,6 @@ class Thread;
std::string to_load_error_http_path(const std::string& file_name);
struct ReportStatusRequest {
const Status& status;
RuntimeProfile* profile;
RuntimeProfile* load_channel_profile;
bool done;
TNetworkAddress coord_addr;
TUniqueId query_id;
int fragment_id;
TUniqueId fragment_instance_id;
int backend_num;
RuntimeState* runtime_state;
std::function<Status(Status)> update_fn;
std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
};
// This class used to manage all the fragment execute in this instance
class FragmentMgr : public RestMonitorIface {
public:
@@ -146,17 +132,18 @@ public:
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
private:
void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);
void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor,
const FinishCallback& cb);
template <typename Param>
void _set_scan_concurrency(const Param& params, QueryContext* query_ctx);
void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params,
RuntimeState* state, QueryContext* query_ctx);
QueryContext* query_ctx);
void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params,
const TPipelineInstanceParams& local_params,
RuntimeState* state, QueryContext* query_ctx);
QueryContext* query_ctx);
template <typename Params>
Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline,
@@ -169,8 +156,8 @@ private:
std::condition_variable _cv;
// Make sure the entry is removed from this map before the last reference to the FragmentExecState goes away
std::unordered_map<TUniqueId, std::shared_ptr<FragmentExecState>> _fragment_map;
// Make sure the entry is removed from this map before the last reference to the PlanFragmentExecutor goes away
std::unordered_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>> _fragment_map;
std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;

View File

@@ -20,6 +20,7 @@
#include "runtime/plan_fragment_executor.h"
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Planner_types.h>
@@ -42,6 +43,7 @@
#include "exec/data_sink.h"
#include "exec/exec_node.h"
#include "exec/scan_node.h"
#include "io/fs/stream_load_pipe.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
@@ -49,6 +51,8 @@
#include "runtime/query_statistics.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/thread_context.h"
#include "util/container_util.hpp"
#include "util/defer_op.h"
@@ -72,9 +76,16 @@ namespace doris {
using namespace ErrorCode;
PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
std::shared_ptr<QueryContext> query_ctx,
const TUniqueId& instance_id, int fragment_id,
int backend_num,
const report_status_callback& report_status_cb)
: _exec_env(exec_env),
_plan(nullptr),
_query_ctx(query_ctx),
_fragment_instance_id(instance_id),
_fragment_id(fragment_id),
_backend_num(backend_num),
_report_status_cb(report_status_cb),
_report_thread_active(false),
_done(false),
@@ -85,6 +96,7 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
_collect_query_statistics_with_every_batch(false),
_cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR) {
_report_thread_future = _report_thread_promise.get_future();
_start_time = vectorized::VecDateTimeValue::local_time();
}
PlanFragmentExecutor::~PlanFragmentExecutor() {
@@ -100,32 +112,31 @@ PlanFragmentExecutor::~PlanFragmentExecutor() {
DCHECK(!_report_thread_active);
}
Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
QueryContext* query_ctx) {
Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) {
OpentelemetryTracer tracer = telemetry::get_noop_tracer();
if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
tracer = telemetry::get_tracer(print_id(_query_id));
tracer = telemetry::get_tracer(print_id(_query_ctx->query_id()));
}
_span = tracer->StartSpan("Plan_fragment_executor");
OpentelemetryScope scope {_span};
const TPlanFragmentExecParams& params = request.params;
_query_id = params.query_id;
if (request.__isset.query_options) {
_timeout_second = request.query_options.execution_timeout;
}
const TPlanFragmentExecParams& params = request.params;
LOG_INFO("PlanFragmentExecutor::prepare")
.tag("query_id", _query_id)
.tag("instance_id", params.fragment_instance_id)
.tag("query_id", print_id(_query_ctx->query_id()))
.tag("instance_id", print_id(params.fragment_instance_id))
.tag("backend_num", request.backend_num)
.tag("pthread_id", (uintptr_t)pthread_self());
// VLOG_CRITICAL << "request:\n" << apache::thrift::ThriftDebugString(request);
const TQueryGlobals& query_globals =
query_ctx == nullptr ? request.query_globals : query_ctx->query_globals;
const TQueryGlobals& query_globals = _query_ctx->query_globals;
_runtime_state =
RuntimeState::create_unique(params, request.query_options, query_globals, _exec_env);
_runtime_state->set_query_ctx(query_ctx);
_runtime_state->set_query_mem_tracker(query_ctx == nullptr ? _exec_env->orphan_mem_tracker()
: query_ctx->query_mem_tracker);
_runtime_state->set_query_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
SCOPED_ATTACH_TASK(_runtime_state.get());
@@ -149,13 +160,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
}
// set up desc tbl
DescriptorTbl* desc_tbl = nullptr;
if (query_ctx != nullptr) {
desc_tbl = query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(DescriptorTbl::create(obj_pool(), request.desc_tbl, &desc_tbl));
}
DescriptorTbl* desc_tbl = _query_ctx->desc_tbl;
_runtime_state->set_desc_tbl(desc_tbl);
// set up plan
@@ -249,7 +254,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
Status PlanFragmentExecutor::open() {
int64_t mem_limit = _runtime_state->query_mem_tracker()->limit();
LOG_INFO("PlanFragmentExecutor::open")
.tag("query_id", _query_id)
.tag("query_id", _query_ctx->query_id())
.tag("instance_id", _runtime_state->fragment_instance_id())
.tag("mem_limit", PrettyPrinter::print(mem_limit, TUnit::BYTES));
@@ -370,6 +375,49 @@ Status PlanFragmentExecutor::get_vectorized_internal(::doris::vectorized::Block*
return Status::OK();
}
Status PlanFragmentExecutor::execute() {
if (_need_wait_execution_trigger) {
// if _need_wait_execution_trigger is true, this instance is prepared but
// needs to wait for the signal before doing the rest of the execution.
if (!_query_ctx->wait_for_start()) {
cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
return Status::OK();
}
}
#ifndef BE_TEST
if (_runtime_state->is_cancelled()) {
return Status::Cancelled("cancelled before execution");
}
#endif
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
Status st = open();
WARN_IF_ERROR(st, strings::Substitute("Got error while opening fragment $0, query id: $1",
print_id(_fragment_instance_id),
print_id(_query_ctx->query_id())));
if (!st.ok()) {
cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
fmt::format("PlanFragmentExecutor open failed, reason: {}", st.to_string()));
}
close();
}
DorisMetrics::instance()->fragment_requests_total->increment(1);
DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000);
return Status::OK();
}
bool PlanFragmentExecutor::is_timeout(const vectorized::VecDateTimeValue& now) const {
if (_timeout_second <= 0) {
return false;
}
if (now.second_diff(_start_time) > _timeout_second) {
return true;
}
return false;
}
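
is_timeout() above moved over from the deleted FragmentExecState: it compares a caller-supplied now against the start time captured in the constructor, and a non-positive _timeout_second disables the check. The same logic expressed with std::chrono instead of VecDateTimeValue:

#include <chrono>
#include <iostream>
#include <thread>

class TimeoutCheck {
public:
    explicit TimeoutCheck(int timeout_second)
            : _timeout_second(timeout_second),
              _start_time(std::chrono::steady_clock::now()) {}

    bool is_timeout(std::chrono::steady_clock::time_point now) const {
        if (_timeout_second <= 0) {
            return false;  // non-positive timeout means "never expires"
        }
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(now - _start_time);
        return elapsed.count() > _timeout_second;
    }

private:
    int _timeout_second;
    std::chrono::steady_clock::time_point _start_time;
};

int main() {
    TimeoutCheck check(1);  // a 1-second budget
    std::this_thread::sleep_for(std::chrono::milliseconds(1100));
    std::cout << std::boolalpha << check.is_timeout(std::chrono::steady_clock::now())
              << "\n";  // prints "true"
    return 0;
}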
void PlanFragmentExecutor::_collect_query_statistics() {
_query_statistics->clear();
Status status = _plan->collect_query_statistics(_query_statistics.get());
@@ -446,7 +494,7 @@ void PlanFragmentExecutor::report_profile() {
}
void PlanFragmentExecutor::send_report(bool done) {
Status status;
Status status = Status::OK();
{
std::lock_guard<std::mutex> l(_status_lock);
status = _status;
@@ -465,12 +513,36 @@ void PlanFragmentExecutor::send_report(bool done) {
if (!_is_report_success && !_is_report_on_cancel) {
return;
}
ReportStatusRequest report_req = {
status,
_is_report_success ? _runtime_state->runtime_profile() : nullptr,
_is_report_success ? _runtime_state->load_channel_profile() : nullptr,
done || !status.ok(),
_query_ctx->coord_addr,
_query_ctx->query_id(),
_fragment_id,
_fragment_instance_id,
_backend_num,
_runtime_state.get(),
std::bind(&PlanFragmentExecutor::update_status, this, std::placeholders::_1),
std::bind(&PlanFragmentExecutor::cancel, this, std::placeholders::_1,
std::placeholders::_2)};
// This will send a report even if we are cancelled. If the query completed correctly
// but fragments still need to be cancelled (e.g. limit reached), the coordinator will
// be waiting for a final report and profile.
_report_status_cb(status, _is_report_success ? profile() : nullptr,
_is_report_success ? load_channel_profile() : nullptr, done || !status.ok());
_report_status_cb(report_req);
}
// Update the execution status of this fragment
Status PlanFragmentExecutor::update_status(Status status) {
std::lock_guard<std::mutex> l(_status_lock);
if (!status.ok() && _status.ok()) {
_status = status;
LOG(WARNING) << "query_id=" << print_id(_query_ctx->query_id())
<< ", instance_id=" << print_id(_fragment_instance_id) << " meet error status "
<< status;
}
return _status;
}
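
update_status() above is a first-error-wins latch: once _status holds an error, later updates are ignored and every caller gets the original failure back. Reduced to its essentials with a toy Status type (the real one carries codes and messages):

#include <iostream>
#include <mutex>
#include <string>

struct Status {
    bool ok = true;
    std::string msg;
};

class StatusLatch {
public:
    // Records the first non-OK status and returns the effective status.
    Status update(const Status& status) {
        std::lock_guard<std::mutex> l(_status_lock);
        if (!status.ok && _status.ok) {
            _status = status;  // first error wins; later errors are dropped
        }
        return _status;
    }

private:
    std::mutex _status_lock;
    Status _status;
};

int main() {
    StatusLatch latch;
    latch.update({false, "open failed"});
    auto effective = latch.update({false, "close failed"});
    std::cout << effective.msg << "\n";  // prints "open failed"
    return 0;
}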
void PlanFragmentExecutor::stop_report_thread() {
@@ -488,24 +560,39 @@ void PlanFragmentExecutor::stop_report_thread() {
}
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
std::lock_guard<std::mutex> l(_status_lock);
LOG_INFO("PlanFragmentExecutor::cancel")
.tag("query_id", _query_id)
.tag("instance_id", _runtime_state->fragment_instance_id())
.tag("query_id", print_id(_query_ctx->query_id()))
.tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
.tag("reason", reason)
.tag("error message", msg);
if (_runtime_state->is_cancelled()) {
LOG(INFO) << "instance is already cancelled, skip cancel again";
return;
}
DCHECK(_prepared);
_cancel_reason = reason;
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {
_is_report_on_cancel = false;
}
_cancel_msg = msg;
_runtime_state->set_is_cancelled(true, msg);
// To notify wait_for_start()
_runtime_state->get_query_ctx()->set_ready_to_execute(true);
_query_ctx->set_ready_to_execute(true);
// must close stream_mgr to avoid a deadlock in the Exchange Node
auto env = _runtime_state->exec_env();
auto id = _runtime_state->fragment_instance_id();
env->vstream_mgr()->cancel(id);
_exec_env->vstream_mgr()->cancel(_fragment_instance_id);
// Cancel the result queue manager used by spark doris connector
_exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
_exec_env->result_queue_mgr()->update_queue_status(_fragment_instance_id, Status::Aborted(msg));
#ifndef BE_TEST
// Get the pipe from the new load stream manager and send cancel to it, or the fragment may hang waiting to read from the pipe
// For stream load, the fragment's query_id == load id; it is set in FE.
auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_ctx->query_id());
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(msg);
}
#endif
return;
}
const RowDescriptor& PlanFragmentExecutor::row_desc() {
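
A detail of the cancel() path above: it finishes by calling _query_ctx->set_ready_to_execute(true) so that a fragment parked in wait_for_start() wakes up and observes the cancellation rather than sleeping until the start RPC arrives. A bare-bones version of that gate (the real wait_for_start() also applies a timeout, omitted here):

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

class StartGate {
public:
    // Returns true if execution should proceed, false if it was cancelled.
    bool wait_for_start() {
        std::unique_lock<std::mutex> l(_start_lock);
        _start_cond.wait(l, [this] { return _ready_to_execute; });
        return !_is_cancelled;
    }

    void set_ready_to_execute(bool is_cancelled) {
        {
            std::lock_guard<std::mutex> l(_start_lock);
            _ready_to_execute = true;
            _is_cancelled = is_cancelled;
        }
        _start_cond.notify_all();  // wake every fragment waiting on this query
    }

private:
    std::mutex _start_lock;
    std::condition_variable _start_cond;
    bool _ready_to_execute = false;
    bool _is_cancelled = false;
};

int main() {
    StartGate gate;
    std::thread waiter([&] {
        std::cout << (gate.wait_for_start() ? "started" : "cancelled") << "\n";
    });
    gate.set_ready_to_execute(/*is_cancelled=*/true);  // cancel wakes the waiter
    waiter.join();
    return 0;
}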

View File

@@ -34,6 +34,7 @@
#include <vector>
#include "common/status.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
@@ -47,6 +48,7 @@ class DescriptorTbl;
class ExecEnv;
class ObjectPool;
class QueryStatistics;
struct ReportStatusRequest;
namespace vectorized {
class Block;
@@ -73,18 +75,12 @@ class Block;
// thread-safe.
class PlanFragmentExecutor {
public:
// Callback to report execution status of plan fragment.
// 'profile' is the cumulative profile, 'done' indicates whether the execution
// is done or still continuing.
// Note: this does not take a const RuntimeProfile&, because it might need to call
// functions like PrettyPrint() or to_thrift(), neither of which is const
// because they take locks.
using report_status_callback =
std::function<void(const Status&, RuntimeProfile*, RuntimeProfile*, bool)>;
using report_status_callback = std::function<void(const ReportStatusRequest)>;
// report_status_cb, if !empty(), is used to report the accumulated profile
// information periodically during execution (open() or get_next()).
PlanFragmentExecutor(ExecEnv* exec_env, const report_status_callback& report_status_cb);
PlanFragmentExecutor(ExecEnv* exec_env, std::shared_ptr<QueryContext> query_ctx,
const TUniqueId& instance_id, int fragment_id, int backend_num,
const report_status_callback& report_status_cb);
// Closes the underlying plan fragment and frees up all resources allocated
// in open()/get_next().
@@ -100,7 +96,7 @@ public:
// number of bytes this query can consume at runtime.
// The query will be aborted (MEM_LIMIT_EXCEEDED) if it goes over that limit.
// If query_ctx is not null, some components will be taken from query_ctx.
Status prepare(const TExecPlanFragmentParams& request, QueryContext* query_ctx = nullptr);
Status prepare(const TExecPlanFragmentParams& request);
// Start execution. Call this prior to get_next().
// If this fragment has a sink, open() will send all rows produced
@@ -113,6 +109,10 @@ public:
// time when open() returns, and the status-reporting thread will have been stopped.
Status open();
Status execute();
const vectorized::VecDateTimeValue& start_time() const { return _start_time; }
// Closes the underlying plan fragment and frees up all resources allocated
// in open()/get_next().
void close();
@@ -133,12 +133,34 @@ public:
DataSink* get_sink() const { return _sink.get(); }
void set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }
void set_merge_controller_handler(
std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
_merge_controller_handler = handler;
}
std::shared_ptr<QueryContext> get_query_ctx() { return _query_ctx; }
TUniqueId fragment_instance_id() const { return _fragment_instance_id; }
TUniqueId query_id() const { return _query_ctx->query_id(); }
bool is_timeout(const vectorized::VecDateTimeValue& now) const;
bool is_canceled() { return _runtime_state->is_cancelled(); }
Status update_status(Status status);
private:
ExecEnv* _exec_env; // not owned
ExecNode* _plan; // lives in _runtime_state->obj_pool()
TUniqueId _query_id;
std::shared_ptr<QueryContext> _query_ctx;
// Id of this instance
TUniqueId _fragment_instance_id;
int _fragment_id;
// Used to report to the coordinator which backend has finished
int _backend_num;
// profile reporting-related
report_status_callback _report_status_cb;
@@ -196,6 +218,16 @@ private:
RuntimeProfile::Counter* _fragment_cpu_timer;
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
// If set to true, this plan fragment will be executed only after FE sends the execution start RPC.
bool _need_wait_execution_trigger = false;
// Timeout of this instance; it is initialized from the query options
int _timeout_second = -1;
vectorized::VecDateTimeValue _start_time;
// It is shared with BufferControlBlock and will be called from two different
// threads, but the calls never overlap in time, so there is no
// multithreaded access problem.

View File

@@ -40,7 +40,20 @@
#include "vec/runtime/shared_scanner_controller.h"
namespace doris {
struct ReportStatusRequest {
const Status& status;
RuntimeProfile* profile;
RuntimeProfile* load_channel_profile;
bool done;
TNetworkAddress coord_addr;
TUniqueId query_id;
int fragment_id;
TUniqueId fragment_instance_id;
int backend_num;
RuntimeState* runtime_state;
std::function<Status(Status)> update_fn;
std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
};
// Save the common components of fragments in a query.
// Some components like DescriptorTbl may be very large
// that will slow down each execution of fragments when DeSer them every time.
@@ -49,9 +62,11 @@ class QueryContext {
ENABLE_FACTORY_CREATOR(QueryContext);
public:
QueryContext(int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options)
QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
const TQueryOptions& query_options)
: fragment_num(total_fragment_num),
timeout_second(-1),
_query_id(query_id),
_exec_env(exec_env),
_runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)),
_query_options(query_options) {
@@ -69,7 +84,7 @@ public:
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
"PeakUsed={}",
print_id(query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
print_id(_query_id), MemTracker::print_bytes(query_mem_tracker->limit()),
MemTracker::print_bytes(query_mem_tracker->consumption()),
MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
}
@@ -184,8 +199,9 @@ public:
RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
TUniqueId query_id() const { return _query_id; }
public:
TUniqueId query_id;
DescriptorTbl* desc_tbl;
bool set_rsc_info = false;
std::string user;
@@ -213,6 +229,7 @@ public:
std::map<int, TFileScanRangeParams> file_scan_range_params_map;
private:
TUniqueId _query_id;
ExecEnv* _exec_env;
vectorized::VecDateTimeValue _start_time;
@@ -225,7 +242,7 @@ private:
std::mutex _start_lock;
std::condition_variable _start_cond;
// Only valid when _need_wait_execution_trigger is set to true in FragmentExecState.
// Only valid when _need_wait_execution_trigger is set to true in PlanFragmentExecutor.
// And all fragments of this query will start execution when this is set to true.
std::atomic<bool> _ready_to_execute {false};
std::atomic<bool> _is_cancelled {false};
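
Moving ReportStatusRequest out of fragment_mgr.h and into query_context.h (above) is what lets report_status_callback shrink from four loose arguments to a single struct shared by the pipeline and non-pipeline report paths. A compact sketch of that step with a toy field set (the real struct also carries profiles, the coordinator address, and a cancel hook); the callback below takes a const reference, while the diff's alias takes the struct by value:

#include <functional>
#include <iostream>
#include <string>

struct Status {
    bool ok = true;
};

// One aggregate instead of (status, profile, load_channel_profile, done).
struct ReportStatusRequest {
    Status status;
    bool done = false;
    std::string query_id;
    std::function<Status(Status)> update_fn;  // receiver can push a status back
};

using report_status_callback = std::function<void(const ReportStatusRequest&)>;

int main() {
    report_status_callback cb = [](const ReportStatusRequest& req) {
        std::cout << "query " << req.query_id << (req.done ? " done" : " running")
                  << "\n";
    };
    cb({Status{}, true, "q-1", [](Status s) { return s; }});
    return 0;
}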

View File

@@ -1,148 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/fragment_mgr.h"
#include <gen_cpp/PaloInternalService_types.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <thread>
#include "common/config.h"
#include "exec/data_sink.h"
#include "gtest/gtest_pred_impl.h"
#include "runtime/exec_env.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/runtime_state.h"
namespace doris {
static Status s_prepare_status;
static Status s_open_status;
// Mock used for this unittest
PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
const report_status_callback& report_status_cb)
: _exec_env(exec_env), _report_status_cb(report_status_cb) {}
PlanFragmentExecutor::~PlanFragmentExecutor() {}
Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
QueryContext* batch_ctx) {
return s_prepare_status;
}
Status PlanFragmentExecutor::open() {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
return s_open_status;
}
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
}
void PlanFragmentExecutor::close() {}
class FragmentMgrTest : public testing::Test {
public:
FragmentMgrTest() {}
protected:
virtual void SetUp() {
s_prepare_status = Status::OK();
s_open_status = Status::OK();
config::fragment_pool_thread_num_min = 32;
config::fragment_pool_thread_num_max = 32;
config::fragment_pool_queue_size = 1024;
}
virtual void TearDown() {}
};
TEST_F(FragmentMgrTest, Normal) {
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
// Duplicated
EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
}
TEST_F(FragmentMgrTest, AddNormal) {
FragmentMgr mgr(nullptr);
for (int i = 0; i < 8; ++i) {
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100 + i);
params.params.fragment_instance_id.__set_lo(200);
EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
}
}
TEST_F(FragmentMgrTest, CancelNormal) {
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
}
TEST_F(FragmentMgrTest, CancelWithoutAdd) {
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
}
TEST_F(FragmentMgrTest, PrepareFailed) {
s_prepare_status = Status::InternalError("Prepare failed.");
FragmentMgr mgr(nullptr);
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
}
TEST_F(FragmentMgrTest, OfferPoolFailed) {
config::fragment_pool_thread_num_min = 1;
config::fragment_pool_thread_num_max = 1;
config::fragment_pool_queue_size = 0;
FragmentMgr mgr(doris::ExecEnv::GetInstance());
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100);
params.params.fragment_instance_id.__set_lo(200);
EXPECT_TRUE(mgr.exec_plan_fragment(params).ok());
// the first plan's open() takes 50ms, so the next 3 plans will be aborted.
for (int i = 1; i < 4; ++i) {
TExecPlanFragmentParams params;
params.params.fragment_instance_id = TUniqueId();
params.params.fragment_instance_id.__set_hi(100 + i);
params.params.fragment_instance_id.__set_lo(200);
EXPECT_FALSE(mgr.exec_plan_fragment(params).ok());
}
}
} // namespace doris
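
The deleted test above relied on a link-seam mock: the test file supplied its own definitions of PlanFragmentExecutor::prepare/open/cancel/close, which satisfied the linker in place of the production implementation, and static status variables let each case script the outcome. Since prepare() changed shape in this commit, the test was dropped rather than re-mocked. The generic shape of that technique, with hypothetical names:

#include <iostream>

// Declarations as they would appear in the production header.
struct Status {
    bool ok = true;
};

class ExecutorUnderTest {
public:
    Status prepare();
    Status open();
};

// Test-only knobs, flipped per test case (mirrors s_prepare_status/s_open_status).
static Status s_prepare_status;
static Status s_open_status;

// Mock definitions: these satisfy the linker instead of the production .cpp,
// which is simply left out of the test target.
Status ExecutorUnderTest::prepare() { return s_prepare_status; }
Status ExecutorUnderTest::open() { return s_open_status; }

int main() {
    ExecutorUnderTest executor;
    s_prepare_status.ok = false;  // simulate "Prepare failed." for this case
    std::cout << (executor.prepare().ok ? "ok" : "prepare failed") << "\n";
    std::cout << (executor.open().ok ? "open ok" : "open failed") << "\n";
    return 0;
}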