[fix](exec) fix thread token shutdown (#14418)

Fix Thread pool token was shut down error.
This is because when there are more than 1 fragment of a query on one BE, the thread token maybe
reset incorrectly, causing thread token shutdown earlier.
cherry-pick from master
Introduced from #13021
This commit is contained in:
zhannngchen
2022-11-20 00:04:48 +08:00
committed by GitHub
parent 5dfe5ef965
commit 3e1e8db173
3 changed files with 12 additions and 8 deletions

View File

@ -255,8 +255,9 @@ Status FragmentExecState::execute() {
CgroupsMgr::apply_system_cgroup();
opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
Status status = _executor.open();
WARN_IF_ERROR(status, strings::Substitute("Got error while opening fragment $0",
print_id(_fragment_instance_id)));
WARN_IF_ERROR(status,
strings::Substitute("Got error while opening fragment $0, query id: $1",
print_id(_fragment_instance_id), print_id(_query_id)));
_executor.close();
if (!status.ok()) {
@ -403,7 +404,8 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
<< apache::thrift::ThriftDebugString(params).c_str();
if (!exec_status.ok()) {
LOG(WARNING) << "report error status: " << exec_status.to_string()
<< " to coordinator: " << _coord_addr;
<< " to coordinator: " << _coord_addr << ", query id: " << print_id(_query_id)
<< ", instance id: " << print_id(_fragment_instance_id);
}
try {
try {
@ -630,7 +632,6 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
BackendOptions::get_localhost());
}
fragments_ctx = search->second;
_set_scan_concurrency(params, fragments_ctx.get());
} else {
// This may be a first fragment request of the query.
// Create the query fragments context.
@ -689,7 +690,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
<< print_id(fragments_ctx->query_id)
<< " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
} else {
// Already has a query fragmentscontext, use it
// Already has a query fragments context, use it
fragments_ctx = search->second;
}
}

View File

@ -626,7 +626,9 @@ void PlanFragmentExecutor::update_status(const Status& new_status) {
void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) {
LOG_INFO("PlanFragmentExecutor::cancel")
.tag("query_id", _query_id)
.tag("instance_id", _runtime_state->fragment_instance_id());
.tag("instance_id", _runtime_state->fragment_instance_id())
.tag("reason", reason)
.tag("error message", msg);
DCHECK(_prepared);
_cancel_reason = reason;
_cancel_msg = msg;

View File

@ -1695,8 +1695,9 @@ public class Coordinator {
// and returned_all_results_ is true.
// (UpdateStatus() initiates cancellation, if it hasn't already been initiated)
if (!(returnedAllResults && status.isCancelled()) && !status.ok()) {
LOG.warn("one instance report fail, query_id={} instance_id={}",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
LOG.warn("one instance report fail, query_id={} instance_id={}, error message: {}",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()),
status.getErrorMsg());
updateStatus(status, params.getFragmentInstanceId());
}
if (execState.done) {