[Fix](query execution) Fix result sink fragment can't be cancelled in non-pipeline (#25524)

zhiqiang
2023-10-23 22:30:29 -05:00
committed by GitHub
parent 215c1625b2
commit 87b414cdae
13 changed files with 90 additions and 109 deletions

View File

@ -158,7 +158,11 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
.tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
.tag("reason", PPlanFragmentCancelReason_Name(reason))
.tag("message", msg);
// TODO(zhiqiang): maybe we do not need to check whether the query is already cancelled.
// Not cancelling in this situation may lead to bugs. For example, the result sink node
// cannot be cancelled if other fragments have already set query_ctx to cancelled; this
// leaves the result receiver on the FE stuck on its rpc forever until timeout...
// We need a more detailed discussion.
if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) {
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
LOG(WARNING) << "PipelineFragmentContext "
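
The TODO above describes the failure this commit targets: once another fragment has marked the query context cancelled, skipping the instance-level cancel leaves the result receiver on the FE blocked on its RPC until the timeout fires. The following is a minimal, self-contained sketch of that interaction; all type and member names (QueryCtx, ResultSinkInstance, wait_for_result) are illustrative stand-ins, not Doris APIs.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Query-level cancel flag, set by whichever fragment fails first.
struct QueryCtx {
    std::atomic<bool> cancelled{false};
};

// Stand-in for the result sink instance plus the FE-side receiver waiting on it.
struct ResultSinkInstance {
    std::mutex mu;
    std::condition_variable cv;
    bool instance_cancelled = false;

    // The receiver blocks here until the *instance* is cancelled (or data arrives).
    bool wait_for_result(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lk(mu);
        return cv.wait_for(lk, timeout, [&] { return instance_cancelled; });
    }

    // skip_if_query_cancelled = true models the old guard that this commit removes.
    void cancel(QueryCtx& q, bool skip_if_query_cancelled) {
        if (skip_if_query_cancelled && q.cancelled.load()) {
            return;  // old behaviour: the instance never wakes its receiver
        }
        std::lock_guard<std::mutex> lk(mu);
        instance_cancelled = true;
        cv.notify_all();  // new behaviour: always propagate to the instance
    }
};

int main() {
    for (bool skip : {true, false}) {
        QueryCtx q;
        ResultSinkInstance sink;
        q.cancelled = true;  // another fragment already cancelled the query
        std::thread canceller([&] { sink.cancel(q, skip); });
        bool woke = sink.wait_for_result(std::chrono::milliseconds(200));
        canceller.join();
        std::cout << (skip ? "old guard:     " : "always cancel: ")
                  << (woke ? "receiver unblocked\n" : "receiver stuck until timeout\n");
    }
    return 0;
}
```

With `skip = true` (the old guard) the waiter only returns after the 200 ms timeout; always cancelling the instance wakes it immediately, which is the behaviour this commit restores.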

View File

@ -96,7 +96,7 @@
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
@ -119,7 +119,8 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); });
REGISTER_HOOK_METRIC(fragment_instance_count,
[this]() { return _fragment_instance_map.size(); });
auto s = Thread::create(
"FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
@ -149,7 +150,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)
FragmentMgr::~FragmentMgr() = default;
void FragmentMgr::stop() {
DEREGISTER_HOOK_METRIC(plan_fragment_count);
DEREGISTER_HOOK_METRIC(fragment_instance_count);
DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
_stop_background_threads_latch.count_down();
if (_cancel_thread) {
@ -162,7 +163,7 @@ void FragmentMgr::stop() {
// Only this thread can delete
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.clear();
_fragment_instance_map.clear();
_query_ctx_map.clear();
for (auto& pipeline : _pipeline_map) {
pipeline.second->close_sink();
@ -450,7 +451,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_ex
// remove exec state after this fragment finished
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(fragment_executor->fragment_instance_id());
_fragment_instance_map.erase(fragment_executor->fragment_instance_id());
if (all_done && query_ctx) {
_query_ctx_map.erase(query_ctx->query_id());
}
@ -709,8 +710,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
auto iter = _fragment_map.find(fragment_instance_id);
if (iter != _fragment_map.end()) {
auto iter = _fragment_instance_map.find(fragment_instance_id);
if (iter != _fragment_instance_map.end()) {
// Duplicated
LOG(WARNING) << "duplicate fragment instance id: " << print_id(fragment_instance_id);
return Status::OK();
@ -727,7 +728,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
// memory, but query_is_cancelled will traverse the vector and it will core dump.
// query_is_cancelled is called in the allocator, so we have to avoid a deadlock.
std::lock_guard<std::mutex> lock(_lock);
query_ctx->fragment_ids.push_back(fragment_instance_id);
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
auto fragment_executor = std::make_shared<PlanFragmentExecutor>(
@ -754,7 +755,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
fragment_executor->set_merge_controller_handler(handler);
{
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.insert(std::make_pair(params.params.fragment_instance_id, fragment_executor));
_fragment_instance_map.insert(
std::make_pair(params.params.fragment_instance_id, fragment_executor));
_cv.notify_all();
}
auto st = _thread_pool->submit_func(
@ -767,7 +769,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
{
// Remove the exec state added
std::lock_guard<std::mutex> lock(_lock);
_fragment_map.erase(params.params.fragment_instance_id);
_fragment_instance_map.erase(params.params.fragment_instance_id);
}
fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
"push plan fragment to thread pool failed");
@ -833,7 +835,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
// Duplicated
return Status::OK();
}
query_ctx->fragment_ids.push_back(fragment_instance_id);
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
span->SetAttribute("instance_id", print_id(fragment_instance_id));
@ -870,7 +872,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
// Duplicated
return Status::OK();
}
query_ctx->fragment_ids.push_back(fragment_instance_id);
query_ctx->fragment_instance_ids.push_back(fragment_instance_id);
}
START_AND_SCOPE_SPAN(tracer, span, "exec_instance");
span->SetAttribute("instance_id", print_id(fragment_instance_id));
@ -979,37 +981,13 @@ void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id,
return;
}
if (ctx->second->enable_pipeline_exec()) {
for (auto it : ctx->second->fragment_ids) {
// instance_id will not be removed from query_context.instance_ids currently
// and it will be removed from fragment_mgr::_pipeline_map only,
// so we add this check to avoid too many WARNING logs.
if (_pipeline_map.contains(it)) {
cancel_instance_unlocked(it, reason, state_lock, msg);
}
}
} else {
for (auto it : ctx->second->fragment_ids) {
cancel_fragment_unlocked(it, reason, state_lock, msg);
}
for (auto it : ctx->second->fragment_instance_ids) {
cancel_instance_unlocked(it, reason, state_lock, msg);
}
ctx->second->cancel(true, msg, Status::Cancelled(msg));
_query_ctx_map.erase(query_id);
LOG(INFO) << "Query " << print_id(query_id) << " is cancelled. Reason: " << msg;
}
void FragmentMgr::cancel_fragment(const TUniqueId& fragment_id,
const PPlanFragmentCancelReason& reason, const std::string& msg) {
std::unique_lock<std::mutex> state_lock(_lock);
return cancel_fragment_unlocked(fragment_id, reason, state_lock, msg);
}
void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& fragment_id,
const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock,
const std::string& msg) {
return cancel_unlocked_impl(fragment_id, reason, state_lock, false /*not pipeline query*/, msg);
LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed. Reason: " << msg;
}
void FragmentMgr::cancel_instance(const TUniqueId& instance_id,
@ -1022,32 +1000,25 @@ void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id,
const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock,
const std::string& msg) {
return cancel_unlocked_impl(instance_id, reason, state_lock, true /*pipeline query*/, msg);
}
const bool is_pipeline_instance = _pipeline_map.contains(instance_id);
void FragmentMgr::cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& /*state_lock*/,
bool is_pipeline, const std::string& msg) {
if (is_pipeline) {
const TUniqueId& instance_id = id;
if (is_pipeline_instance) {
auto itr = _pipeline_map.find(instance_id);
if (itr != _pipeline_map.end()) {
// calling PipelineFragmentContext::cancel
itr->second->cancel(reason, msg);
} else {
LOG(WARNING) << "Could not find the instance id:" << print_id(instance_id)
LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id)
<< " to cancel";
}
} else {
const TUniqueId& fragment_id = id;
auto itr = _fragment_map.find(fragment_id);
if (itr != _fragment_map.end()) {
auto itr = _fragment_instance_map.find(instance_id);
if (itr != _fragment_instance_map.end()) {
// calling PlanFragmentExecutor::cancel
itr->second->cancel(reason, msg);
} else {
LOG(WARNING) << "Could not find the fragment id:" << print_id(fragment_id)
LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id)
<< " to cancel";
}
}
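
With cancel_fragment/cancel_fragment_unlocked removed, cancel_instance_unlocked becomes the single entry point and decides at lookup time which engine ran the instance. A rough, self-contained sketch of that dispatch pattern follows; InstanceRegistry, InstanceId and CancelFn are simplified stand-ins for FragmentMgr, TUniqueId and the two contexts' cancel() calls, not the real signatures.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>

// Stand-ins for TUniqueId and for the two context types' cancel() methods.
using InstanceId = std::string;
using CancelFn = std::function<void(const std::string& msg)>;

struct InstanceRegistry {
    std::unordered_map<InstanceId, CancelFn> pipeline_map;           // pipeline engine
    std::unordered_map<InstanceId, CancelFn> fragment_instance_map;  // non-pipeline engine

    // Single entry point: probe the pipeline map first, fall back to the
    // non-pipeline instance map, warn if the instance is unknown.
    void cancel_instance(const InstanceId& id, const std::string& msg) {
        const bool is_pipeline_instance = pipeline_map.count(id) > 0;
        auto& map = is_pipeline_instance ? pipeline_map : fragment_instance_map;
        auto it = map.find(id);
        if (it != map.end()) {
            it->second(msg);
        } else {
            std::cout << "Could not find the instance " << id << " to cancel\n";
        }
    }
};

int main() {
    InstanceRegistry reg;
    reg.pipeline_map["pipe-1"] = [](const std::string& m) {
        std::cout << "pipeline cancel: " << m << "\n";
    };
    reg.fragment_instance_map["frag-1"] = [](const std::string& m) {
        std::cout << "non-pipeline cancel: " << m << "\n";
    };
    reg.cancel_instance("pipe-1", "timeout");
    reg.cancel_instance("frag-1", "timeout");
    reg.cancel_instance("missing", "timeout");
    return 0;
}
```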
@ -1059,17 +1030,17 @@ bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) {
if (ctx != _query_ctx_map.end()) {
const bool is_pipeline_version = ctx->second->enable_pipeline_exec();
for (auto itr : ctx->second->fragment_ids) {
for (auto itr : ctx->second->fragment_instance_ids) {
if (is_pipeline_version) {
auto pipeline_ctx_iter = _pipeline_map.find(itr);
if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) {
return pipeline_ctx_iter->second->is_canceled();
}
} else {
auto fragment_executor_iter = _fragment_map.find(itr);
if (fragment_executor_iter != _fragment_map.end() &&
fragment_executor_iter->second) {
return fragment_executor_iter->second->is_canceled();
auto fragment_instance_itr = _fragment_instance_map.find(itr);
if (fragment_instance_itr != _fragment_instance_map.end() &&
fragment_instance_itr->second) {
return fragment_instance_itr->second->is_canceled();
}
}
}
@ -1086,9 +1057,9 @@ void FragmentMgr::cancel_worker() {
VecDateTimeValue now = VecDateTimeValue::local_time();
{
std::lock_guard<std::mutex> lock(_lock);
for (auto& it : _fragment_map) {
if (it.second->is_timeout(now)) {
to_cancel.push_back(it.second->fragment_instance_id());
for (auto& fragment_instance_itr : _fragment_instance_map) {
if (fragment_instance_itr.second->is_timeout(now)) {
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
}
}
for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) {
@ -1134,8 +1105,8 @@ void FragmentMgr::cancel_worker() {
// designed to count canceled fragments of non-pipeline queries.
timeout_canceled_fragment_count->increment(to_cancel.size());
for (auto& id : to_cancel) {
cancel_fragment(id, PPlanFragmentCancelReason::TIMEOUT);
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment "
cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT);
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance "
<< print_id(id);
}
@ -1156,10 +1127,10 @@ void FragmentMgr::debug(std::stringstream& ss) {
// Keep things simple
std::lock_guard<std::mutex> lock(_lock);
ss << "FragmentMgr have " << _fragment_map.size() << " jobs.\n";
ss << "FragmentMgr have " << _fragment_instance_map.size() << " jobs.\n";
ss << "job_id\t\tstart_time\t\texecute_time(s)\n";
VecDateTimeValue now = VecDateTimeValue::local_time();
for (auto& it : _fragment_map) {
for (auto& it : _fragment_instance_map) {
ss << it.first << "\t" << it.second->start_time().debug_string() << "\t"
<< now.second_diff(it.second->start_time()) << "\n";
}
@ -1308,10 +1279,10 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request,
pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr();
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_map.find(tfragment_instance_id);
if (iter == _fragment_map.end()) {
VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
auto iter = _fragment_instance_map.find(tfragment_instance_id);
if (iter == _fragment_instance_map.end()) {
VLOG_CRITICAL << "unknown.... fragment instance id:" << print_id(tfragment_instance_id);
return Status::InvalidArgument("fragment-id: {}", print_id(tfragment_instance_id));
}
fragment_executor = iter->second;
@ -1353,10 +1324,12 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
pool = &pip_context->get_query_context()->obj_pool;
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_map.find(tfragment_instance_id);
if (iter == _fragment_map.end()) {
VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id;
return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
auto iter = _fragment_instance_map.find(tfragment_instance_id);
if (iter == _fragment_instance_map.end()) {
VLOG_CRITICAL << "unknown.... fragment instance id:"
<< print_id(tfragment_instance_id);
return Status::InvalidArgument("fragment instance id: {}",
print_id(tfragment_instance_id));
}
fragment_executor = iter->second;
@ -1410,10 +1383,11 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
pip_context = iter->second;
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_map.find(tfragment_instance_id);
if (iter == _fragment_map.end()) {
VLOG_CRITICAL << "unknown fragment-id:" << fragment_instance_id;
return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string());
auto iter = _fragment_instance_map.find(tfragment_instance_id);
if (iter == _fragment_instance_map.end()) {
VLOG_CRITICAL << "unknown fragment instance id:" << print_id(tfragment_instance_id);
return Status::InvalidArgument("fragment instance id: {}",
print_id(tfragment_instance_id));
}
// hold reference to fragment_executor, or else runtime_state can be destroyed

View File

@ -92,17 +92,10 @@ public:
Status start_query_execution(const PExecPlanFragmentStartRequest* request);
// This method can only be used to cancel a fragment of non-pipeline query.
void cancel_fragment(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason,
const std::string& msg = "");
void cancel_fragment_unlocked(const TUniqueId& instance_id,
const PPlanFragmentCancelReason& reason,
const std::unique_lock<std::mutex>& state_lock,
const std::string& msg = "");
Status trigger_pipeline_context_report(const ReportStatusRequest,
std::shared_ptr<pipeline::PipelineFragmentContext>&&);
// Pipeline version, cancel a fragment instance.
// Cancel an instance (pipeline or non-pipeline).
void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason,
const std::string& msg = "");
void cancel_instance_unlocked(const TUniqueId& instance_id,
@ -188,7 +181,7 @@ private:
std::condition_variable _cv;
// Make sure to remove this only after no data references the PlanFragmentExecutor
std::unordered_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>> _fragment_map;
std::unordered_map<TUniqueId, std::shared_ptr<PlanFragmentExecutor>> _fragment_instance_map;
std::unordered_map<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>> _pipeline_map;

View File

@ -46,7 +46,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker(
_wait_gc = false;
}
void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) {
void ThreadMemTrackerMgr::cancel_instance(const std::string& exceed_msg) {
ExecEnv::GetInstance()->fragment_mgr()->cancel_instance(
_fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg);
}

View File

@ -93,7 +93,7 @@ public:
void disable_wait_gc() { _wait_gc = false; }
bool wait_gc() { return _wait_gc; }
void cancel_fragment(const std::string& exceed_msg);
void cancel_instance(const std::string& exceed_msg);
std::string print_debug_string() {
fmt::memory_buffer consumer_tracker_buf;

View File

@ -591,11 +591,10 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
.tag("instance_id", print_id(_runtime_state->fragment_instance_id()))
.tag("reason", reason)
.tag("error message", msg);
if (_runtime_state->is_cancelled()) {
LOG(INFO) << "instance << " << print_id(_runtime_state->fragment_instance_id())
<< "is already cancelled, skip cancel again";
return;
}
// NOTE: No need to check whether this is already cancelled.
// Bug scenario: test_array_map_function.groovy:
// select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from test.array_test2 where id > 10 order by id
DCHECK(_prepared);
_cancel_reason = reason;
if (reason == PPlanFragmentCancelReason::LIMIT_REACH) {

View File

@ -257,7 +257,7 @@ public:
// MemTracker that is shared by all fragment instances running on this host.
std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
std::vector<TUniqueId> fragment_ids;
std::vector<TUniqueId> fragment_instance_ids;
// plan node id -> TFileScanRangeParams
// only for file scan node

View File

@ -313,6 +313,7 @@ Status RuntimeState::query_status() {
}
bool RuntimeState::is_cancelled() const {
// Maybe we should just return _is_cancelled.load()
return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
}
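
Since is_cancelled() here ORs the instance flag with the query-level flag, the early-return guard removed from PlanFragmentExecutor::cancel could fire as soon as any other fragment cancelled the query, before this instance had done its own cancel work. A tiny illustrative sketch of that predicate, using made-up field names rather than the real RuntimeState/QueryContext members:

```cpp
#include <cassert>

// Simplified model of the combined predicate: cancelled at the instance level
// OR at the query level.
struct State {
    bool instance_cancelled = false;
    bool query_ctx_cancelled = false;
    bool is_cancelled() const { return instance_cancelled || query_ctx_cancelled; }
};

int main() {
    State s;
    s.query_ctx_cancelled = true;  // some other fragment cancelled the query first
    // The removed guard ("if (is_cancelled()) return;") would already fire here,
    // even though this instance has not run its own cancel work yet.
    assert(s.is_cancelled() && !s.instance_cancelled);
    return 0;
}
```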

View File

@ -558,15 +558,17 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
tid.__set_lo(request->finst_id().lo());
signal::set_signal_task_id(tid);
Status st = Status::OK();
if (request->has_cancel_reason()) {
LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
<< ", reason: " << PPlanFragmentCancelReason_Name(request->cancel_reason());
_exec_env->fragment_mgr()->cancel_instance(tid, request->cancel_reason());
} else {
LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
_exec_env->fragment_mgr()->cancel_instance(tid,
PPlanFragmentCancelReason::INTERNAL_ERROR);
}
const bool has_cancel_reason = request->has_cancel_reason();
LOG(INFO) << fmt::format("Cancel instance {}, reason: {}", print_id(tid),
has_cancel_reason
? PPlanFragmentCancelReason_Name(request->cancel_reason())
: "INTERNAL_ERROR");
_exec_env->fragment_mgr()->cancel_instance(
tid, has_cancel_reason ? request->cancel_reason()
: PPlanFragmentCancelReason::INTERNAL_ERROR);
// TODO: this logic seems useless; cancel only returns Status::OK. Remove it.
st.to_protobuf(result->mutable_status());
});

View File

@ -174,7 +174,7 @@ public:
UIntGauge* data_stream_receiver_count;
UIntGauge* fragment_endpoint_count;
UIntGauge* active_scan_context_count;
UIntGauge* plan_fragment_count;
UIntGauge* fragment_instance_count;
UIntGauge* load_channel_count;
UIntGauge* result_buffer_block_count;
UIntGauge* result_block_queue_count;

View File

@ -98,7 +98,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
"Query:{} canceled asyn, after waiting for memory {}ms, {}.",
print_id(doris::thread_context()->task_id()), wait_milliseconds,
err_msg);
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg);
} else {
LOG(INFO) << fmt::format(
"Query:{} throw exception, after waiting for memory {}ms, {}.",
@ -131,7 +131,7 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_tracker_check(siz
if (!doris::enable_thread_catch_bad_alloc) {
LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.",
print_id(doris::thread_context()->task_id()), err_msg);
doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg);
} else {
LOG(INFO) << fmt::format("query/load:{} throw exception, {}.",
print_id(doris::thread_context()->task_id()), err_msg);

View File

@ -248,7 +248,7 @@ curl http://be_host:webserver_port/metrics?type=json
|`doris_be_meta_request_total`| |Num | Cumulative number of accesses to meta in RocksDB | Observe the BE metadata access frequency via the slope | P0 |
||{type="read"} | Num| Number of reads | |
||{type="write"} | Num| Number of writes | |
|`doris_be_plan_fragment_count`| | Num | Number of fragment instances currently received | Watch for instance pile-up | P0 |
|`doris_be_fragment_instance_count`| | Num | Number of fragment instances currently received | Watch for instance pile-up | P0 |
|`doris_be_process_fd_num_limit_hard`| |Num| Hard limit on the number of file handles of the BE process. Collected from `/proc/pid/limits` | |
|`doris_be_process_fd_num_limit_soft`| |Num| Soft limit on the number of file handles of the BE process. Collected from `/proc/pid/limits` | |
|`doris_be_process_fd_num_used`| |Num| Number of file handles used by the BE process. Collected from `/proc/pid/limits` | |

View File

@ -88,12 +88,20 @@ suite("test_array_map_function") {
"""
test {
sql"""select array_map((x,y)->x+y, c_array1, c_array2) from array_test2 where id > 10 order by id;"""
sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from ${tableName} where id > 10 order by id"""
check{result, exception, startTime, endTime ->
assertTrue(exception != null)
logger.info(exception.message)
}
}
}
test {
sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=true)*/ array_map((x,y)->x+y, c_array1, c_array2) from ${tableName} where id > 10 order by id"""
check{result, exception, startTime, endTime ->
assertTrue(exception != null)
logger.info(exception.message)
}
}
sql "DROP TABLE IF EXISTS ${tableName}"
}