From d62b59b620b0629dfd597a43b1be529a2111b996 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 21 Sep 2023 09:57:34 +0800 Subject: [PATCH] [pipelineX](profile) Complete load profile (#24690) --- .../pipeline_x_fragment_context.cpp | 4 +- .../pipeline/pipeline_x/pipeline_x_task.cpp | 4 +- be/src/runtime/fragment_mgr.cpp | 61 ++++++++++++++ .../java/org/apache/doris/qe/Coordinator.java | 81 +++++++++++++++++-- gensrc/thrift/FrontendService.thrift | 3 + 5 files changed, 143 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 8af1419729..696d7759e2 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -296,10 +296,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { auto task = std::make_unique(_pipelines[pip_idx], _total_tasks++, _runtime_states[i].get(), this, - _pipelines[pip_idx]->pipeline_profile()); + _runtime_states[i]->runtime_profile()); pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()}); - _runtime_states[i]->runtime_profile()->add_child( - _pipelines[pip_idx]->pipeline_profile(), true, nullptr); _tasks[i].emplace_back(std::move(task)); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index dcb3aa1b27..cfc29b9978 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -62,7 +62,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); - LocalSinkStateInfo sink_info {_pipeline->pipeline_profile(), local_params.sender_id, + LocalSinkStateInfo sink_info {_parent_profile, local_params.sender_id, get_downstream_dependency().get()}; RETURN_IF_ERROR(_sink->setup_local_state(state, sink_info)); @@ -72,7 +72,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { LocalStateInfo info { op_idx == _operators.size() - 1 - ? _pipeline->pipeline_profile() + ? _parent_profile : state->get_local_state(_operators[op_idx + 1]->id())->profile(), scan_ranges, get_upstream_dependency(_operators[op_idx]->id())}; RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info)); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 5a0d30f0ef..812ba1bb3d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -243,6 +243,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { for (auto& it : req.runtime_state->output_files()) { params.delta_urls.push_back(to_http_path(it)); } + } else if (!req.runtime_states.empty()) { + params.__isset.delta_urls = true; + for (auto* rs : req.runtime_states) { + for (auto& it : rs->output_files()) { + params.delta_urls.push_back(to_http_path(it)); + } + } } if (req.runtime_state->num_rows_load_total() > 0 || req.runtime_state->num_rows_load_filtered() > 0) { @@ -260,14 +267,50 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { params.load_counters.emplace( s_unselected_rows, std::to_string(req.runtime_state->num_rows_load_unselected())); + } else if (!req.runtime_states.empty()) { + static std::string s_dpp_normal_all = "dpp.norm.ALL"; + static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; + static std::string s_unselected_rows = "unselected.rows"; + int64_t num_rows_load_success = 0; + int64_t num_rows_load_filtered = 0; + int64_t num_rows_load_unselected = 0; + for (auto* rs : req.runtime_states) { + if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0) { + params.__isset.load_counters = true; + num_rows_load_success += rs->num_rows_load_success(); + num_rows_load_filtered += rs->num_rows_load_filtered(); + num_rows_load_unselected += rs->num_rows_load_unselected(); + } + } + params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success)); + params.load_counters.emplace(s_dpp_abnormal_all, + std::to_string(num_rows_load_filtered)); + params.load_counters.emplace(s_unselected_rows, + std::to_string(num_rows_load_unselected)); } if (!req.runtime_state->get_error_log_file_path().empty()) { params.__set_tracking_url( to_load_error_http_path(req.runtime_state->get_error_log_file_path())); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->get_error_log_file_path().empty()) { + params.__set_tracking_url( + to_load_error_http_path(rs->get_error_log_file_path())); + } + } } if (!req.runtime_state->export_output_files().empty()) { params.__isset.export_files = true; params.export_files = req.runtime_state->export_output_files(); + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->export_output_files().empty()) { + params.__isset.export_files = true; + params.export_files.insert(params.export_files.end(), + rs->export_output_files().begin(), + rs->export_output_files().end()); + } + } } if (!req.runtime_state->tablet_commit_infos().empty()) { params.__isset.commitInfos = true; @@ -275,6 +318,15 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { for (auto& info : req.runtime_state->tablet_commit_infos()) { params.commitInfos.push_back(info); } + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->tablet_commit_infos().empty()) { + params.__isset.commitInfos = true; + params.commitInfos.insert(params.commitInfos.end(), + rs->tablet_commit_infos().begin(), + rs->tablet_commit_infos().end()); + } + } } if (!req.runtime_state->error_tablet_infos().empty()) { params.__isset.errorTabletInfos = true; @@ -282,6 +334,15 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { for (auto& info : req.runtime_state->error_tablet_infos()) { params.errorTabletInfos.push_back(info); } + } else if (!req.runtime_states.empty()) { + for (auto* rs : req.runtime_states) { + if (!rs->error_tablet_infos().empty()) { + params.__isset.errorTabletInfos = true; + params.errorTabletInfos.insert(params.errorTabletInfos.end(), + rs->error_tablet_infos().begin(), + rs->error_tablet_infos().end()); + } + } } // Send new errors to coordinator diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index b9ec89ef61..41a4efc614 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -139,6 +139,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.Set; @@ -1181,6 +1182,38 @@ public class Coordinator implements CoordInterface { } } + private void updateStatus(Status status, long backendId) { + lock.lock(); + try { + // The query is done and we are just waiting for remote fragments to clean up. + // Ignore their cancelled updates. + if (returnedAllResults && status.isCancelled()) { + return; + } + // nothing to update + if (status.ok()) { + return; + } + + // don't override an error status; also, cancellation has already started + if (!queryStatus.ok()) { + return; + } + + queryStatus.setStatus(status); + LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {}," + + " query id: {}, error message: {}", + jobId, DebugUtil.printId(queryId), status.getErrorMsg()); + if (status.getErrorCode() == TStatusCode.TIMEOUT) { + cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, backendId); + } else { + cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, backendId); + } + } finally { + lock.unlock(); + } + } + @Override public RowBatch getNext() throws Exception { if (receiver == null) { @@ -1344,6 +1377,18 @@ public class Coordinator implements CoordInterface { executionProfile.onCancel(); } + private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, long backendId) { + if (null != receiver) { + receiver.cancel(); + } + if (null != pointExec) { + pointExec.cancel(); + return; + } + cancelRemoteFragmentsAsync(cancelReason, backendId); + executionProfile.onCancel(); + } + private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) { if (enablePipelineEngine) { for (PipelineExecContext ctx : pipelineExecContexts.values()) { @@ -1356,6 +1401,15 @@ public class Coordinator implements CoordInterface { } } + private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason, long backendId) { + Preconditions.checkArgument(enablePipelineXEngine); + for (PipelineExecContext ctx : pipelineExecContexts.values()) { + if (!Objects.equals(idToBackend.get(backendId), ctx.backend)) { + ctx.cancelFragmentInstance(cancelReason); + } + } + } + private void computeFragmentExecParams() throws Exception { // fill hosts field in fragmentExecParams computeFragmentHosts(); @@ -2326,12 +2380,29 @@ public class Coordinator implements CoordInterface { LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}", DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()), status.getErrorMsg()); - updateStatus(status, params.getFragmentInstanceId()); + updateStatus(status, params.backend_id); + } + if (params.isSetDeltaUrls()) { + updateDeltas(params.getDeltaUrls()); + } + if (params.isSetLoadCounters()) { + updateLoadCounters(params.getLoadCounters()); + } + if (params.isSetTrackingUrl()) { + trackingUrl = params.getTrackingUrl(); + } + if (params.isSetExportFiles()) { + updateExportFiles(params.getExportFiles()); + } + if (params.isSetCommitInfos()) { + updateCommitInfos(params.getCommitInfos()); + } + if (params.isSetErrorTabletInfos()) { + updateErrorTabletInfos(params.getErrorTabletInfos()); } Preconditions.checkArgument(params.isSetDetailedReport()); for (TDetailedReportParams param : params.detailed_report) { if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) { - // TODO executionProfile.markOneInstanceDone(param.getFragmentInstanceId()); } } @@ -2980,9 +3051,9 @@ public class Coordinator implements CoordInterface { profile.setIsDone(true); profileReportProgress++; } - if (profileReportProgress == numInstances) { - this.done = true; - } + } + if (profileReportProgress == numInstances) { + this.done = true; } return true; } else { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 1507076c27..29739450a1 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -409,6 +409,7 @@ struct TReportExecStatusParams { 3: optional i32 backend_num // required in V1 + // Move to TDetailedReportParams for pipelineX 4: optional Types.TUniqueId fragment_instance_id // Status of fragment execution; any error status means it's done. @@ -421,6 +422,7 @@ struct TReportExecStatusParams { // cumulative profile // required in V1 + // Move to TDetailedReportParams for pipelineX 7: optional RuntimeProfile.TRuntimeProfileTree profile // New errors that have not been reported to the coordinator @@ -450,6 +452,7 @@ struct TReportExecStatusParams { 20: optional PaloInternalService.TQueryType query_type + // Move to TDetailedReportParams for pipelineX 21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile 22: optional i32 finished_scan_ranges