[pipelineX](profile) Complete load profile (#24690)

Author: Gabriel
Date: 2023-09-21 09:57:34 +08:00
Committed by: GitHub
Parent: 0d20a61587
Commit: d62b59b620
5 changed files with 143 additions and 10 deletions


@ -296,10 +296,8 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto task = std::make_unique<PipelineXTask>(_pipelines[pip_idx], _total_tasks++,
_runtime_states[i].get(), this,
- _pipelines[pip_idx]->pipeline_profile());
+ _runtime_states[i]->runtime_profile());
pipeline_id_to_task.insert({_pipelines[pip_idx]->id(), task.get()});
- _runtime_states[i]->runtime_profile()->add_child(
-         _pipelines[pip_idx]->pipeline_profile(), true, nullptr);
_tasks[i].emplace_back(std::move(task));
}


@ -62,7 +62,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_prepare_timer);
- LocalSinkStateInfo sink_info {_pipeline->pipeline_profile(), local_params.sender_id,
+ LocalSinkStateInfo sink_info {_parent_profile, local_params.sender_id,
get_downstream_dependency().get()};
RETURN_IF_ERROR(_sink->setup_local_state(state, sink_info));
@ -72,7 +72,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
LocalStateInfo info {
op_idx == _operators.size() - 1
- ? _pipeline->pipeline_profile()
+ ? _parent_profile
: state->get_local_state(_operators[op_idx + 1]->id())->profile(),
scan_ranges, get_upstream_dependency(_operators[op_idx]->id())};
RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info));
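Taken together, the two hunks above reparent the per-task profiles: the sink and the last operator now receive _parent_profile (the instance runtime profile handed to the task) instead of the shared _pipeline->pipeline_profile(), and every earlier operator attaches to the local-state profile of the operator after it, so the profiles form a chain under the instance profile. A minimal, self-contained sketch of that tree shape, using a hypothetical Profile type and illustrative operator names rather than Doris's RuntimeProfile API:

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Hypothetical profile node used only to show the tree shape; not Doris's RuntimeProfile API.
struct Profile {
    explicit Profile(std::string n) : name(std::move(n)) {}
    Profile* add_child(std::string n) {
        children.push_back(std::make_unique<Profile>(std::move(n)));
        return children.back().get();
    }
    void dump(int indent = 0) const {
        std::cout << std::string(indent, ' ') << name << '\n';
        for (const auto& c : children) c->dump(indent + 2);
    }
    std::string name;
    std::vector<std::unique_ptr<Profile>> children;
};

int main() {
    // _parent_profile: the instance runtime profile handed to the task.
    Profile parent("RuntimeState profile");
    parent.add_child("Sink local state");  // LocalSinkStateInfo also receives _parent_profile.

    // Operators are prepared from the last index down to 0, so each operator's
    // local-state profile becomes a child of the profile of the operator after it.
    std::vector<std::string> operators = {"ScanOperator", "AggOperator", "ProjectOperator"};
    Profile* attach_point = &parent;
    for (auto it = operators.rbegin(); it != operators.rend(); ++it) {
        attach_point = attach_point->add_child(*it);
    }

    parent.dump();
    // RuntimeState profile
    //   Sink local state
    //   ProjectOperator
    //     AggOperator
    //       ScanOperator
}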


@ -243,6 +243,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
for (auto& it : req.runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
} else if (!req.runtime_states.empty()) {
params.__isset.delta_urls = true;
for (auto* rs : req.runtime_states) {
for (auto& it : rs->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
}
}
if (req.runtime_state->num_rows_load_total() > 0 ||
req.runtime_state->num_rows_load_filtered() > 0) {
@ -260,14 +267,50 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.load_counters.emplace(
s_unselected_rows,
std::to_string(req.runtime_state->num_rows_load_unselected()));
} else if (!req.runtime_states.empty()) {
static std::string s_dpp_normal_all = "dpp.norm.ALL";
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
static std::string s_unselected_rows = "unselected.rows";
int64_t num_rows_load_success = 0;
int64_t num_rows_load_filtered = 0;
int64_t num_rows_load_unselected = 0;
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0) {
params.__isset.load_counters = true;
num_rows_load_success += rs->num_rows_load_success();
num_rows_load_filtered += rs->num_rows_load_filtered();
num_rows_load_unselected += rs->num_rows_load_unselected();
}
}
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
params.load_counters.emplace(s_dpp_abnormal_all,
std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows,
std::to_string(num_rows_load_unselected));
}
if (!req.runtime_state->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->get_error_log_file_path().empty()) {
params.__set_tracking_url(
to_load_error_http_path(rs->get_error_log_file_path()));
}
}
}
if (!req.runtime_state->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files = req.runtime_state->export_output_files();
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files.insert(params.export_files.end(),
rs->export_output_files().begin(),
rs->export_output_files().end());
}
}
}
if (!req.runtime_state->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
@ -275,6 +318,15 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
for (auto& info : req.runtime_state->tablet_commit_infos()) {
params.commitInfos.push_back(info);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->tablet_commit_infos().empty()) {
params.__isset.commitInfos = true;
params.commitInfos.insert(params.commitInfos.end(),
rs->tablet_commit_infos().begin(),
rs->tablet_commit_infos().end());
}
}
}
if (!req.runtime_state->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
@ -282,6 +334,15 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
for (auto& info : req.runtime_state->error_tablet_infos()) {
params.errorTabletInfos.push_back(info);
}
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->error_tablet_infos().empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.insert(params.errorTabletInfos.end(),
rs->error_tablet_infos().begin(),
rs->error_tablet_infos().end());
}
}
}
// Send new errors to coordinator
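The new runtime_states branches above all follow one pattern: when the report carries a list of runtime states (pipelineX reports one per pipeline task) rather than a single runtime_state, per-state values are summed (load counters) or concatenated (delta URLs, export files, commit infos, error tablet infos) before being set on params. A minimal sketch of the load-counter accumulation, with hypothetical stand-in types instead of the real RuntimeState and TReportExecStatusParams:

#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <vector>

// Hypothetical per-task counters; stands in for the RuntimeState accessors used above.
struct TaskRuntimeState {
    int64_t num_rows_load_total = 0;
    int64_t num_rows_load_success = 0;
    int64_t num_rows_load_filtered = 0;
    int64_t num_rows_load_unselected = 0;
};

int main() {
    // One entry per pipelineX task; in the real code this is req.runtime_states.
    std::vector<TaskRuntimeState> runtime_states = {{102, 100, 2, 1}, {53, 50, 0, 3}, {0, 0, 0, 0}};

    int64_t success = 0, filtered = 0, unselected = 0;
    bool has_load_counters = false;
    for (const auto& rs : runtime_states) {
        // Same guard as above: only states that actually saw load rows contribute.
        if (rs.num_rows_load_total > 0 || rs.num_rows_load_filtered > 0) {
            has_load_counters = true;
            success += rs.num_rows_load_success;
            filtered += rs.num_rows_load_filtered;
            unselected += rs.num_rows_load_unselected;
        }
    }

    std::map<std::string, std::string> load_counters;
    if (has_load_counters) {
        load_counters.emplace("dpp.norm.ALL", std::to_string(success));
        load_counters.emplace("dpp.abnorm.ALL", std::to_string(filtered));
        load_counters.emplace("unselected.rows", std::to_string(unselected));
    }
    for (const auto& [key, value] : load_counters) {
        std::cout << key << " = " << value << '\n';
    }
}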


@ -139,6 +139,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
@ -1181,6 +1182,38 @@ public class Coordinator implements CoordInterface {
}
}
private void updateStatus(Status status, long backendId) {
lock.lock();
try {
// The query is done and we are just waiting for remote fragments to clean up.
// Ignore their cancelled updates.
if (returnedAllResults && status.isCancelled()) {
return;
}
// nothing to update
if (status.ok()) {
return;
}
// don't override an error status; also, cancellation has already started
if (!queryStatus.ok()) {
return;
}
queryStatus.setStatus(status);
LOG.warn("one instance report fail throw updateStatus(), need cancel. job id: {},"
+ " query id: {}, error message: {}",
jobId, DebugUtil.printId(queryId), status.getErrorMsg());
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, backendId);
} else {
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, backendId);
}
} finally {
lock.unlock();
}
}
@Override
public RowBatch getNext() throws Exception {
if (receiver == null) {
@ -1344,6 +1377,18 @@ public class Coordinator implements CoordInterface {
executionProfile.onCancel();
}
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, long backendId) {
if (null != receiver) {
receiver.cancel();
}
if (null != pointExec) {
pointExec.cancel();
return;
}
cancelRemoteFragmentsAsync(cancelReason, backendId);
executionProfile.onCancel();
}
private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason) {
if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
@ -1356,6 +1401,15 @@ public class Coordinator implements CoordInterface {
}
}
private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason cancelReason, long backendId) {
Preconditions.checkArgument(enablePipelineXEngine);
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
if (!Objects.equals(idToBackend.get(backendId), ctx.backend)) {
ctx.cancelFragmentInstance(cancelReason);
}
}
}
private void computeFragmentExecParams() throws Exception {
// fill hosts field in fragmentExecParams
computeFragmentHosts();
@ -2326,12 +2380,29 @@ public class Coordinator implements CoordInterface {
LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
status.getErrorMsg());
- updateStatus(status, params.getFragmentInstanceId());
+ updateStatus(status, params.backend_id);
}
if (params.isSetDeltaUrls()) {
updateDeltas(params.getDeltaUrls());
}
if (params.isSetLoadCounters()) {
updateLoadCounters(params.getLoadCounters());
}
if (params.isSetTrackingUrl()) {
trackingUrl = params.getTrackingUrl();
}
if (params.isSetExportFiles()) {
updateExportFiles(params.getExportFiles());
}
if (params.isSetCommitInfos()) {
updateCommitInfos(params.getCommitInfos());
}
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
Preconditions.checkArgument(params.isSetDetailedReport());
for (TDetailedReportParams param : params.detailed_report) {
if (ctx.fragmentInstancesMap.get(param.fragment_instance_id).getIsDone()) {
// TODO
executionProfile.markOneInstanceDone(param.getFragmentInstanceId());
}
}
@ -2980,9 +3051,9 @@ public class Coordinator implements CoordInterface {
profile.setIsDone(true);
profileReportProgress++;
}
- if (profileReportProgress == numInstances) {
-     this.done = true;
- }
}
+ if (profileReportProgress == numInstances) {
+     this.done = true;
+ }
return true;
} else {


@ -409,6 +409,7 @@ struct TReportExecStatusParams {
3: optional i32 backend_num
// required in V1
// Move to TDetailedReportParams for pipelineX
4: optional Types.TUniqueId fragment_instance_id
// Status of fragment execution; any error status means it's done.
@ -421,6 +422,7 @@ struct TReportExecStatusParams {
// cumulative profile
// required in V1
// Move to TDetailedReportParams for pipelineX
7: optional RuntimeProfile.TRuntimeProfileTree profile
// New errors that have not been reported to the coordinator
@ -450,6 +452,7 @@ struct TReportExecStatusParams {
20: optional PaloInternalService.TQueryType query_type
// Move to TDetailedReportParams for pipelineX
21: optional RuntimeProfile.TRuntimeProfileTree loadChannelProfile
22: optional i32 finished_scan_ranges
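The comments added above mark the fields that pipelineX no longer fills at the top level of TReportExecStatusParams: fragment_instance_id, profile, and loadChannelProfile move into the repeated TDetailedReportParams list, which is what the Coordinator's params.detailed_report loop iterates. A rough C++ mirror of the two shapes, using plain structs rather than the Thrift-generated code:

#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical mirror of the Thrift shapes above, not the generated code.
// pipelineX reports one entry per task instance instead of the V1 top-level fields.
struct DetailedReport {
    std::string fragment_instance_id;   // Types.TUniqueId in the real IDL
    std::string profile;                // RuntimeProfile.TRuntimeProfileTree in the real IDL
    std::string load_channel_profile;
};

struct ReportExecStatusParams {
    // V1 / non-pipelineX: a single instance per report.
    std::optional<std::string> fragment_instance_id;
    std::optional<std::string> profile;
    std::optional<std::string> load_channel_profile;
    // pipelineX: every task instance of the fragment reports here.
    std::vector<DetailedReport> detailed_report;
};

int main() {
    ReportExecStatusParams report;
    report.detailed_report.push_back({"instance-0", "profile-0", "load-channel-0"});
    report.detailed_report.push_back({"instance-1", "profile-1", "load-channel-1"});
    std::cout << "instances in this report: " << report.detailed_report.size() << '\n';
}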