[chore](log) Add log to trace query cancel #28020

This commit is contained in:
zhiqiang
2023-12-06 15:51:21 +08:00
committed by GitHub
parent 605257ccb7
commit 994c5c6f6e

View File

@ -622,10 +622,9 @@ public class Coordinator implements CoordInterface {
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}
if (LOG.isDebugEnabled()) {
LOG.debug("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
}
LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);
if (topDataSink instanceof ResultFileSink
&& ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
@ -2540,12 +2539,19 @@ public class Coordinator implements CoordInterface {
// for now, abort the query if we see any error except if the error is cancelled
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
status.getErrorMsg());
updateStatus(status, params.getFragmentInstanceId());
if (!status.ok()) {
if (status.isCancelled() && returnedAllResults) {
LOG.warn("Query {} has returned all results, its instance {} is reporting failed status {}",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
status.getErrorMsg());
} else {
LOG.warn("Instance {} of query {} report failed status, error msg: {}",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
status.getErrorMsg());
updateStatus(status, params.getFragmentInstanceId());
}
}
if (execState.done) {
if (params.isSetDeltaUrls()) {
updateDeltas(params.getDeltaUrls());
@ -2968,12 +2974,10 @@ public class Coordinator implements CoordInterface {
// cancel the fragment instance.
// return true if cancel success. Otherwise, return false
public synchronized boolean cancelFragmentInstance(Types.PPlanFragmentCancelReason cancelReason) {
if (LOG.isDebugEnabled()) {
LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {},"
+ " fragment instance id={}, reason: {}",
this.initiated, this.done, this.hasCanceled, backend.getId(),
DebugUtil.printId(fragmentInstanceId()), cancelReason.name());
}
LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {},"
+ " fragment instance id={}, reason: {}",
this.initiated, this.done, this.hasCanceled, backend.getId(),
DebugUtil.printId(fragmentInstanceId()), cancelReason.name());
try {
if (!this.initiated) {
return false;
@ -3160,13 +3164,11 @@ public class Coordinator implements CoordInterface {
return false;
}
for (TPipelineInstanceParams localParam : rpcParams.local_params) {
if (LOG.isDebugEnabled()) {
LOG.debug("cancelRemoteFragments initiated={} done={} hasCanceled={} backend: {},"
+ " fragment instance id={} query={}, reason: {}",
this.initiated, this.done, this.hasCanceled, backend.getId(),
DebugUtil.printId(localParam.fragment_instance_id),
DebugUtil.printId(queryId), cancelReason.name());
}
LOG.warn("cancelRemoteFragments initiated={} done={} hasCanceled={} backend:{},"
+ " fragment instance id={} query={}, reason: {}",
this.initiated, this.done, this.hasCanceled, backend.getId(),
DebugUtil.printId(localParam.fragment_instance_id),
DebugUtil.printId(queryId), cancelReason.name());
RuntimeProfile profile = fragmentInstancesMap.get(localParam.fragment_instance_id);
if (profile.getIsDone() || profile.getIsCancel()) {