[api-change](http) change kill query http api by using query id (#12120)

Now user can cancel query id by http by following steps:

Get query id by trace id
cancel query by query id
The modified api has not been released yet.
This commit is contained in:
Mingyu Chen
2022-08-29 09:51:51 +08:00
committed by GitHub
parent dec576a991
commit 7fbcf3c8ba
9 changed files with 200 additions and 36 deletions

View File

@ -34,9 +34,15 @@ under the License.
`GET /rest/v2/manager/query/profile/text/{query_id}`
`GET /rest/v2/manager/query/profile/graph/{query_id}`
`GET /rest/v2/manager/query/profile/json/{query_id}`
`GET /rest/v2/manager/query/profile/fragments/{query_id}`
`GET /rest/v2/manager/query/profile/graph/{query_id}`
`GET /rest/v2/manager/query/current_queries`
`GET /rest/v2/manager/query/kill/{query_id}`
## Get the query information
@ -342,7 +348,7 @@ Same as `show proc "/current_query_stmts"`, return current running queries.
## Cancel query
`POST /rest/v2/manager/query/kill/{connection_id}`
`POST /rest/v2/manager/query/kill/{query_id}`
### Description
@ -350,9 +356,9 @@ Cancel query of specified connection.
### Path parameters
* `{connection_id}`
* `{query_id}`
connection id
query id. You can get query id by `trance_id` api.
### Query parameters
@ -362,7 +368,7 @@ Cancel query of specified connection.
{
"msg": "success",
"code": 0,
"data": "",
"data": null,
"count": 0
}
```

View File

@ -42,8 +42,7 @@ under the License.
`GET /rest/v2/manager/query/current_queries`
`GET /rest/v2/manager/query/kill/{connection_id}`
`GET /rest/v2/manager/query/kill/{query_id}`
## 获取查询信息
@ -349,7 +348,7 @@ GET /rest/v2/manager/query/query_info
## 取消query
`POST /rest/v2/manager/query/kill/{connection_id}`
`POST /rest/v2/manager/query/kill/{query_id}`
### Description
@ -357,9 +356,9 @@ GET /rest/v2/manager/query/query_info
### Path parameters
* `{connection_id}`
* `{query_id}`
connection id
query id. 你可以通过 trace_id 接口,获取 query id
### Query parameters
@ -369,7 +368,7 @@ GET /rest/v2/manager/query/query_info
{
"msg": "success",
"code": 0,
"data": "",
"data": null,
"count": 0
}
```

View File

@ -45,6 +45,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.json.simple.JSONObject;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@ -70,7 +71,7 @@ import javax.servlet.http.HttpServletResponse;
* 4. /trace_id/{trace_id}
* 5. /profile/fragments/{query_id}
* 6. /current_queries
* 7. /kill/{connection_id}
* 7. /kill/{query_id}
*/
@RestController
@RequestMapping("/rest/v2/manager/query")
@ -100,7 +101,8 @@ public class QueryProfileAction extends RestBaseController {
.add(NODE).add(USER).add(DEFAULT_DB).add(SQL_STATEMENT).add(QUERY_TYPE).add(START_TIME).add(END_TIME)
.add(TOTAL).add(QUERY_STATE).build();
private List<String> requestAllFe(String httpPath, Map<String, String> arguments, String authorization) {
private List<String> requestAllFe(String httpPath, Map<String, String> arguments, String authorization,
HttpMethod method) {
List<Pair<String, Integer>> frontends = HttpUtils.getFeList();
ImmutableMap<String, String> header = ImmutableMap.<String, String>builder()
.put(NodeAction.AUTHORIZATION, authorization).build();
@ -108,7 +110,12 @@ public class QueryProfileAction extends RestBaseController {
for (Pair<String, Integer> ipPort : frontends) {
String url = HttpUtils.concatUrl(ipPort, httpPath, arguments);
try {
String data = HttpUtils.parseResponse(HttpUtils.doGet(url, header));
String data = null;
if (method == HttpMethod.GET) {
data = HttpUtils.parseResponse(HttpUtils.doGet(url, header));
} else if (method == HttpMethod.POST) {
data = HttpUtils.parseResponse(HttpUtils.doPost(url, header, null));
}
if (!Strings.isNullOrEmpty(data) && !data.equals("{}")) {
dataList.add(data);
}
@ -145,12 +152,12 @@ public class QueryProfileAction extends RestBaseController {
arguments.put(SEARCH_PARA, search);
arguments.put(IS_ALL_NODE_PARA, "false");
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION));
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION),
HttpMethod.GET);
for (String data : dataList) {
try {
NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data,
new TypeToken<NodeAction.NodeInfo>() {
}.getType());
NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
}.getType());
queries.addAll(nodeInfo.getRows());
} catch (Exception e) {
LOG.warn("parse query info error: {}", data, e);
@ -200,7 +207,8 @@ public class QueryProfileAction extends RestBaseController {
String httpPath = "/rest/v2/manager/query/sql/" + queryId;
ImmutableMap<String, String> arguments = ImmutableMap.<String, String>builder()
.put(IS_ALL_NODE_PARA, "false").build();
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION));
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION),
HttpMethod.GET);
if (!dataList.isEmpty()) {
try {
String sql = JsonParser.parseString(dataList.get(0)).getAsJsonObject().get("sql").getAsString();
@ -292,7 +300,8 @@ public class QueryProfileAction extends RestBaseController {
}
}
} else {
String queryId = ProfileManager.getInstance().getQueryIdByTraceId(traceId);
ExecuteEnv env = ExecuteEnv.getInstance();
String queryId = env.getScheduler().getQueryIdByTraceId(traceId);
if (Strings.isNullOrEmpty(queryId)) {
return ResponseEntityBuilder.badRequest("Not found");
}
@ -415,7 +424,8 @@ public class QueryProfileAction extends RestBaseController {
if (!Strings.isNullOrEmpty(instanceId)) {
builder.put(INSTANCE_ID, instanceId);
}
List<String> dataList = requestAllFe(httpPath, builder.build(), request.getHeader(NodeAction.AUTHORIZATION));
List<String> dataList = requestAllFe(httpPath, builder.build(), request.getHeader(NodeAction.AUTHORIZATION),
HttpMethod.GET);
Map<String, String> result = Maps.newHashMap();
if (!dataList.isEmpty()) {
try {
@ -448,7 +458,8 @@ public class QueryProfileAction extends RestBaseController {
Map<String, String> arguments = Maps.newHashMap();
arguments.put(IS_ALL_NODE_PARA, "false");
List<List<String>> queries = Lists.newArrayList();
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION));
List<String> dataList = requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION),
HttpMethod.GET);
for (String data : dataList) {
try {
NodeAction.NodeInfo nodeInfo = GsonUtils.GSON.fromJson(data, new TypeToken<NodeAction.NodeInfo>() {
@ -481,25 +492,31 @@ public class QueryProfileAction extends RestBaseController {
}
/**
* kill queries with specified connection id
* kill queries with specific query id
*
* @param request
* @param response
* @param connectionId
* @param queryId
* @return
*/
@RequestMapping(path = "/kill/{connection_id}", method = RequestMethod.POST)
@RequestMapping(path = "/kill/{query_id}", method = RequestMethod.POST)
public Object killQuery(HttpServletRequest request, HttpServletResponse response,
@PathVariable("connection_id") int connectionId) {
@PathVariable("query_id") String queryId,
@RequestParam(value = IS_ALL_NODE_PARA, required = false, defaultValue = "true") boolean isAllNode) {
executeCheckPassword(request, response);
checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN);
ExecuteEnv env = ExecuteEnv.getInstance();
ConnectContext ctx = env.getScheduler().getContext(connectionId);
if (ctx == null) {
return ResponseEntityBuilder.notFound("connection not found");
if (isAllNode) {
// Get current queries from all FE
String httpPath = "/rest/v2/manager/query/kill/" + queryId;
Map<String, String> arguments = Maps.newHashMap();
arguments.put(IS_ALL_NODE_PARA, "false");
requestAllFe(httpPath, arguments, request.getHeader(NodeAction.AUTHORIZATION), HttpMethod.POST);
return ResponseEntityBuilder.ok();
}
ctx.cancelQuery();
ExecuteEnv env = ExecuteEnv.getInstance();
env.getScheduler().cancelQuery(queryId);
return ResponseEntityBuilder.ok();
}
}

View File

@ -38,6 +38,7 @@ import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.opentelemetry.api.trace.Tracer;
@ -62,6 +63,7 @@ public class ConnectContext {
protected volatile long forwardedStmtId;
protected volatile TUniqueId queryId;
protected volatile String traceId;
// id for this connection
protected volatile int connectionId;
// mysql net
@ -469,6 +471,17 @@ public class ConnectContext {
public void setQueryId(TUniqueId queryId) {
this.queryId = queryId;
if (connectScheduler != null && !Strings.isNullOrEmpty(traceId)) {
connectScheduler.putTraceId2QueryId(traceId, queryId);
}
}
public void setTraceId(String traceId) {
this.traceId = traceId;
}
public String traceId() {
return traceId;
}
public TUniqueId queryId() {

View File

@ -21,9 +21,11 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.nio.NConnectContext;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -51,6 +53,9 @@ public class ConnectScheduler {
private final ExecutorService executor = ThreadPoolManager.newDaemonCacheThreadPool(
Config.max_connection_scheduler_threads_num, "connect-scheduler-pool", true);
// valid trace id -> query id
private final Map<String, TUniqueId> traceId2QueryId = Maps.newConcurrentMap();
// Use a thread to check whether connection is timeout. Because
// 1. If use a scheduler, the task maybe a huge number when query is messy.
// Let timeout is 10m, and 5000 qps, then there are up to 3000000 tasks in scheduler.
@ -125,6 +130,16 @@ public class ConnectScheduler {
return connectionMap.get(connectionId);
}
public void cancelQuery(String queryId) {
for (ConnectContext ctx : connectionMap.values()) {
TUniqueId qid = ctx.queryId();
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
ctx.cancelQuery();
break;
}
}
}
public int getConnectionNum() {
return numberConnection.get();
}
@ -143,6 +158,15 @@ public class ConnectScheduler {
return infos;
}
public void putTraceId2QueryId(String traceId, TUniqueId queryId) {
traceId2QueryId.put(traceId, queryId);
}
public String getQueryIdByTraceId(String traceId) {
TUniqueId queryId = traceId2QueryId.get(traceId);
return queryId == null ? "" : DebugUtil.printId(queryId);
}
private class LoopHandler implements Runnable {
ConnectContext context;

View File

@ -186,6 +186,10 @@ public class VariableMgr {
ErrorReport.reportDdlException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR, attr.name(), value);
}
if (VariableVarCallbacks.hasCallback(attr.name())) {
VariableVarCallbacks.call(attr.name(), value);
}
return true;
}
@ -515,9 +519,6 @@ public class VariableMgr {
// Set to true if the variables need to be forwarded along with forward statement.
boolean needForward() default false;
// Set to true if the variables need to be set in TQueryOptions
boolean isQueryOption() default false;
}
private static class VarContext {
@ -587,8 +588,7 @@ public class VariableMgr {
}
field.setAccessible(true);
builder.put(attr.name(),
new VarContext(field, null, GLOBAL | attr.flag(), getValue(null, field)));
builder.put(attr.name(), new VarContext(field, null, GLOBAL | attr.flag(), getValue(null, field)));
}
return builder;
}

View File

@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.common.DdlException;
public interface VariableVarCallbackI {
public void call(String value) throws DdlException;
}

View File

@ -0,0 +1,73 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.common.DdlException;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.util.Map;
/**
* Callback after setting the session variable
*/
public class VariableVarCallbacks {
public static final Map<String, VariableVarCallbackI> callbacks = Maps.newHashMap();
static {
SessionContextCallback sessionContextCallback = new SessionContextCallback();
callbacks.put(SessionVariable.SESSION_CONTEXT, sessionContextCallback);
}
public static Boolean hasCallback(String varName) {
return callbacks.containsKey(varName);
}
public static void call(String varName, String value) throws DdlException {
if (hasCallback(varName)) {
callbacks.get(varName).call(value);
}
}
// Converter to convert runtime filter type variable
public static class SessionContextCallback implements VariableVarCallbackI {
public void call(String value) throws DdlException {
if (Strings.isNullOrEmpty(value)) {
return;
}
/**
* The sessionContext is as follows:
* "k1:v1;k2:v2;..."
* Here we want to get value with key named "trace_id".
*/
String[] parts = value.split(";");
for (String part : parts) {
String[] innerParts = part.split(":");
if (innerParts.length != 2) {
continue;
}
if (innerParts[0].equals("trace_id")) {
ConnectContext.get().setTraceId(innerParts[1]);
break;
}
}
}
}
}

View File

@ -233,4 +233,12 @@ public class VariableMgrTest {
VariableMgr.setVar(null, setVar);
Assert.fail("No exception throws.");
}
@Test
public void testVariableCallback() throws Exception {
SetStmt stmt = (SetStmt) UtFrameUtils.parseAndAnalyzeStmt("set session_context='trace_id:123'", ctx);
SetExecutor executor = new SetExecutor(ctx, stmt);
executor.execute();
Assert.assertEquals("123", ctx.traceId());
}
}