Fix query hang when using queue (#20434)

This commit is contained in:
wangbo
2023-06-05 18:12:26 +08:00
committed by GitHub
parent 7d11db0807
commit c7dd7c2eba

View File

@ -578,34 +578,40 @@ public class StmtExecutor {
}
int retryTime = Config.max_query_retry_time;
for (int i = 0; i < retryTime; i++) {
try {
//reset query id for each retry
if (i > 0) {
UUID uuid = UUID.randomUUID();
TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
try {
for (int i = 0; i < retryTime; i++) {
try {
//reset query id for each retry
if (i > 0) {
UUID uuid = UUID.randomUUID();
TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
uuid.getLeastSignificantBits());
AuditLog.getQueryAudit().log("Query {} {} times with new query id: {}",
DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
}
handleQueryStmt();
break;
} catch (RpcException e) {
if (i == retryTime - 1) {
throw e;
}
if (!context.getMysqlChannel().isSend()) {
LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
} else {
throw e;
}
} finally {
// The final profile report occurs after be returns the query data, and the profile cannot be
// received after unregisterQuery(), causing the instance profile to be lost, so we should wait
// for the profile before unregisterQuery().
updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
handleQueryStmt();
break;
} catch (RpcException e) {
if (i == retryTime - 1) {
throw e;
}
if (!context.getMysqlChannel().isSend()) {
LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
} else {
throw e;
}
} finally {
// The final profile report occurs after be returns the query data, and the profile cannot be
// received after unregisterQuery(), causing the instance profile to be lost, so we should wait
// for the profile before unregisterQuery().
updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
} finally {
if (offerRet.isOfferSuccess()) {
queryQueue.poll();
}
}
}
@ -646,9 +652,6 @@ public class StmtExecutor {
throw e;
} finally {
queryAnalysisSpan.end();
if (offerRet.isOfferSuccess()) {
queryQueue.poll();
}
}
if (isForwardToMaster()) {
if (isProxy) {