[Cherry-pick]Support kill query in be (#35794)

## Proposed changes
 pick #35602

```
mysql [information_schema]>kill query '2047df937c66704d-3ac4cfaf17f65eae';
Query OK, 0 rows affected (0.01 sec)


I20240603 15:21:50.373333 3355508 internal_service.cpp:592] Cancel query 2047df937c66704d-3ac4cfaf17f65eae, reason: USER_CANCEL

```
This commit is contained in:
wangbo
2024-06-03 15:39:30 +08:00
committed by GitHub
parent 4f0365e0bf
commit 3e096dda91
4 changed files with 112 additions and 5 deletions

View File

@ -21,6 +21,8 @@ import org.apache.doris.common.Pair;
import org.apache.doris.proto.Types;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.DecimalFormat;
@ -135,6 +137,28 @@ public class DebugUtil {
return builder.toString();
}
// id is a String generated by DebugUtil.printId(TUniqueId)
public static TUniqueId parseTUniqueIdFromString(String id) {
if (Strings.isNullOrEmpty(id)) {
throw new NumberFormatException("invalid query id");
}
String[] parts = id.split("-");
if (parts.length != 2) {
throw new NumberFormatException("invalid query id");
}
TUniqueId uniqueId = new TUniqueId();
try {
uniqueId.setHi(Long.parseUnsignedLong(parts[0], 16));
uniqueId.setLo(Long.parseUnsignedLong(parts[1], 16));
} catch (NumberFormatException e) {
throw new NumberFormatException("invalid query id:" + e.getMessage());
}
return uniqueId;
}
public static String printId(final UUID id) {
TUniqueId tUniqueId = new TUniqueId(id.getMostSignificantBits(), id.getLeastSignificantBits());
StringBuilder builder = new StringBuilder();

View File

@ -220,6 +220,7 @@ import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -1419,16 +1420,15 @@ public class StmtExecutor {
}
// Handle kill statement.
private void handleKill() throws DdlException {
private void handleKill() throws UserException {
KillStmt killStmt = (KillStmt) parsedStmt;
ConnectContext killCtx = null;
int id = killStmt.getConnectionId();
String queryId = killStmt.getQueryId();
if (id == -1) {
// when killCtx == null, this means the query not in FE,
// then we just send kill signal to BE
killCtx = context.getConnectScheduler().getContextWithQueryId(queryId);
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
}
} else {
killCtx = context.getConnectScheduler().getContext(id);
if (killCtx == null) {
@ -1436,7 +1436,27 @@ public class StmtExecutor {
}
}
if (context == killCtx) {
if (killCtx == null) {
TUniqueId tQueryId = null;
try {
tQueryId = DebugUtil.parseTUniqueIdFromString(queryId);
} catch (NumberFormatException e) {
throw new UserException(e.getMessage());
}
LOG.info("kill query {}", queryId);
Collection<Backend> nodesToPublish = Env.getCurrentSystemInfo().getIdToBackend().values();
for (Backend be : nodesToPublish) {
if (be.isAlive()) {
try {
BackendServiceProxy.getInstance()
.cancelPipelineXPlanFragmentAsync(be.getBrpcAddress(), tQueryId,
Types.PPlanFragmentCancelReason.USER_CANCEL);
} catch (Throwable t) {
LOG.info("send kill query {} rpc to be {} failed", queryId, be);
}
}
}
} else if (context == killCtx) {
// Suicide
context.setKilled();
} else {

View File

@ -276,6 +276,24 @@ public class BackendServiceProxy {
}
}
public ListenableFuture<InternalService.PCancelPlanFragmentResult> cancelPipelineXPlanFragmentAsync(
TNetworkAddress address, TUniqueId queryId,
Types.PPlanFragmentCancelReason cancelReason) throws RpcException {
final InternalService.PCancelPlanFragmentRequest pRequest = InternalService.PCancelPlanFragmentRequest
.newBuilder()
.setFinstId(Types.PUniqueId.newBuilder().setHi(0).setLo(0).build())
.setCancelReason(cancelReason)
.setQueryId(Types.PUniqueId.newBuilder().setHi(queryId.hi).setLo(queryId.lo).build()).build();
try {
final BackendServiceClient client = getProxy(address);
return client.cancelPlanFragmentAsync(pRequest);
} catch (Throwable e) {
LOG.warn("Cancel plan fragment catch a exception, address={}:{}", address.getHostname(), address.getPort(),
e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<InternalService.PFetchDataResult> fetchDataAsync(
TNetworkAddress address, InternalService.PFetchDataRequest request) throws RpcException {
try {