[fix](cancel) Fix cancel msg on branch-2.1 (#41798)
Make sure we can tell the cancel reason apart among: 1. user cancel 2. timeout 3. others ```text mysql [demo]>set query_timeout=1; -------------- set query_timeout=1 -------------- Query OK, 0 rows affected (0.00 sec) mysql [demo]>select sleep(5); -------------- select sleep(5) -------------- ERROR 1105 (HY000): errCode = 2, detailMessage = Timeout mysql [demo]>select sleep(5); -------------- select sleep(5) -------------- ^C^C -- sending "KILL QUERY 0" to server ... ^C -- query aborted ERROR 1105 (HY000): errCode = 2, detailMessage = cancel query by user from 127.0.0.1:64208 ```
This commit is contained in:
@ -145,7 +145,7 @@ void PipelineXFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
|
||||
LOG_INFO("PipelineXFragmentContext::cancel")
|
||||
.tag("query_id", print_id(_query_id))
|
||||
.tag("fragment_id", _fragment_id)
|
||||
.tag("reason", reason)
|
||||
.tag("reason", PPlanFragmentCancelReason_Name(reason))
|
||||
.tag("error message", msg);
|
||||
if (reason == PPlanFragmentCancelReason::TIMEOUT) {
|
||||
LOG(WARNING) << "PipelineXFragmentContext is cancelled due to timeout : " << debug_string();
|
||||
|
||||
@ -104,7 +104,7 @@ BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size)
|
||||
}
|
||||
|
||||
BufferControlBlock::~BufferControlBlock() {
|
||||
cancel();
|
||||
cancel(Status::Cancelled("Cancelled"));
|
||||
}
|
||||
|
||||
Status BufferControlBlock::init() {
|
||||
@ -266,13 +266,13 @@ Status BufferControlBlock::close(Status exec_status) {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void BufferControlBlock::cancel() {
|
||||
void BufferControlBlock::cancel(const Status& reason) {
|
||||
std::unique_lock<std::mutex> l(_lock);
|
||||
_is_cancelled = true;
|
||||
_data_removal.notify_all();
|
||||
_data_arrival.notify_all();
|
||||
for (auto& ctx : _waiting_rpc) {
|
||||
ctx->on_failure(Status::Cancelled("Cancelled"));
|
||||
ctx->on_failure(reason);
|
||||
}
|
||||
_waiting_rpc.clear();
|
||||
}
|
||||
@ -301,8 +301,8 @@ Status PipBufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void PipBufferControlBlock::cancel() {
|
||||
BufferControlBlock::cancel();
|
||||
void PipBufferControlBlock::cancel(const Status& reason) {
|
||||
BufferControlBlock::cancel(reason);
|
||||
_update_dependency();
|
||||
}
|
||||
|
||||
|
||||
@ -86,8 +86,8 @@ public:
|
||||
// close buffer block, set _status to exec_status and set _is_close to true;
|
||||
// called because data has been read or error happened.
|
||||
Status close(Status exec_status);
|
||||
// this is called by RPC, called from coordinator
|
||||
virtual void cancel();
|
||||
|
||||
virtual void cancel(const Status& reason);
|
||||
|
||||
[[nodiscard]] const TUniqueId& fragment_id() const { return _fragment_id; }
|
||||
|
||||
@ -152,7 +152,7 @@ public:
|
||||
|
||||
Status get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) override;
|
||||
|
||||
void cancel() override;
|
||||
void cancel(const Status& reason) override;
|
||||
|
||||
void set_dependency(std::shared_ptr<pipeline::Dependency> result_sink_dependency);
|
||||
|
||||
|
||||
@ -1249,7 +1249,7 @@ void FragmentMgr::cancel_worker() {
|
||||
clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
|
||||
|
||||
do {
|
||||
std::vector<TUniqueId> to_cancel;
|
||||
std::vector<TUniqueId> queries_timeout;
|
||||
std::vector<TUniqueId> queries_to_cancel;
|
||||
std::vector<TUniqueId> queries_pipeline_task_leak;
|
||||
// Fe process uuid -> set<QueryId>
|
||||
@ -1274,7 +1274,7 @@ void FragmentMgr::cancel_worker() {
|
||||
std::lock_guard<std::mutex> lock(_lock);
|
||||
for (auto& fragment_instance_itr : _fragment_instance_map) {
|
||||
if (fragment_instance_itr.second->is_timeout(now)) {
|
||||
to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id());
|
||||
queries_timeout.push_back(fragment_instance_itr.second->fragment_instance_id());
|
||||
}
|
||||
}
|
||||
for (auto& pipeline_itr : _pipeline_map) {
|
||||
@ -1283,7 +1283,7 @@ void FragmentMgr::cancel_worker() {
|
||||
reinterpret_cast<pipeline::PipelineXFragmentContext*>(pipeline_itr.second.get())
|
||||
->instance_ids(ins_ids);
|
||||
for (auto& ins_id : ins_ids) {
|
||||
to_cancel.push_back(ins_id);
|
||||
queries_timeout.push_back(ins_id);
|
||||
}
|
||||
} else {
|
||||
pipeline_itr.second->clear_finished_tasks();
|
||||
@ -1393,9 +1393,9 @@ void FragmentMgr::cancel_worker() {
|
||||
|
||||
// TODO(zhiqiang): It seems that timeout_canceled_fragment_count is
|
||||
// designed to count canceled fragment of non-pipeline query.
|
||||
timeout_canceled_fragment_count->increment(to_cancel.size());
|
||||
for (auto& id : to_cancel) {
|
||||
cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT);
|
||||
timeout_canceled_fragment_count->increment(queries_timeout.size());
|
||||
for (auto& id : queries_timeout) {
|
||||
cancel_instance(id, PPlanFragmentCancelReason::TIMEOUT, "Query timeout");
|
||||
LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout instance "
|
||||
<< print_id(id);
|
||||
}
|
||||
|
||||
@ -32,6 +32,7 @@
|
||||
|
||||
#include "arrow/record_batch.h"
|
||||
#include "arrow/type_fwd.h"
|
||||
#include "common/status.h"
|
||||
#include "runtime/buffer_control_block.h"
|
||||
#include "util/doris_metrics.h"
|
||||
#include "util/metrics.h"
|
||||
@ -150,13 +151,13 @@ Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void ResultBufferMgr::cancel(const TUniqueId& query_id) {
|
||||
void ResultBufferMgr::cancel(const TUniqueId& query_id, const Status& reason) {
|
||||
{
|
||||
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
|
||||
BufferMap::iterator iter = _buffer_map.find(query_id);
|
||||
|
||||
if (_buffer_map.end() != iter) {
|
||||
iter->second->cancel();
|
||||
iter->second->cancel(reason);
|
||||
_buffer_map.erase(iter);
|
||||
}
|
||||
}
|
||||
@ -206,7 +207,7 @@ void ResultBufferMgr::cancel_thread() {
|
||||
|
||||
// cancel query
|
||||
for (int i = 0; i < query_to_cancel.size(); ++i) {
|
||||
cancel(query_to_cancel[i]);
|
||||
cancel(query_to_cancel[i], Status::TimedOut("Query timeout"));
|
||||
}
|
||||
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
|
||||
|
||||
|
||||
@ -71,7 +71,7 @@ public:
|
||||
std::shared_ptr<arrow::Schema> find_arrow_schema(const TUniqueId& query_id);
|
||||
|
||||
// cancel
|
||||
void cancel(const TUniqueId& fragment_id);
|
||||
void cancel(const TUniqueId& query_id, const Status& reason);
|
||||
|
||||
// cancel one query at a future time.
|
||||
void cancel_at_time(time_t cancel_time, const TUniqueId& query_id);
|
||||
|
||||
@ -576,7 +576,7 @@ public class QueryProfileAction extends RestBaseController {
|
||||
}
|
||||
|
||||
ExecuteEnv env = ExecuteEnv.getInstance();
|
||||
env.getScheduler().cancelQuery(queryId);
|
||||
env.getScheduler().cancelQuery(queryId, "cancel query by rest api");
|
||||
return ResponseEntityBuilder.ok();
|
||||
}
|
||||
}
|
||||
|
||||
@ -221,7 +221,7 @@ public class InsertTask extends AbstractTask {
|
||||
}
|
||||
isCanceled.getAndSet(true);
|
||||
if (null != stmtExecutor) {
|
||||
stmtExecutor.cancel();
|
||||
stmtExecutor.cancel("insert task cancelled");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -258,7 +258,7 @@ public class MTMVTask extends AbstractTask {
|
||||
protected synchronized void executeCancelLogic() {
|
||||
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
|
||||
if (executor != null) {
|
||||
executor.cancel();
|
||||
executor.cancel("mtmv task cancelled");
|
||||
}
|
||||
after();
|
||||
}
|
||||
|
||||
@ -162,7 +162,7 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
|
||||
}
|
||||
isCanceled.getAndSet(true);
|
||||
if (stmtExecutor != null) {
|
||||
stmtExecutor.cancel();
|
||||
stmtExecutor.cancel("export task cancelled");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -600,7 +600,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
||||
for (TUniqueId loadId : loadIds) {
|
||||
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
|
||||
if (coordinator != null) {
|
||||
coordinator.cancel();
|
||||
coordinator.cancel(failMsg.getMsg());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -143,7 +143,7 @@ public abstract class AbstractInsertExecutor {
|
||||
}
|
||||
boolean notTimeout = coordinator.join(execTimeout);
|
||||
if (!coordinator.isDone()) {
|
||||
coordinator.cancel();
|
||||
coordinator.cancel("insert timeout");
|
||||
if (notTimeout) {
|
||||
errMsg = coordinator.getExecStatus().getErrorMsg();
|
||||
ErrorReport.reportDdlException("there exists unhealthy backend. "
|
||||
|
||||
@ -935,7 +935,7 @@ public class ConnectContext {
|
||||
closeChannel();
|
||||
}
|
||||
// Now, cancel running query.
|
||||
cancelQuery();
|
||||
cancelQuery("cancel query by user from " + getRemoteHostPortString());
|
||||
}
|
||||
|
||||
// kill operation with no protect by timeout.
|
||||
@ -956,10 +956,10 @@ public class ConnectContext {
|
||||
}
|
||||
}
|
||||
|
||||
public void cancelQuery() {
|
||||
public void cancelQuery(String cancelMessage) {
|
||||
StmtExecutor executorRef = executor;
|
||||
if (executorRef != null) {
|
||||
executorRef.cancel();
|
||||
executorRef.cancel(cancelMessage);
|
||||
}
|
||||
}
|
||||
|
||||
@ -990,7 +990,7 @@ public class ConnectContext {
|
||||
long timeout = getExecTimeout() * 1000L;
|
||||
if (delta > timeout) {
|
||||
LOG.warn("kill {} timeout, remote: {}, query timeout: {}, query id: {}",
|
||||
timeoutTag, getRemoteHostPortString(), timeout, queryId);
|
||||
timeoutTag, getRemoteHostPortString(), timeout, DebugUtil.printId(queryId));
|
||||
killFlag = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,11 +145,11 @@ public class ConnectScheduler {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void cancelQuery(String queryId) {
|
||||
public void cancelQuery(String queryId, String cancelReason) {
|
||||
for (ConnectContext ctx : connectionMap.values()) {
|
||||
TUniqueId qid = ctx.queryId();
|
||||
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
|
||||
ctx.cancelQuery();
|
||||
ctx.cancelQuery(cancelReason);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1164,7 +1164,7 @@ public class Coordinator implements CoordInterface {
|
||||
errMsg = operation + " failed. " + exception.getMessage();
|
||||
}
|
||||
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg);
|
||||
switch (code) {
|
||||
case TIMEOUT:
|
||||
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
|
||||
@ -1259,7 +1259,7 @@ public class Coordinator implements CoordInterface {
|
||||
errMsg = operation + " failed. " + exception.getMessage();
|
||||
}
|
||||
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, errMsg);
|
||||
switch (code) {
|
||||
case TIMEOUT:
|
||||
MetricRepo.BE_COUNTER_QUERY_RPC_FAILED.getOrAdd(triple.getLeft().brpcAddr.hostname)
|
||||
@ -1385,9 +1385,9 @@ public class Coordinator implements CoordInterface {
|
||||
|
||||
queryStatus.updateStatus(status.getErrorCode(), status.getErrorMsg());
|
||||
if (status.getErrorCode() == TStatusCode.TIMEOUT) {
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.TIMEOUT, status.getErrorMsg());
|
||||
} else {
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.INTERNAL_ERROR, status.getErrorMsg());
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
@ -1426,7 +1426,7 @@ public class Coordinator implements CoordInterface {
|
||||
throw new RpcException(null, copyStatus.getErrorMsg());
|
||||
} else {
|
||||
String errMsg = copyStatus.getErrorMsg();
|
||||
LOG.warn("query failed: {}", errMsg);
|
||||
LOG.warn("Query {} failed: {}", DebugUtil.printId(queryId), errMsg);
|
||||
throw new UserException(errMsg);
|
||||
}
|
||||
}
|
||||
@ -1441,7 +1441,7 @@ public class Coordinator implements CoordInterface {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("no block query, return num >= limit rows, need cancel");
|
||||
}
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH);
|
||||
cancelInternal(Types.PPlanFragmentCancelReason.LIMIT_REACH, "query reach limit");
|
||||
}
|
||||
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().dryRunQuery) {
|
||||
numReceivedRows = 0;
|
||||
@ -1528,8 +1528,8 @@ public class Coordinator implements CoordInterface {
|
||||
// Cancel execution of query. This includes the execution of the local plan
|
||||
// fragment,
|
||||
// if any, as well as all plan fragments on remote nodes.
|
||||
public void cancel() {
|
||||
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, "user cancel");
|
||||
public void cancel(String errorMsg) {
|
||||
cancel(Types.PPlanFragmentCancelReason.USER_CANCEL, errorMsg);
|
||||
if (queueToken != null) {
|
||||
queueToken.cancel();
|
||||
}
|
||||
@ -1552,8 +1552,8 @@ public class Coordinator implements CoordInterface {
|
||||
queryStatus.updateStatus(TStatusCode.CANCELLED, errorMsg);
|
||||
}
|
||||
LOG.warn("Cancel execution of query {}, this is a outside invoke, cancelReason {}",
|
||||
DebugUtil.printId(queryId), cancelReason.toString());
|
||||
cancelInternal(cancelReason);
|
||||
DebugUtil.printId(queryId), errorMsg);
|
||||
cancelInternal(cancelReason, errorMsg);
|
||||
} finally {
|
||||
unlock();
|
||||
}
|
||||
@ -1577,9 +1577,9 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
|
||||
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason) {
|
||||
private void cancelInternal(Types.PPlanFragmentCancelReason cancelReason, String cancelMessage) {
|
||||
if (null != receiver) {
|
||||
receiver.cancel(cancelReason);
|
||||
receiver.cancel(cancelReason, cancelMessage);
|
||||
}
|
||||
if (null != pointExec) {
|
||||
pointExec.cancel();
|
||||
@ -3307,10 +3307,6 @@ public class Coordinator implements CoordInterface {
|
||||
DebugUtil.printId(fragmentInstanceId()), status.toString());
|
||||
}
|
||||
}
|
||||
LOG.warn("Failed to cancel query {} instance initiated={} done={} backend: {},"
|
||||
+ "fragment instance id={}, reason: {}",
|
||||
DebugUtil.printId(queryId), initiated, done, backend.getId(),
|
||||
DebugUtil.printId(fragmentInstanceId()), "without status");
|
||||
}
|
||||
|
||||
public void onFailure(Throwable t) {
|
||||
|
||||
@ -111,8 +111,8 @@ public class ResultReceiver {
|
||||
LOG.warn("Query {} get result timeout, get result duration {} ms",
|
||||
DebugUtil.printId(this.queryId), (timeoutTs - currentTs) / 1000);
|
||||
setRunStatus(Status.TIMEOUT);
|
||||
status.updateStatus(TStatusCode.TIMEOUT, "");
|
||||
updateCancelReason("fetch data timeout");
|
||||
status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
|
||||
updateCancelReason("Query timeout");
|
||||
return null;
|
||||
} catch (InterruptedException e) {
|
||||
// continue to get result
|
||||
@ -183,7 +183,7 @@ public class ResultReceiver {
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
LOG.warn("fetch result timeout, finstId={}", DebugUtil.printId(finstId), e);
|
||||
status.updateStatus(TStatusCode.TIMEOUT, "query timeout");
|
||||
status.updateStatus(TStatusCode.TIMEOUT, "Query timeout");
|
||||
} finally {
|
||||
synchronized (this) {
|
||||
currentThread = null;
|
||||
@ -205,13 +205,14 @@ public class ResultReceiver {
|
||||
}
|
||||
}
|
||||
|
||||
public void cancel(Types.PPlanFragmentCancelReason reason) {
|
||||
public void cancel(Types.PPlanFragmentCancelReason reason, String cancelMessage) {
|
||||
if (reason == Types.PPlanFragmentCancelReason.TIMEOUT) {
|
||||
setRunStatus(Status.TIMEOUT);
|
||||
} else {
|
||||
setRunStatus(Status.CANCELLED);
|
||||
}
|
||||
updateCancelReason(reason.toString());
|
||||
|
||||
updateCancelReason(cancelMessage);
|
||||
synchronized (this) {
|
||||
if (currentThread != null) {
|
||||
// TODO(cmy): we cannot interrupt this thread, or we may throw
|
||||
|
||||
@ -1475,7 +1475,7 @@ public class StmtExecutor {
|
||||
}
|
||||
|
||||
// Because this is called by other thread
|
||||
public void cancel() {
|
||||
public void cancel(String message) {
|
||||
Optional<InsertOverwriteTableCommand> insertOverwriteTableCommand = getInsertOverwriteTableCommand();
|
||||
if (insertOverwriteTableCommand.isPresent()) {
|
||||
// If the be scheduling has not been triggered yet, cancel the scheduling first
|
||||
@ -1483,7 +1483,7 @@ public class StmtExecutor {
|
||||
}
|
||||
Coordinator coordRef = coord;
|
||||
if (coordRef != null) {
|
||||
coordRef.cancel();
|
||||
coordRef.cancel(message);
|
||||
}
|
||||
if (mysqlLoadId != null) {
|
||||
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
|
||||
|
||||
@ -32,7 +32,7 @@ public class WorkloadActionCancelQuery implements WorkloadAction {
|
||||
&& queryInfo.tUniqueId != null
|
||||
&& QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
|
||||
LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId);
|
||||
queryInfo.context.cancelQuery();
|
||||
queryInfo.context.cancelQuery("cancel query by workload policy");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -74,7 +74,7 @@ public class FlightSqlConnectContext extends ConnectContext {
|
||||
connectScheduler.unregisterConnection(this);
|
||||
}
|
||||
// Now, cancel running query.
|
||||
cancelQuery();
|
||||
cancelQuery("arrow flight query killed by user");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -192,7 +192,7 @@ public abstract class BaseAnalysisTask {
|
||||
public void cancel() {
|
||||
killed = true;
|
||||
if (stmtExecutor != null) {
|
||||
stmtExecutor.cancel();
|
||||
stmtExecutor.cancel("analysis task cancelled");
|
||||
}
|
||||
Env.getCurrentEnv().getAnalysisManager()
|
||||
.updateTaskStatus(info, AnalysisState.FAILED,
|
||||
|
||||
Reference in New Issue
Block a user