diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StmtExecutionAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StmtExecutionAction.java index 7ca29fef2b..76f2c50447 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StmtExecutionAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StmtExecutionAction.java @@ -100,7 +100,11 @@ public class StmtExecutionAction extends RestBaseController { ConnectContext.get().changeDefaultCatalog(ns); ConnectContext.get().setDatabase(getFullDbName(dbName)); - return executeQuery(authInfo, stmtRequestBody.is_sync, stmtRequestBody.limit, stmtRequestBody); + + String streamHeader = request.getHeader("X-Doris-Stream"); + boolean isStream = !("false".equalsIgnoreCase(streamHeader)); + return executeQuery(authInfo, stmtRequestBody.is_sync, stmtRequestBody.limit, stmtRequestBody, + response, isStream); } @@ -138,16 +142,19 @@ public class StmtExecutionAction extends RestBaseController { * @param stmtRequestBody * @return */ - @NotNull private ResponseEntity executeQuery(ActionAuthorizationInfo authInfo, boolean isSync, long limit, - StmtRequestBody stmtRequestBody) { + StmtRequestBody stmtRequestBody, HttpServletResponse response, boolean isStream) { StatementSubmitter.StmtContext stmtCtx = new StatementSubmitter.StmtContext(stmtRequestBody.stmt, - authInfo.fullUserName, authInfo.password, limit); + authInfo.fullUserName, authInfo.password, limit, isStream, response); Future future = stmtSubmitter.submit(stmtCtx); if (isSync) { try { ExecutionResultSet resultSet = future.get(); + // if use stream response, we not need to response an object. + if (isStream) { + return null; + } return ResponseEntityBuilder.ok(resultSet.getResult()); } catch (InterruptedException e) { LOG.warn("failed to execute stmt", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java index 6151229e67..5817f1d252 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/StatementSubmitter.java @@ -29,6 +29,8 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.httpv2.util.streamresponse.JsonStreamResponse; +import org.apache.doris.httpv2.util.streamresponse.StreamResponseInf; import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; @@ -49,6 +51,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; +import javax.servlet.http.HttpServletResponse; /** * This is a simple stmt submitter for submitting a statement to the local FE. @@ -78,7 +81,6 @@ public class StatementSubmitter { } private static class Worker implements Callable { - private ConnectContext ctx; private StmtContext queryCtx; @@ -102,14 +104,25 @@ public class StatementSubmitter { stmt = conn.prepareStatement( queryCtx.stmt, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // set fetch size to 1 to enable streaming result set to avoid OOM. - ((PreparedStatement) stmt).setFetchSize(1); + ((PreparedStatement) stmt).setFetchSize(1000); ResultSet rs = ((PreparedStatement) stmt).executeQuery(); + if (queryCtx.isStream) { + StreamResponseInf streamResponse = new JsonStreamResponse(queryCtx.response); + streamResponse.handleQueryAndShow(rs, startTime); + rs.close(); + return new ExecutionResultSet(null); + } ExecutionResultSet resultSet = generateResultSet(rs, startTime); rs.close(); return resultSet; } else if (stmtBase instanceof DdlStmt || stmtBase instanceof ExportStmt) { stmt = conn.createStatement(); stmt.execute(queryCtx.stmt); + if (queryCtx.isStream) { + StreamResponseInf streamResponse = new JsonStreamResponse(queryCtx.response); + streamResponse.handleDdlAndExport(startTime); + return new ExecutionResultSet(null); + } ExecutionResultSet resultSet = generateExecStatus(startTime); return resultSet; } else { @@ -198,7 +211,7 @@ public class StatementSubmitter { * "time" : 10 * } */ - private ExecutionResultSet generateExecStatus(long startTime) throws SQLException { + private ExecutionResultSet generateExecStatus(long startTime) { Map result = Maps.newHashMap(); result.put("type", TYPE_EXEC_STATUS); result.put("status", Maps.newHashMap()); @@ -228,12 +241,18 @@ public class StatementSubmitter { public String user; public String passwd; public long limit; // limit the number of rows returned by the stmt + // used for stream Work + public boolean isStream; + public HttpServletResponse response; - public StmtContext(String stmt, String user, String passwd, long limit) { + public StmtContext(String stmt, String user, String passwd, long limit, + boolean isStream, HttpServletResponse response) { this.stmt = stmt; this.user = user; this.passwd = passwd; this.limit = limit; + this.isStream = isStream; + this.response = response; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/streamresponse/JsonStreamResponse.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/streamresponse/JsonStreamResponse.java new file mode 100644 index 0000000000..e816efaeb8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/streamresponse/JsonStreamResponse.java @@ -0,0 +1,203 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.util.streamresponse; + +import org.apache.doris.httpv2.rest.RestApiStatusCode; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.stream.JsonWriter; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import javax.servlet.http.HttpServletResponse; + +/** + * Serialize the ResultSet to JSON, and then response to client + */ +public class JsonStreamResponse extends StreamResponseInf { + private static final Logger LOG = LogManager.getLogger(JsonStreamResponse.class); + private static final Gson gson = new Gson(); + private JsonWriter jsonWriter; + + public static final String Name = "Json"; + + public JsonStreamResponse(HttpServletResponse response) { + super(response); + } + + /** + * Result json sample: + * { + * "data": { + * "type": "result_set", + * "meta": {}, + * "data": [], + * "time": 10 + * }, + * "msg" : "success", + * "code" : 0 + * } + */ + @Override + public void handleQueryAndShow(ResultSet rs, long startTime) throws Exception { + response.setContentType("application/json;charset=utf-8"); + out = response.getWriter(); + jsonWriter = new JsonWriter(out); + jsonWriter.setIndent(" "); + try { + // begin write response + jsonWriter.beginObject(); + // data + writeResultSetData(rs, jsonWriter, startTime); + // suffix contains msg, code. + writeResponseSuffix(jsonWriter); + jsonWriter.endObject(); + } catch (SQLException e) { + LOG.warn("Write response error.", e); + } finally { + jsonWriter.flush(); + try { + jsonWriter.close(); + } catch (IOException e) { + LOG.warn("JSONWriter close exception: ", e); + } + } + } + + /** + * Result json sample: + * { + * "data": { + * "type": "exec_status", + * "status": {}, + * "time": 10 + * }, + * "msg" : "success", + * "code" : 0 + * } + */ + @Override + public void handleDdlAndExport(long startTime) throws Exception { + response.setContentType("application/json;charset=utf-8"); + out = response.getWriter(); + jsonWriter = new JsonWriter(out); + jsonWriter.setIndent(" "); + jsonWriter.beginObject(); + jsonWriter.name("msg").value("success") + .name("code").value(RestApiStatusCode.OK.code); + writeExecStatusData(startTime); + jsonWriter.endObject(); + + } + + public StreamResponseType getType() { + return StreamResponseType.JSON; + } + + private void writeResultSetData(ResultSet rs, JsonWriter jsonWriter, long startTime) + throws IOException, SQLException { + // data + jsonWriter.name("data"); + jsonWriter.beginObject(); + // data-type + jsonWriter.name("type").value(StreamResponseInf.TYPE_RESULT_SET); + if (rs == null) { + jsonWriter.endObject(); // data + return; + } + ResultSetMetaData metaData = rs.getMetaData(); + int colNum = metaData.getColumnCount(); + List> metaFields = Lists.newArrayList(); + // index start from 1 + for (int i = 1; i <= colNum; ++i) { + Map field = Maps.newHashMap(); + field.put("name", metaData.getColumnName(i)); + field.put("type", metaData.getColumnTypeName(i)); + metaFields.add(field); + } + // data-meta + String metaJson = gson.toJson(metaFields); + jsonWriter.name("meta").jsonValue(metaJson); + // data-data + jsonWriter.name("data"); + jsonWriter.beginArray(); + // when bufferSize == batchSize, flush jsonWriter. + int bufferSize = 0; + long firstRowTime = 0; + boolean begin = false; + while (rs.next()) { + List row = Lists.newArrayListWithCapacity(colNum); + // index start from 1 + for (int i = 1; i <= colNum; ++i) { + String type = rs.getMetaData().getColumnTypeName(i); + if ("DATE".equalsIgnoreCase(type) || "DATETIME".equalsIgnoreCase(type) + || "DATEV2".equalsIgnoreCase(type) || "DATETIMEV2".equalsIgnoreCase(type)) { + row.add(rs.getString(i)); + } else { + row.add(rs.getObject(i)); + } + } + if (begin == false) { + firstRowTime = (System.currentTimeMillis() - startTime); + begin = true; + } + String rowJson = gson.toJson(row); + jsonWriter.jsonValue(rowJson); + ++bufferSize; + if (bufferSize == streamBatchSize) { + jsonWriter.flush(); + bufferSize = 0; + } + } + jsonWriter.endArray(); + // data-time + // the time seems no meaning, because the time contains json serialize time. + jsonWriter.name("time").value(firstRowTime); + jsonWriter.endObject(); // data + jsonWriter.flush(); + } + + private void writeExecStatusData(long startTime) throws IOException { + // data + jsonWriter.name("data"); + jsonWriter.beginObject(); + // data-type + jsonWriter.name("type").value(StreamResponseInf.TYPE_EXEC_STATUS); + String statusJson = gson.toJson(Maps.newHashMap()); + // data-status + jsonWriter.name("status").jsonValue(statusJson); + // data-time + jsonWriter.name("time").value((System.currentTimeMillis() - startTime)); + jsonWriter.endObject(); + } + + private void writeResponseSuffix(JsonWriter jsonWriter) throws IOException { + // msg + jsonWriter.name("msg").value("success"); + // code + jsonWriter.name("code").value(RestApiStatusCode.OK.code); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/streamresponse/StreamResponseInf.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/streamresponse/StreamResponseInf.java new file mode 100644 index 0000000000..245222c79d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/util/streamresponse/StreamResponseInf.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.util.streamresponse; + +import java.io.PrintWriter; +import java.sql.ResultSet; +import javax.servlet.http.HttpServletResponse; + +/** + * StreamResponseInf use response.getWriter() to response client + */ +public abstract class StreamResponseInf { + public static final String TYPE_RESULT_SET = "result_set"; + public static final String TYPE_EXEC_STATUS = "exec_status"; + + protected HttpServletResponse response; + protected PrintWriter out; + protected int streamBatchSize = 1000; + + public abstract void handleQueryAndShow(ResultSet rs, long startTime) throws Exception; + + public abstract void handleDdlAndExport(long startTime) throws Exception; + + public abstract StreamResponseType getType(); + + public StreamResponseInf(HttpServletResponse response) { + this.response = response; + } + + public enum StreamResponseType { + JSON; + public String toStreamResponseName() { + switch (this) { + case JSON: + return "Json"; + default: + return null; + } + } + } +}