diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index e3fff6df98..d7c6aebdc1 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -158,7 +158,11 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason, .tag("instance_id", print_id(_runtime_state->fragment_instance_id())) .tag("reason", PPlanFragmentCancelReason_Name(reason)) .tag("message", msg); - + // TODO(zhiqiang): may be not need to check if query is already cancelled. + // Dont cancel in this situation may lead to bug. For example, result sink node + // can not be cancelled if other fragments set the query_ctx cancelled, this will + // make result receiver on fe be stocked on rpc forever until timeout... + // We need a more detail discussion. if (_query_ctx->cancel(true, msg, Status::Cancelled(msg))) { if (reason != PPlanFragmentCancelReason::LIMIT_REACH) { LOG(WARNING) << "PipelineFragmentContext " diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index d84af6f0d4..f514e3a8ce 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -96,7 +96,7 @@ namespace doris { -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT); bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare"); @@ -119,7 +119,8 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _exec_env(exec_env), _stop_background_threads_latch(1) { _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count); - REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); }); + REGISTER_HOOK_METRIC(fragment_instance_count, + [this]() { return _fragment_instance_map.size(); }); auto s = Thread::create( "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); }, @@ -149,7 +150,7 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env) FragmentMgr::~FragmentMgr() = default; void FragmentMgr::stop() { - DEREGISTER_HOOK_METRIC(plan_fragment_count); + DEREGISTER_HOOK_METRIC(fragment_instance_count); DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size); _stop_background_threads_latch.count_down(); if (_cancel_thread) { @@ -162,7 +163,7 @@ void FragmentMgr::stop() { // Only me can delete { std::lock_guard lock(_lock); - _fragment_map.clear(); + _fragment_instance_map.clear(); _query_ctx_map.clear(); for (auto& pipeline : _pipeline_map) { pipeline.second->close_sink(); @@ -450,7 +451,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr fragment_ex // remove exec state after this fragment finished { std::lock_guard lock(_lock); - _fragment_map.erase(fragment_executor->fragment_instance_id()); + _fragment_instance_map.erase(fragment_executor->fragment_instance_id()); if (all_done && query_ctx) { _query_ctx_map.erase(query_ctx->query_id()); } @@ -709,8 +710,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, const TUniqueId& fragment_instance_id = params.params.fragment_instance_id; { std::lock_guard lock(_lock); - auto iter = _fragment_map.find(fragment_instance_id); - if (iter != _fragment_map.end()) { + auto iter = _fragment_instance_map.find(fragment_instance_id); + if (iter != _fragment_instance_map.end()) { // Duplicated LOG(WARNING) << "duplicate fragment instance id: " << print_id(fragment_instance_id); return Status::OK(); @@ -727,7 +728,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, // memory, but query_is_canncelled will traverse the vector, it will core. // query_is_cancelled is called in allocator, we has to avoid dead lock. std::lock_guard lock(_lock); - query_ctx->fragment_ids.push_back(fragment_instance_id); + query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } auto fragment_executor = std::make_shared( @@ -754,7 +755,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, fragment_executor->set_merge_controller_handler(handler); { std::lock_guard lock(_lock); - _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, fragment_executor)); + _fragment_instance_map.insert( + std::make_pair(params.params.fragment_instance_id, fragment_executor)); _cv.notify_all(); } auto st = _thread_pool->submit_func( @@ -767,7 +769,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, { // Remove the exec state added std::lock_guard lock(_lock); - _fragment_map.erase(params.params.fragment_instance_id); + _fragment_instance_map.erase(params.params.fragment_instance_id); } fragment_executor->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "push plan fragment to thread pool failed"); @@ -833,7 +835,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, // Duplicated return Status::OK(); } - query_ctx->fragment_ids.push_back(fragment_instance_id); + query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } START_AND_SCOPE_SPAN(tracer, span, "exec_instance"); span->SetAttribute("instance_id", print_id(fragment_instance_id)); @@ -870,7 +872,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, // Duplicated return Status::OK(); } - query_ctx->fragment_ids.push_back(fragment_instance_id); + query_ctx->fragment_instance_ids.push_back(fragment_instance_id); } START_AND_SCOPE_SPAN(tracer, span, "exec_instance"); span->SetAttribute("instance_id", print_id(fragment_instance_id)); @@ -979,37 +981,13 @@ void FragmentMgr::cancel_query_unlocked(const TUniqueId& query_id, return; } - if (ctx->second->enable_pipeline_exec()) { - for (auto it : ctx->second->fragment_ids) { - // instance_id will not be removed from query_context.instance_ids currently - // and it will be removed from fragment_mgr::_pipeline_map only. - // so we add this check to avoid too many WARNING log. - if (_pipeline_map.contains(it)) { - cancel_instance_unlocked(it, reason, state_lock, msg); - } - } - } else { - for (auto it : ctx->second->fragment_ids) { - cancel_fragment_unlocked(it, reason, state_lock, msg); - } + for (auto it : ctx->second->fragment_instance_ids) { + cancel_instance_unlocked(it, reason, state_lock, msg); } ctx->second->cancel(true, msg, Status::Cancelled(msg)); _query_ctx_map.erase(query_id); - LOG(INFO) << "Query " << print_id(query_id) << " is cancelled. Reason: " << msg; -} - -void FragmentMgr::cancel_fragment(const TUniqueId& fragment_id, - const PPlanFragmentCancelReason& reason, const std::string& msg) { - std::unique_lock state_lock(_lock); - return cancel_fragment_unlocked(fragment_id, reason, state_lock, msg); -} - -void FragmentMgr::cancel_fragment_unlocked(const TUniqueId& fragment_id, - const PPlanFragmentCancelReason& reason, - const std::unique_lock& state_lock, - const std::string& msg) { - return cancel_unlocked_impl(fragment_id, reason, state_lock, false /*not pipeline query*/, msg); + LOG(INFO) << "Query " << print_id(query_id) << " is cancelled and removed. Reason: " << msg; } void FragmentMgr::cancel_instance(const TUniqueId& instance_id, @@ -1022,32 +1000,25 @@ void FragmentMgr::cancel_instance_unlocked(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason, const std::unique_lock& state_lock, const std::string& msg) { - return cancel_unlocked_impl(instance_id, reason, state_lock, true /*pipeline query*/, msg); -} + const bool is_pipeline_instance = _pipeline_map.contains(instance_id); -void FragmentMgr::cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, - const std::unique_lock& /*state_lock*/, - bool is_pipeline, const std::string& msg) { - if (is_pipeline) { - const TUniqueId& instance_id = id; + if (is_pipeline_instance) { auto itr = _pipeline_map.find(instance_id); if (itr != _pipeline_map.end()) { // calling PipelineFragmentContext::cancel itr->second->cancel(reason, msg); } else { - LOG(WARNING) << "Could not find the instance id:" << print_id(instance_id) + LOG(WARNING) << "Could not find the pipeline instance id:" << print_id(instance_id) << " to cancel"; } } else { - const TUniqueId& fragment_id = id; - auto itr = _fragment_map.find(fragment_id); - - if (itr != _fragment_map.end()) { + auto itr = _fragment_instance_map.find(instance_id); + if (itr != _fragment_instance_map.end()) { // calling PlanFragmentExecutor::cancel itr->second->cancel(reason, msg); } else { - LOG(WARNING) << "Could not find the fragment id:" << print_id(fragment_id) + LOG(WARNING) << "Could not find the fragment instance id:" << print_id(instance_id) << " to cancel"; } } @@ -1059,17 +1030,17 @@ bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) { if (ctx != _query_ctx_map.end()) { const bool is_pipeline_version = ctx->second->enable_pipeline_exec(); - for (auto itr : ctx->second->fragment_ids) { + for (auto itr : ctx->second->fragment_instance_ids) { if (is_pipeline_version) { auto pipeline_ctx_iter = _pipeline_map.find(itr); if (pipeline_ctx_iter != _pipeline_map.end() && pipeline_ctx_iter->second) { return pipeline_ctx_iter->second->is_canceled(); } } else { - auto fragment_executor_iter = _fragment_map.find(itr); - if (fragment_executor_iter != _fragment_map.end() && - fragment_executor_iter->second) { - return fragment_executor_iter->second->is_canceled(); + auto fragment_instance_itr = _fragment_instance_map.find(itr); + if (fragment_instance_itr != _fragment_instance_map.end() && + fragment_instance_itr->second) { + return fragment_instance_itr->second->is_canceled(); } } } @@ -1086,9 +1057,9 @@ void FragmentMgr::cancel_worker() { VecDateTimeValue now = VecDateTimeValue::local_time(); { std::lock_guard lock(_lock); - for (auto& it : _fragment_map) { - if (it.second->is_timeout(now)) { - to_cancel.push_back(it.second->fragment_instance_id()); + for (auto& fragment_instance_itr : _fragment_instance_map) { + if (fragment_instance_itr.second->is_timeout(now)) { + to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id()); } } for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { @@ -1134,8 +1105,8 @@ void FragmentMgr::cancel_worker() { // designed to count canceled fragment of non-pipeline query. timeout_canceled_fragment_count->increment(to_cancel.size()); for (auto& id : to_cancel) { - cancel_fragment(id, PPlanFragmentCancelReason::TIMEOUT); - LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment " + cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT); + LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance " << print_id(id); } @@ -1156,10 +1127,10 @@ void FragmentMgr::debug(std::stringstream& ss) { // Keep things simple std::lock_guard lock(_lock); - ss << "FragmentMgr have " << _fragment_map.size() << " jobs.\n"; + ss << "FragmentMgr have " << _fragment_instance_map.size() << " jobs.\n"; ss << "job_id\t\tstart_time\t\texecute_time(s)\n"; VecDateTimeValue now = VecDateTimeValue::local_time(); - for (auto& it : _fragment_map) { + for (auto& it : _fragment_instance_map) { ss << it.first << "\t" << it.second->start_time().debug_string() << "\t" << now.second_diff(it.second->start_time()) << "\n"; } @@ -1308,10 +1279,10 @@ Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr(); } else { std::unique_lock lock(_lock); - auto iter = _fragment_map.find(tfragment_instance_id); - if (iter == _fragment_map.end()) { - VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; - return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + auto iter = _fragment_instance_map.find(tfragment_instance_id); + if (iter == _fragment_instance_map.end()) { + VLOG_CRITICAL << "unknown.... fragment instance id:" << print_id(tfragment_instance_id); + return Status::InvalidArgument("fragment-id: {}", print_id(tfragment_instance_id)); } fragment_executor = iter->second; @@ -1353,10 +1324,12 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request, pool = &pip_context->get_query_context()->obj_pool; } else { std::unique_lock lock(_lock); - auto iter = _fragment_map.find(tfragment_instance_id); - if (iter == _fragment_map.end()) { - VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; - return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + auto iter = _fragment_instance_map.find(tfragment_instance_id); + if (iter == _fragment_instance_map.end()) { + VLOG_CRITICAL << "unknown.... fragment instance id:" + << print_id(tfragment_instance_id); + return Status::InvalidArgument("fragment instance id: {}", + print_id(tfragment_instance_id)); } fragment_executor = iter->second; @@ -1410,10 +1383,11 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, pip_context = iter->second; } else { std::unique_lock lock(_lock); - auto iter = _fragment_map.find(tfragment_instance_id); - if (iter == _fragment_map.end()) { - VLOG_CRITICAL << "unknown fragment-id:" << fragment_instance_id; - return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); + auto iter = _fragment_instance_map.find(tfragment_instance_id); + if (iter == _fragment_instance_map.end()) { + VLOG_CRITICAL << "unknown fragment instance id:" << print_id(tfragment_instance_id); + return Status::InvalidArgument("fragment instance id: {}", + print_id(tfragment_instance_id)); } // hold reference to fragment_executor, or else runtime_state can be destroyed diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 395b9b546c..93f010a5ec 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -92,17 +92,10 @@ public: Status start_query_execution(const PExecPlanFragmentStartRequest* request); - // This method can only be used to cancel a fragment of non-pipeline query. - void cancel_fragment(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason, - const std::string& msg = ""); - void cancel_fragment_unlocked(const TUniqueId& instance_id, - const PPlanFragmentCancelReason& reason, - const std::unique_lock& state_lock, - const std::string& msg = ""); Status trigger_pipeline_context_report(const ReportStatusRequest, std::shared_ptr&&); - // Pipeline version, cancel a fragment instance. + // Cancel instance (pipeline or nonpipeline). void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason, const std::string& msg = ""); void cancel_instance_unlocked(const TUniqueId& instance_id, @@ -188,7 +181,7 @@ private: std::condition_variable _cv; // Make sure that remove this before no data reference PlanFragmentExecutor - std::unordered_map> _fragment_map; + std::unordered_map> _fragment_instance_map; std::unordered_map> _pipeline_map; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 92974c73a2..0a1686704c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -46,7 +46,7 @@ void ThreadMemTrackerMgr::detach_limiter_tracker( _wait_gc = false; } -void ThreadMemTrackerMgr::cancel_fragment(const std::string& exceed_msg) { +void ThreadMemTrackerMgr::cancel_instance(const std::string& exceed_msg) { ExecEnv::GetInstance()->fragment_mgr()->cancel_instance( _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, exceed_msg); } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 4e9c83b43f..487f82ac54 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -93,7 +93,7 @@ public: void disable_wait_gc() { _wait_gc = false; } bool wait_gc() { return _wait_gc; } - void cancel_fragment(const std::string& exceed_msg); + void cancel_instance(const std::string& exceed_msg); std::string print_debug_string() { fmt::memory_buffer consumer_tracker_buf; diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 95e24e1ce3..fd1ccf09c8 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -591,11 +591,10 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const .tag("instance_id", print_id(_runtime_state->fragment_instance_id())) .tag("reason", reason) .tag("error message", msg); - if (_runtime_state->is_cancelled()) { - LOG(INFO) << "instance << " << print_id(_runtime_state->fragment_instance_id()) - << "is already cancelled, skip cancel again"; - return; - } + // NOTE: Not need to check if already cancelled. + // Bug scenario: test_array_map_function.groovy: + // select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from test.array_test2 where id > 10 order by id + DCHECK(_prepared); _cancel_reason = reason; if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index e3cb330368..4ec3517740 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -257,7 +257,7 @@ public: // MemTracker that is shared by all fragment instances running on this host. std::shared_ptr query_mem_tracker; - std::vector fragment_ids; + std::vector fragment_instance_ids; // plan node id -> TFileScanRangeParams // only for file scan node diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 68597019c8..bad1487f32 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -313,6 +313,7 @@ Status RuntimeState::query_status() { } bool RuntimeState::is_cancelled() const { + // Maybe we should just return _is_cancelled.load() return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled()); } diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 02ab1b4450..37a0b076d9 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -558,15 +558,17 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* tid.__set_lo(request->finst_id().lo()); signal::set_signal_task_id(tid); Status st = Status::OK(); - if (request->has_cancel_reason()) { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid) - << ", reason: " << PPlanFragmentCancelReason_Name(request->cancel_reason()); - _exec_env->fragment_mgr()->cancel_instance(tid, request->cancel_reason()); - } else { - LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid); - _exec_env->fragment_mgr()->cancel_instance(tid, - PPlanFragmentCancelReason::INTERNAL_ERROR); - } + + const bool has_cancel_reason = request->has_cancel_reason(); + LOG(INFO) << fmt::format("Cancel instance {}, reason: {}", print_id(tid), + has_cancel_reason + ? PPlanFragmentCancelReason_Name(request->cancel_reason()) + : "INTERNAL_ERROR"); + + _exec_env->fragment_mgr()->cancel_instance( + tid, has_cancel_reason ? request->cancel_reason() + : PPlanFragmentCancelReason::INTERNAL_ERROR); + // TODO: the logic seems useless, cancel only return Status::OK. remove it st.to_protobuf(result->mutable_status()); }); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index 39036d4589..60ca3b5107 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -174,7 +174,7 @@ public: UIntGauge* data_stream_receiver_count; UIntGauge* fragment_endpoint_count; UIntGauge* active_scan_context_count; - UIntGauge* plan_fragment_count; + UIntGauge* fragment_instance_count; UIntGauge* load_channel_count; UIntGauge* result_buffer_block_count; UIntGauge* result_block_queue_count; diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 1c8085aac4..b4025807be 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -98,7 +98,7 @@ void Allocator::sys_memory_check(size_t "Query:{} canceled asyn, after waiting for memory {}ms, {}.", print_id(doris::thread_context()->task_id()), wait_milliseconds, err_msg); - doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg); } else { LOG(INFO) << fmt::format( "Query:{} throw exception, after waiting for memory {}ms, {}.", @@ -131,7 +131,7 @@ void Allocator::memory_tracker_check(siz if (!doris::enable_thread_catch_bad_alloc) { LOG(INFO) << fmt::format("query/load:{} canceled asyn, {}.", print_id(doris::thread_context()->task_id()), err_msg); - doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg); + doris::thread_context()->thread_mem_tracker_mgr->cancel_instance(err_msg); } else { LOG(INFO) << fmt::format("query/load:{} throw exception, {}.", print_id(doris::thread_context()->task_id()), err_msg); diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md index 4a48dcbbb2..1f6e289877 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md @@ -248,7 +248,7 @@ curl http://be_host:webserver_port/metrics?type=json |`doris_be_meta_request_total`| |Num | 访问 RocksDB 中的 meta 的次数累计 | 通过斜率观测 BE 元数据访问频率 | P0 | ||{type="read"} | Num| 读取次数 | | ||{type="write"} | Num| 写入次数 | | -|`doris_be_plan_fragment_count`| | Num | 当前已接收的 fragment instance 的数量 | 观测是否出现 instance 堆积 | P0 | +|`doris_be_fragment_instance_count`| | Num | 当前已接收的 fragment instance 的数量 | 观测是否出现 instance 堆积 | P0 | |`doris_be_process_fd_num_limit_hard`| |Num| BE 进程的文件句柄数硬限。通过 `/proc/pid/limits` 采集 | | |`doris_be_process_fd_num_limit_soft`| |Num| BE 进程的文件句柄数软限。通过 `/proc/pid/limits` 采集 | | |`doris_be_process_fd_num_used`| |Num| BE 进程已使用的文件句柄数。通过 `/proc/pid/limits` 采集 | | diff --git a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy index b699263c35..311b188e43 100644 --- a/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy +++ b/regression-test/suites/query_p0/sql_functions/array_functions/test_array_map_function.groovy @@ -88,12 +88,20 @@ suite("test_array_map_function") { """ test { - sql"""select array_map((x,y)->x+y, c_array1, c_array2) from array_test2 where id > 10 order by id;""" + sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=false)*/ array_map((x,y)->x+y, c_array1, c_array2) from ${tableName} where id > 10 order by id""" check{result, exception, startTime, endTime -> assertTrue(exception != null) logger.info(exception.message) } - } + } + + test { + sql"""select /*+SET_VAR(experimental_enable_pipeline_engine=true)*/ array_map((x,y)->x+y, c_array1, c_array2) from ${tableName} where id > 10 order by id""" + check{result, exception, startTime, endTime -> + assertTrue(exception != null) + logger.info(exception.message) + } + } sql "DROP TABLE IF EXISTS ${tableName}" }