[enhancement](http) executeSQL rest api support streaming response (#12239)

This commit is contained in:
Tiewei Fang
2022-09-08 14:57:15 +08:00
committed by GitHub
parent 9225dd16ca
commit 824a192f8f
4 changed files with 293 additions and 8 deletions

View File

@ -100,7 +100,11 @@ public class StmtExecutionAction extends RestBaseController {
ConnectContext.get().changeDefaultCatalog(ns);
ConnectContext.get().setDatabase(getFullDbName(dbName));
return executeQuery(authInfo, stmtRequestBody.is_sync, stmtRequestBody.limit, stmtRequestBody);
String streamHeader = request.getHeader("X-Doris-Stream");
boolean isStream = !("false".equalsIgnoreCase(streamHeader));
return executeQuery(authInfo, stmtRequestBody.is_sync, stmtRequestBody.limit, stmtRequestBody,
response, isStream);
}
@ -138,16 +142,19 @@ public class StmtExecutionAction extends RestBaseController {
* @param stmtRequestBody
* @return
*/
@NotNull
private ResponseEntity executeQuery(ActionAuthorizationInfo authInfo, boolean isSync, long limit,
StmtRequestBody stmtRequestBody) {
StmtRequestBody stmtRequestBody, HttpServletResponse response, boolean isStream) {
StatementSubmitter.StmtContext stmtCtx = new StatementSubmitter.StmtContext(stmtRequestBody.stmt,
authInfo.fullUserName, authInfo.password, limit);
authInfo.fullUserName, authInfo.password, limit, isStream, response);
Future<ExecutionResultSet> future = stmtSubmitter.submit(stmtCtx);
if (isSync) {
try {
ExecutionResultSet resultSet = future.get();
// if use stream response, we not need to response an object.
if (isStream) {
return null;
}
return ResponseEntityBuilder.ok(resultSet.getResult());
} catch (InterruptedException e) {
LOG.warn("failed to execute stmt", e);

View File

@ -29,6 +29,8 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.httpv2.util.streamresponse.JsonStreamResponse;
import org.apache.doris.httpv2.util.streamresponse.StreamResponseInf;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
@ -49,6 +51,7 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import javax.servlet.http.HttpServletResponse;
/**
* This is a simple stmt submitter for submitting a statement to the local FE.
@ -78,7 +81,6 @@ public class StatementSubmitter {
}
private static class Worker implements Callable<ExecutionResultSet> {
private ConnectContext ctx;
private StmtContext queryCtx;
@ -102,14 +104,25 @@ public class StatementSubmitter {
stmt = conn.prepareStatement(
queryCtx.stmt, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// set fetch size to 1 to enable streaming result set to avoid OOM.
((PreparedStatement) stmt).setFetchSize(1);
((PreparedStatement) stmt).setFetchSize(1000);
ResultSet rs = ((PreparedStatement) stmt).executeQuery();
if (queryCtx.isStream) {
StreamResponseInf streamResponse = new JsonStreamResponse(queryCtx.response);
streamResponse.handleQueryAndShow(rs, startTime);
rs.close();
return new ExecutionResultSet(null);
}
ExecutionResultSet resultSet = generateResultSet(rs, startTime);
rs.close();
return resultSet;
} else if (stmtBase instanceof DdlStmt || stmtBase instanceof ExportStmt) {
stmt = conn.createStatement();
stmt.execute(queryCtx.stmt);
if (queryCtx.isStream) {
StreamResponseInf streamResponse = new JsonStreamResponse(queryCtx.response);
streamResponse.handleDdlAndExport(startTime);
return new ExecutionResultSet(null);
}
ExecutionResultSet resultSet = generateExecStatus(startTime);
return resultSet;
} else {
@ -198,7 +211,7 @@ public class StatementSubmitter {
* "time" : 10
* }
*/
private ExecutionResultSet generateExecStatus(long startTime) throws SQLException {
private ExecutionResultSet generateExecStatus(long startTime) {
Map<String, Object> result = Maps.newHashMap();
result.put("type", TYPE_EXEC_STATUS);
result.put("status", Maps.newHashMap());
@ -228,12 +241,18 @@ public class StatementSubmitter {
public String user;
public String passwd;
public long limit; // limit the number of rows returned by the stmt
// used for stream Work
public boolean isStream;
public HttpServletResponse response;
public StmtContext(String stmt, String user, String passwd, long limit) {
public StmtContext(String stmt, String user, String passwd, long limit,
boolean isStream, HttpServletResponse response) {
this.stmt = stmt;
this.user = user;
this.passwd = passwd;
this.limit = limit;
this.isStream = isStream;
this.response = response;
}
}
}

View File

@ -0,0 +1,203 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.util.streamresponse;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.stream.JsonWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
/**
* Serialize the ResultSet to JSON, and then response to client
*/
public class JsonStreamResponse extends StreamResponseInf {
private static final Logger LOG = LogManager.getLogger(JsonStreamResponse.class);
private static final Gson gson = new Gson();
private JsonWriter jsonWriter;
public static final String Name = "Json";
public JsonStreamResponse(HttpServletResponse response) {
super(response);
}
/**
* Result json sample:
* {
* "data": {
* "type": "result_set",
* "meta": {},
* "data": [],
* "time": 10
* },
* "msg" : "success",
* "code" : 0
* }
*/
@Override
public void handleQueryAndShow(ResultSet rs, long startTime) throws Exception {
response.setContentType("application/json;charset=utf-8");
out = response.getWriter();
jsonWriter = new JsonWriter(out);
jsonWriter.setIndent(" ");
try {
// begin write response
jsonWriter.beginObject();
// data
writeResultSetData(rs, jsonWriter, startTime);
// suffix contains msg, code.
writeResponseSuffix(jsonWriter);
jsonWriter.endObject();
} catch (SQLException e) {
LOG.warn("Write response error.", e);
} finally {
jsonWriter.flush();
try {
jsonWriter.close();
} catch (IOException e) {
LOG.warn("JSONWriter close exception: ", e);
}
}
}
/**
* Result json sample:
* {
* "data": {
* "type": "exec_status",
* "status": {},
* "time": 10
* },
* "msg" : "success",
* "code" : 0
* }
*/
@Override
public void handleDdlAndExport(long startTime) throws Exception {
response.setContentType("application/json;charset=utf-8");
out = response.getWriter();
jsonWriter = new JsonWriter(out);
jsonWriter.setIndent(" ");
jsonWriter.beginObject();
jsonWriter.name("msg").value("success")
.name("code").value(RestApiStatusCode.OK.code);
writeExecStatusData(startTime);
jsonWriter.endObject();
}
public StreamResponseType getType() {
return StreamResponseType.JSON;
}
private void writeResultSetData(ResultSet rs, JsonWriter jsonWriter, long startTime)
throws IOException, SQLException {
// data
jsonWriter.name("data");
jsonWriter.beginObject();
// data-type
jsonWriter.name("type").value(StreamResponseInf.TYPE_RESULT_SET);
if (rs == null) {
jsonWriter.endObject(); // data
return;
}
ResultSetMetaData metaData = rs.getMetaData();
int colNum = metaData.getColumnCount();
List<Map<String, String>> metaFields = Lists.newArrayList();
// index start from 1
for (int i = 1; i <= colNum; ++i) {
Map<String, String> field = Maps.newHashMap();
field.put("name", metaData.getColumnName(i));
field.put("type", metaData.getColumnTypeName(i));
metaFields.add(field);
}
// data-meta
String metaJson = gson.toJson(metaFields);
jsonWriter.name("meta").jsonValue(metaJson);
// data-data
jsonWriter.name("data");
jsonWriter.beginArray();
// when bufferSize == batchSize, flush jsonWriter.
int bufferSize = 0;
long firstRowTime = 0;
boolean begin = false;
while (rs.next()) {
List<Object> row = Lists.newArrayListWithCapacity(colNum);
// index start from 1
for (int i = 1; i <= colNum; ++i) {
String type = rs.getMetaData().getColumnTypeName(i);
if ("DATE".equalsIgnoreCase(type) || "DATETIME".equalsIgnoreCase(type)
|| "DATEV2".equalsIgnoreCase(type) || "DATETIMEV2".equalsIgnoreCase(type)) {
row.add(rs.getString(i));
} else {
row.add(rs.getObject(i));
}
}
if (begin == false) {
firstRowTime = (System.currentTimeMillis() - startTime);
begin = true;
}
String rowJson = gson.toJson(row);
jsonWriter.jsonValue(rowJson);
++bufferSize;
if (bufferSize == streamBatchSize) {
jsonWriter.flush();
bufferSize = 0;
}
}
jsonWriter.endArray();
// data-time
// the time seems no meaning, because the time contains json serialize time.
jsonWriter.name("time").value(firstRowTime);
jsonWriter.endObject(); // data
jsonWriter.flush();
}
private void writeExecStatusData(long startTime) throws IOException {
// data
jsonWriter.name("data");
jsonWriter.beginObject();
// data-type
jsonWriter.name("type").value(StreamResponseInf.TYPE_EXEC_STATUS);
String statusJson = gson.toJson(Maps.newHashMap());
// data-status
jsonWriter.name("status").jsonValue(statusJson);
// data-time
jsonWriter.name("time").value((System.currentTimeMillis() - startTime));
jsonWriter.endObject();
}
private void writeResponseSuffix(JsonWriter jsonWriter) throws IOException {
// msg
jsonWriter.name("msg").value("success");
// code
jsonWriter.name("code").value(RestApiStatusCode.OK.code);
}
}

View File

@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.httpv2.util.streamresponse;
import java.io.PrintWriter;
import java.sql.ResultSet;
import javax.servlet.http.HttpServletResponse;
/**
* StreamResponseInf use response.getWriter() to response client
*/
public abstract class StreamResponseInf {
public static final String TYPE_RESULT_SET = "result_set";
public static final String TYPE_EXEC_STATUS = "exec_status";
protected HttpServletResponse response;
protected PrintWriter out;
protected int streamBatchSize = 1000;
public abstract void handleQueryAndShow(ResultSet rs, long startTime) throws Exception;
public abstract void handleDdlAndExport(long startTime) throws Exception;
public abstract StreamResponseType getType();
public StreamResponseInf(HttpServletResponse response) {
this.response = response;
}
public enum StreamResponseType {
JSON;
public String toStreamResponseName() {
switch (this) {
case JSON:
return "Json";
default:
return null;
}
}
}
}