[feature-wip](arrow-flight)(step4) Support other DML and DDL statements, besides Select (#25919)

Design Documentation Linked to #25514
This commit is contained in:
Xinyi Zou
2023-11-08 10:50:42 +08:00
committed by GitHub
parent 806461721c
commit 1544110c1b
30 changed files with 1131 additions and 594 deletions

View File

@ -1204,7 +1204,10 @@ public enum ErrorCode {
"the auto increment must be BIGINT type."),
ERR_AUTO_INCREMENT_COLUMN_IN_AGGREGATE_TABLE(5096, new byte[]{'4', '2', '0', '0', '0'},
"the auto increment is only supported in duplicate table and unique table.");
"the auto increment is only supported in duplicate table and unique table."),
ERR_ARROW_FLIGHT_SQL_MUST_ONLY_RESULT_STMT(5097, new byte[]{'4', '2', '0', '0', '0'},
"There can only be one stmt that returns the result and it is at the end.");
// This is error code
private final int code;

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.ConnectScheduler;
import org.apache.doris.qe.MysqlConnectProcessor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -81,7 +82,7 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
context.setUserInsertTimeout(
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
ConnectProcessor processor = new ConnectProcessor(context);
ConnectProcessor processor = new MysqlConnectProcessor(context);
context.startAcceptQuery(processor);
} catch (AfterConnectedException e) {
// do not need to print log for this kind of exception.

View File

@ -23,6 +23,7 @@ import java.util.EnumSet;
import java.util.Map;
// MySQL protocol text command
// Reused by arrow flight protocol
public enum MysqlCommand {
COM_SLEEP("Sleep", 0),
COM_QUIT("Quit", 1),

View File

@ -17,6 +17,7 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@ -39,10 +40,12 @@ import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
@ -59,6 +62,7 @@ import org.json.JSONObject;
import org.xnio.StreamConnection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -77,7 +81,7 @@ public class ConnectContext {
public enum ConnectType {
MYSQL,
ARROW_FLIGHT
ARROW_FLIGHT_SQL
}
protected volatile ConnectType connectType;
@ -96,8 +100,14 @@ public class ConnectContext {
protected volatile int connectionId;
// Timestamp when the connection is make
protected volatile long loginTime;
// arrow flight token
// for arrow flight
protected volatile String peerIdentity;
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
// state
@ -190,7 +200,7 @@ public class ConnectContext {
private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
//internal call like `insert overwrite` need skipAuth
// internal call like `insert overwrite` need skipAuth
// For example, `insert overwrite` only requires load permission,
// but the internal implementation will call the logic of `AlterTable`.
// In this case, `skipAuth` needs to be set to `true` to skip the permission check of `AlterTable`
@ -286,41 +296,30 @@ public class ConnectContext {
return connectType;
}
public ConnectContext() {
this((StreamConnection) null);
}
public ConnectContext(String peerIdentity) {
this.connectType = ConnectType.ARROW_FLIGHT;
this.peerIdentity = peerIdentity;
public void init() {
state = new QueryState();
returnRows = 0;
isKilled = false;
sessionVariable = VariableMgr.newSessionVariable();
mysqlChannel = new DummyMysqlChannel();
command = MysqlCommand.COM_SLEEP;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
}
setResultSinkType(TResultSinkType.ARROW_FLIGHT_PROTOCAL);
}
public ConnectContext() {
this((StreamConnection) null);
}
public ConnectContext(StreamConnection connection) {
connectType = ConnectType.MYSQL;
state = new QueryState();
returnRows = 0;
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
isKilled = false;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection);
} else {
mysqlChannel = new DummyMysqlChannel();
}
sessionVariable = VariableMgr.newSessionVariable();
command = MysqlCommand.COM_SLEEP;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
}
init();
}
public boolean isTxnModel() {
@ -541,14 +540,70 @@ public class ConnectContext {
this.loginTime = System.currentTimeMillis();
}
public void setRunningQuery(String runningQuery) {
this.runningQuery = runningQuery;
}
public String getRunningQuery() {
return runningQuery;
}
public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
}
public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
}
public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}
public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}
public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}
public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}
public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}
public TUniqueId getFinstId() {
return finstId;
}
public void setReturnResultFromLocal(boolean returnResultFromLocal) {
this.returnResultFromLocal = returnResultFromLocal;
}
public boolean isReturnResultFromLocal() {
return returnResultFromLocal;
}
public String getPeerIdentity() {
return peerIdentity;
}
public FlightSqlChannel getFlightSqlChannel() {
throw new RuntimeException("getFlightSqlChannel not in flight sql connection");
}
public MysqlChannel getMysqlChannel() {
return mysqlChannel;
}
public String getClientIP() {
return getMysqlChannel().getRemoteHostPortString();
}
public QueryState getState() {
return state;
}
@ -620,10 +675,14 @@ public class ConnectContext {
return executor;
}
public void cleanup() {
protected void closeChannel() {
if (mysqlChannel != null) {
mysqlChannel.close();
}
}
public void cleanup() {
closeChannel();
threadLocalInfo.remove();
returnRows = 0;
}
@ -701,27 +760,17 @@ public class ConnectContext {
}
public String getRemoteHostPortString() {
if (connectType.equals(ConnectType.MYSQL)) {
return getMysqlChannel().getRemoteHostPortString();
} else if (connectType.equals(ConnectType.ARROW_FLIGHT)) {
// TODO Get flight client IP:Port
return peerIdentity;
}
return "";
return getMysqlChannel().getRemoteHostPortString();
}
// kill operation with no protect.
public void kill(boolean killConnection) {
LOG.warn("kill query from {}, kill connection: {}", getRemoteHostPortString(), killConnection);
LOG.warn("kill query from {}, kill mysql connection: {}", getRemoteHostPortString(), killConnection);
if (killConnection) {
isKilled = true;
if (connectType.equals(ConnectType.MYSQL)) {
// Close channel to break connection with client
getMysqlChannel().close();
} else if (connectType.equals(ConnectType.ARROW_FLIGHT)) {
connectScheduler.unregisterConnection(this);
}
// Close channel to break connection with client
closeChannel();
}
// Now, cancel running query.
cancelQuery();

View File

@ -17,11 +17,8 @@
package org.apache.doris.qe;
import org.apache.doris.analysis.ExecuteStmt;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.KillStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
@ -35,7 +32,7 @@ import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.common.util.DebugUtil;
@ -47,7 +44,6 @@ import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlProto;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.nereids.exceptions.NotSupportedException;
@ -61,6 +57,7 @@ import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
@ -74,9 +71,6 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.AsynchronousCloseException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -84,11 +78,16 @@ import java.util.Map;
import java.util.UUID;
/**
* Process one mysql connection, receive one packet, process, send one packet.
* Process one connection, the life cycle is the same as connection
*/
public class ConnectProcessor {
public abstract class ConnectProcessor {
public enum ConnectType {
MYSQL,
ARROW_FLIGHT_SQL
}
private static final Logger LOG = LogManager.getLogger(ConnectProcessor.class);
private static final TextMapGetter<Map<String, String>> getter =
protected static final TextMapGetter<Map<String, String>> getter =
new TextMapGetter<Map<String, String>>() {
@Override
public Iterable<String> keys(Map<String, String> carrier) {
@ -103,17 +102,17 @@ public class ConnectProcessor {
return "";
}
};
private final ConnectContext ctx;
private ByteBuffer packetBuf;
private StmtExecutor executor = null;
protected final ConnectContext ctx;
protected StmtExecutor executor = null;
protected ConnectType connectType;
protected ArrayList<StmtExecutor> returnResultFromRemoteExecutor = new ArrayList<>();
public ConnectProcessor(ConnectContext context) {
this.ctx = context;
}
// COM_INIT_DB: change current database of this session.
private void handleInitDb() {
String fullDbName = new String(packetBuf.array(), 1, packetBuf.limit() - 1);
// change current database of this session.
protected void handleInitDb(String fullDbName) {
if (Strings.isNullOrEmpty(ctx.getClusterName())) {
ctx.getState().setError(ErrorCode.ERR_CLUSTER_NAME_NULL, "Please enter cluster");
return;
@ -160,24 +159,22 @@ public class ConnectProcessor {
ctx.getState().setOk();
}
// COM_QUIT: set killed flag and then return OK packet.
private void handleQuit() {
// set killed flag
protected void handleQuit() {
ctx.setKilled();
ctx.getState().setOk();
}
// process COM_PING statement, do nothing, just return one OK packet.
private void handlePing() {
// do nothing
protected void handlePing() {
ctx.getState().setOk();
}
private void handleStmtReset() {
protected void handleStmtReset() {
ctx.getState().setOk();
}
private void handleStmtClose() {
packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
int stmtId = packetBuf.getInt();
protected void handleStmtClose(int stmtId) {
LOG.debug("close stmt id: {}", stmtId);
ConnectContext.get().removePrepareStmt(String.valueOf(stmtId));
// No response packet is sent back to the client, see
@ -185,119 +182,27 @@ public class ConnectProcessor {
ctx.getState().setNoop();
}
private void debugPacket() {
byte[] bytes = packetBuf.array();
StringBuilder printB = new StringBuilder();
for (byte b : bytes) {
if (Character.isLetterOrDigit((char) b & 0xFF)) {
char x = (char) b;
printB.append(x);
} else {
printB.append("0x" + Integer.toHexString(b & 0xFF));
}
printB.append(" ");
}
LOG.debug("debug packet {}", printB.toString().substring(0, 200));
}
private static boolean isNull(byte[] bitmap, int position) {
protected static boolean isNull(byte[] bitmap, int position) {
return (bitmap[position / 8] & (1 << (position & 7))) != 0;
}
// process COM_EXECUTE, parse binary row data
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute.html
private void handleExecute() {
// debugPacket();
packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
// parse stmt_id, flags, params
int stmtId = packetBuf.getInt();
// flag
packetBuf.get();
// iteration_count always 1,
packetBuf.getInt();
LOG.debug("execute prepared statement {}", stmtId);
PrepareStmtContext prepareCtx = ctx.getPreparedStmt(String.valueOf(stmtId));
if (prepareCtx == null) {
LOG.debug("No such statement in context, stmtId:{}", stmtId);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR,
"msg: Not supported such prepared statement");
return;
}
ctx.setStartTime();
if (prepareCtx.stmt.getInnerStmt() instanceof QueryStmt) {
ctx.getState().setIsQuery(true);
}
prepareCtx.stmt.setIsPrepared();
int paramCount = prepareCtx.stmt.getParmCount();
// null bitmap
byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
packetBuf.get(nullbitmapData);
String stmtStr = "";
try {
// new_params_bind_flag
if ((int) packetBuf.get() != 0) {
// parse params's types
for (int i = 0; i < paramCount; ++i) {
int typeCode = packetBuf.getChar();
LOG.debug("code {}", typeCode);
prepareCtx.stmt.placeholders().get(i).setTypeCode(typeCode);
}
}
List<LiteralExpr> realValueExprs = new ArrayList<>();
// parse param data
for (int i = 0; i < paramCount; ++i) {
if (isNull(nullbitmapData, i)) {
realValueExprs.add(new NullLiteral());
continue;
}
LiteralExpr l = prepareCtx.stmt.placeholders().get(i).createLiteralFromType();
l.setupParamFromBinary(packetBuf);
realValueExprs.add(l);
}
ExecuteStmt executeStmt = new ExecuteStmt(String.valueOf(stmtId), realValueExprs);
// TODO set real origin statement
executeStmt.setOrigStmt(new OriginStatement("null", 0));
executeStmt.setUserInfo(ctx.getCurrentUserIdentity());
LOG.debug("executeStmt {}", executeStmt);
executor = new StmtExecutor(ctx, executeStmt);
ctx.setExecutor(executor);
executor.execute();
stmtStr = executeStmt.toSql();
} catch (Throwable e) {
// Catch all throwable.
// If reach here, maybe palo bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
e.getClass().getSimpleName() + ", msg: " + e.getMessage());
}
auditAfterExec(stmtStr, prepareCtx.stmt.getInnerStmt(), null, false);
}
private void auditAfterExec(String origStmt, StatementBase parsedStmt,
Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
protected void auditAfterExec(String origStmt, StatementBase parsedStmt,
Data.PQueryStatistics statistics, boolean printFuzzyVariables) {
AuditLogHelper.logAuditLog(ctx, origStmt, parsedStmt, statistics, printFuzzyVariables);
}
// Process COM_QUERY statement,
// only throw an exception when there is a problem interacting with the requesting client
private void handleQuery(MysqlCommand mysqlCommand) {
protected void handleQuery(MysqlCommand mysqlCommand, String originStmt) {
if (MetricRepo.isInit) {
MetricRepo.COUNTER_REQUEST_ALL.increase(1L);
}
// convert statement to Java string
byte[] bytes = packetBuf.array();
int ending = packetBuf.limit() - 1;
while (ending >= 1 && bytes[ending] == '\0') {
ending--;
}
String originStmt = new String(bytes, 1, ending, StandardCharsets.UTF_8);
String sqlHash = DigestUtils.md5Hex(originStmt);
ctx.setSqlHash(sqlHash);
ctx.getAuditEventBuilder().reset();
ctx.getAuditEventBuilder()
.setTimestamp(System.currentTimeMillis())
.setClientIp(ctx.getMysqlChannel().getRemoteHostPortString())
.setClientIp(ctx.getClientIP())
.setUser(ClusterNamespace.getNameFromFullName(ctx.getQualifiedUser()))
.setSqlHash(ctx.getSqlHash());
@ -356,10 +261,25 @@ public class ConnectProcessor {
try {
executor.execute();
if (i != stmts.size() - 1) {
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
if (ctx.getState().getStateType() != MysqlStateType.ERR) {
finalizeCommand();
if (connectType.equals(ConnectType.MYSQL)) {
if (i != stmts.size() - 1) {
ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
if (ctx.getState().getStateType() != MysqlStateType.ERR) {
finalizeCommand();
}
}
} else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
if (!ctx.isReturnResultFromLocal()) {
returnResultFromRemoteExecutor.add(executor);
}
Preconditions.checkState(ctx.getFlightSqlChannel().resultNum() <= 1);
if (ctx.getFlightSqlChannel().resultNum() == 1 && i != stmts.size() - 1) {
String errMsg = "Only be one stmt that returns the result and it is at the end. stmts.size(): "
+ stmts.size();
LOG.warn(errMsg);
ctx.getState().setError(ErrorCode.ERR_ARROW_FLIGHT_SQL_MUST_ONLY_RESULT_STMT, errMsg);
ctx.getState().setErrType(QueryState.ErrType.OTHER_ERR);
break;
}
}
auditAfterExec(auditStmt, executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true);
@ -381,8 +301,8 @@ public class ConnectProcessor {
}
// Use a handler for exception to avoid big try catch block which is a little hard to understand
private void handleQueryException(Throwable throwable, String origStmt,
StatementBase parsedStmt, Data.PQueryStatistics statistics) {
protected void handleQueryException(Throwable throwable, String origStmt,
StatementBase parsedStmt, Data.PQueryStatistics statistics) {
if (ctx.getMinidump() != null) {
MinidumpUtils.saveMinidumpString(ctx.getMinidump(), DebugUtil.printId(ctx.queryId()));
}
@ -415,7 +335,7 @@ public class ConnectProcessor {
}
// analyze the origin stmt and return multi-statements
private List<StatementBase> parse(String originStmt) throws AnalysisException, DdlException {
protected List<StatementBase> parse(String originStmt) throws AnalysisException, DdlException {
LOG.debug("the originStmts are: {}", originStmt);
// Parse statement with parser generated by CUP&FLEX
SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
@ -443,9 +363,8 @@ public class ConnectProcessor {
// Get the column definitions of a table
@SuppressWarnings("rawtypes")
private void handleFieldList() throws IOException {
protected void handleFieldList(String tableName) {
// Already get command code.
String tableName = new String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
if (Strings.isNullOrEmpty(tableName)) {
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_TABLE, "Empty tableName");
return;
@ -463,18 +382,21 @@ public class ConnectProcessor {
table.readLock();
try {
MysqlChannel channel = ctx.getMysqlChannel();
MysqlSerializer serializer = channel.getSerializer();
if (connectType.equals(ConnectType.MYSQL)) {
MysqlChannel channel = ctx.getMysqlChannel();
MysqlSerializer serializer = channel.getSerializer();
// Send fields
// NOTE: Field list doesn't send number of fields
List<Column> baseSchema = table.getBaseSchema();
for (Column column : baseSchema) {
serializer.reset();
serializer.writeField(db.getFullName(), table.getName(), column, true);
channel.sendOnePacket(serializer.toByteBuffer());
// Send fields
// NOTE: Field list doesn't send number of fields
List<Column> baseSchema = table.getBaseSchema();
for (Column column : baseSchema) {
serializer.reset();
serializer.writeField(db.getFullName(), table.getName(), column, true);
channel.sendOnePacket(serializer.toByteBuffer());
}
} else if (connectType.equals(ConnectType.ARROW_FLIGHT_SQL)) {
// TODO
}
} catch (Throwable throwable) {
handleQueryException(throwable, "", null, null);
} finally {
@ -483,62 +405,9 @@ public class ConnectProcessor {
ctx.getState().setEof();
}
private void dispatch() throws IOException {
int code = packetBuf.get();
MysqlCommand command = MysqlCommand.fromCode(code);
if (command == null) {
ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unknown command(" + code + ")");
LOG.warn("Unknown command(" + code + ")");
return;
}
LOG.debug("handle command {}", command);
ctx.setCommand(command);
ctx.setStartTime();
switch (command) {
case COM_INIT_DB:
handleInitDb();
break;
case COM_QUIT:
handleQuit();
break;
case COM_QUERY:
case COM_STMT_PREPARE:
ctx.initTracer("trace");
Span rootSpan = ctx.getTracer().spanBuilder("handleQuery").setNoParent().startSpan();
try (Scope scope = rootSpan.makeCurrent()) {
handleQuery(command);
} catch (Exception e) {
rootSpan.recordException(e);
throw e;
} finally {
rootSpan.end();
}
break;
case COM_STMT_EXECUTE:
handleExecute();
break;
case COM_FIELD_LIST:
handleFieldList();
break;
case COM_PING:
handlePing();
break;
case COM_STMT_RESET:
handleStmtReset();
break;
case COM_STMT_CLOSE:
handleStmtClose();
break;
default:
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unsupported command(" + command + ")");
LOG.warn("Unsupported command(" + command + ")");
break;
}
}
private ByteBuffer getResultPacket() {
// only Mysql protocol
protected ByteBuffer getResultPacket() {
Preconditions.checkState(connectType.equals(ConnectType.MYSQL));
MysqlPacket packet = ctx.getState().toResponsePacket();
if (packet == null) {
// possible two cases:
@ -555,7 +424,9 @@ public class ConnectProcessor {
// When any request is completed, it will generally need to send a response packet to the client
// This method is used to send a response packet to the client
private void finalizeCommand() throws IOException {
// only Mysql protocol
public void finalizeCommand() throws IOException {
Preconditions.checkState(connectType.equals(ConnectType.MYSQL));
ByteBuffer packet;
if (executor != null && executor.isForwardToMaster()
&& ctx.getState().getStateType() != QueryState.MysqlStateType.ERR) {
@ -736,47 +607,9 @@ public class ConnectProcessor {
return result;
}
// Process a MySQL request
public void processOnce() throws IOException {
// set status of query to OK.
ctx.getState().reset();
executor = null;
// reset sequence id of MySQL protocol
final MysqlChannel channel = ctx.getMysqlChannel();
channel.setSequenceId(0);
// read packet from channel
try {
packetBuf = channel.fetchOnePacket();
if (packetBuf == null) {
LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
throw new IOException("Error happened when receiving packet.");
}
} catch (AsynchronousCloseException e) {
// when this happened, timeout checker close this channel
// killed flag in ctx has been already set, just return
return;
}
// dispatch
dispatch();
// finalize
finalizeCommand();
ctx.setCommand(MysqlCommand.COM_SLEEP);
}
public void loop() {
while (!ctx.isKilled()) {
try {
processOnce();
} catch (Exception e) {
// TODO(zhaochun): something wrong
LOG.warn("Exception happened in one session(" + ctx + ").", e);
ctx.setKilled();
break;
}
}
// only Mysql protocol
public void processOnce() throws IOException, NotImplementedException {
throw new NotImplementedException("Not Impl processOnce");
}
}

View File

@ -102,7 +102,7 @@ public class ConnectScheduler {
return false;
}
connectionMap.put(ctx.getConnectionId(), ctx);
if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT)) {
if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
flightToken2ConnectionId.put(ctx.getPeerIdentity(), ctx.getConnectionId());
}
return true;
@ -116,7 +116,7 @@ public class ConnectScheduler {
conns.decrementAndGet();
}
numberConnection.decrementAndGet();
if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT)) {
if (ctx.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
flightToken2ConnectionId.remove(ctx.getPeerIdentity());
}
}

View File

@ -19,7 +19,6 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
@ -68,6 +67,7 @@ import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
import org.apache.doris.proto.Types;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QueryStatisticsItem.FragmentInstanceInfo;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
@ -210,11 +210,6 @@ public class Coordinator implements CoordInterface {
private final List<BackendExecState> needCheckBackendExecStates = Lists.newArrayList();
private final List<PipelineExecContext> needCheckPipelineExecContexts = Lists.newArrayList();
private ResultReceiver receiver;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private final List<ScanNode> scanNodes;
private int scanRangeNum = 0;
// number of instances of this query, equals to
@ -283,22 +278,6 @@ public class Coordinator implements CoordInterface {
return executionProfile;
}
public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
}
public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}
public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}
public TUniqueId getFinstId() {
return finstId;
}
// True if all scan node are ExternalScanNode.
private boolean isAllExternalScan = true;
@ -631,10 +610,14 @@ public class Coordinator implements CoordInterface {
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline);
finstId = topParams.instanceExecParams.get(0).instanceId;
resultFlightServerAddr = toArrowFlightHost(execBeAddr);
resultInternalServiceAddr = toBrpcHost(execBeAddr);
resultOutputExprs = fragments.get(0).getOutputExprs();
if (!context.isReturnResultFromLocal()) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
}
if (LOG.isDebugEnabled()) {
LOG.debug("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
topParams.instanceExecParams.get(0).host);

View File

@ -0,0 +1,272 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.ExecuteStmt;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.mysql.MysqlProto;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.AsynchronousCloseException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* Process one mysql connection, receive one packet, process, send one packet.
*/
public class MysqlConnectProcessor extends ConnectProcessor {
private static final Logger LOG = LogManager.getLogger(MysqlConnectProcessor.class);
private ByteBuffer packetBuf;
public MysqlConnectProcessor(ConnectContext context) {
super(context);
connectType = ConnectType.MYSQL;
}
// COM_INIT_DB: change current database of this session.
private void handleInitDb() {
String fullDbName = new String(packetBuf.array(), 1, packetBuf.limit() - 1);
handleInitDb(fullDbName);
}
private void handleStmtClose() {
packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
int stmtId = packetBuf.getInt();
handleStmtClose(stmtId);
}
private void debugPacket() {
byte[] bytes = packetBuf.array();
StringBuilder printB = new StringBuilder();
for (byte b : bytes) {
if (Character.isLetterOrDigit((char) b & 0xFF)) {
char x = (char) b;
printB.append(x);
} else {
printB.append("0x" + Integer.toHexString(b & 0xFF));
}
printB.append(" ");
}
LOG.debug("debug packet {}", printB.toString().substring(0, 200));
}
// process COM_EXECUTE, parse binary row data
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute.html
private void handleExecute() {
// debugPacket();
packetBuf = packetBuf.order(ByteOrder.LITTLE_ENDIAN);
// parse stmt_id, flags, params
int stmtId = packetBuf.getInt();
// flag
packetBuf.get();
// iteration_count always 1,
packetBuf.getInt();
LOG.debug("execute prepared statement {}", stmtId);
PrepareStmtContext prepareCtx = ctx.getPreparedStmt(String.valueOf(stmtId));
if (prepareCtx == null) {
LOG.debug("No such statement in context, stmtId:{}", stmtId);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR,
"msg: Not supported such prepared statement");
return;
}
ctx.setStartTime();
if (prepareCtx.stmt.getInnerStmt() instanceof QueryStmt) {
ctx.getState().setIsQuery(true);
}
prepareCtx.stmt.setIsPrepared();
int paramCount = prepareCtx.stmt.getParmCount();
// null bitmap
byte[] nullbitmapData = new byte[(paramCount + 7) / 8];
packetBuf.get(nullbitmapData);
String stmtStr = "";
try {
// new_params_bind_flag
if ((int) packetBuf.get() != 0) {
// parse params's types
for (int i = 0; i < paramCount; ++i) {
int typeCode = packetBuf.getChar();
LOG.debug("code {}", typeCode);
prepareCtx.stmt.placeholders().get(i).setTypeCode(typeCode);
}
}
List<LiteralExpr> realValueExprs = new ArrayList<>();
// parse param data
for (int i = 0; i < paramCount; ++i) {
if (isNull(nullbitmapData, i)) {
realValueExprs.add(new NullLiteral());
continue;
}
LiteralExpr l = prepareCtx.stmt.placeholders().get(i).createLiteralFromType();
l.setupParamFromBinary(packetBuf);
realValueExprs.add(l);
}
ExecuteStmt executeStmt = new ExecuteStmt(String.valueOf(stmtId), realValueExprs);
// TODO set real origin statement
executeStmt.setOrigStmt(new OriginStatement("null", 0));
executeStmt.setUserInfo(ctx.getCurrentUserIdentity());
LOG.debug("executeStmt {}", executeStmt);
executor = new StmtExecutor(ctx, executeStmt);
ctx.setExecutor(executor);
executor.execute();
stmtStr = executeStmt.toSql();
} catch (Throwable e) {
// Catch all throwable.
// If reach here, maybe palo bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
e.getClass().getSimpleName() + ", msg: " + e.getMessage());
}
auditAfterExec(stmtStr, prepareCtx.stmt.getInnerStmt(), null, false);
}
// Process COM_QUERY statement,
private void handleQuery(MysqlCommand mysqlCommand) {
// convert statement to Java string
byte[] bytes = packetBuf.array();
int ending = packetBuf.limit() - 1;
while (ending >= 1 && bytes[ending] == '\0') {
ending--;
}
String originStmt = new String(bytes, 1, ending, StandardCharsets.UTF_8);
handleQuery(mysqlCommand, originStmt);
}
private void dispatch() throws IOException {
int code = packetBuf.get();
MysqlCommand command = MysqlCommand.fromCode(code);
if (command == null) {
ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unknown command(" + code + ")");
LOG.warn("Unknown command(" + code + ")");
return;
}
LOG.debug("handle command {}", command);
ctx.setCommand(command);
ctx.setStartTime();
switch (command) {
case COM_INIT_DB:
handleInitDb();
break;
case COM_QUIT:
// COM_QUIT: set killed flag and then return OK packet.
handleQuit();
break;
case COM_QUERY:
case COM_STMT_PREPARE:
// Process COM_QUERY statement,
ctx.initTracer("trace");
Span rootSpan = ctx.getTracer().spanBuilder("handleQuery").setNoParent().startSpan();
try (Scope scope = rootSpan.makeCurrent()) {
handleQuery(command);
} catch (Exception e) {
rootSpan.recordException(e);
throw e;
} finally {
rootSpan.end();
}
break;
case COM_STMT_EXECUTE:
handleExecute();
break;
case COM_FIELD_LIST:
handleFieldList();
break;
case COM_PING:
// process COM_PING statement, do nothing, just return one OK packet.
handlePing();
break;
case COM_STMT_RESET:
handleStmtReset();
break;
case COM_STMT_CLOSE:
handleStmtClose();
break;
default:
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unsupported command(" + command + ")");
LOG.warn("Unsupported command(" + command + ")");
break;
}
}
private void handleFieldList() {
String tableName = new String(MysqlProto.readNulTerminateString(packetBuf), StandardCharsets.UTF_8);
handleFieldList(tableName);
}
// Process a MySQL request
public void processOnce() throws IOException {
// set status of query to OK.
ctx.getState().reset();
executor = null;
// reset sequence id of MySQL protocol
final MysqlChannel channel = ctx.getMysqlChannel();
channel.setSequenceId(0);
// read packet from channel
try {
packetBuf = channel.fetchOnePacket();
if (packetBuf == null) {
LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
throw new IOException("Error happened when receiving packet.");
}
} catch (AsynchronousCloseException e) {
// when this happened, timeout checker close this channel
// killed flag in ctx has been already set, just return
return;
}
// dispatch
dispatch();
// finalize
finalizeCommand();
ctx.setCommand(MysqlCommand.COM_SLEEP);
}
public void loop() {
while (!ctx.isKilled()) {
try {
processOnce();
} catch (Exception e) {
// TODO(zhaochun): something wrong
LOG.warn("Exception happened in one session(" + ctx + ").", e);
ctx.setKilled();
break;
}
}
}
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.mysql.MysqlPacket;
// query state used to record state of query, maybe query status is better
public class QueryState {
// Reused by arrow flight protocol
public enum MysqlStateType {
NOOP, // send nothing to remote
OK, // send OK packet to remote

View File

@ -136,6 +136,7 @@ import org.apache.doris.proto.InternalService.PGroupCommitInsertRequest;
import org.apache.doris.proto.InternalService.PGroupCommitInsertResponse;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
import org.apache.doris.qe.cache.CacheAnalyzer;
@ -147,7 +148,6 @@ import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.FlightStatementExecutor;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.InternalQueryBuffer;
import org.apache.doris.system.Backend;
@ -242,6 +242,7 @@ public class StmtExecutor {
// this constructor is mainly for proxy
public StmtExecutor(ConnectContext context, OriginStatement originStmt, boolean isProxy) {
Preconditions.checkState(context.getConnectType().equals(ConnectType.MYSQL));
this.context = context;
this.originStmt = originStmt;
this.serializer = context.getMysqlChannel().getSerializer();
@ -262,7 +263,11 @@ public class StmtExecutor {
this.context = ctx;
this.parsedStmt = parsedStmt;
this.originStmt = parsedStmt.getOrigStmt();
this.serializer = context.getMysqlChannel().getSerializer();
if (context.getConnectType() == ConnectType.MYSQL) {
this.serializer = context.getMysqlChannel().getSerializer();
} else {
this.serializer = null;
}
this.isProxy = false;
if (parsedStmt instanceof LogicalPlanAdapter) {
this.statementContext = ((LogicalPlanAdapter) parsedStmt).getStatementContext();
@ -428,7 +433,7 @@ public class StmtExecutor {
* isValuesOrConstantSelect: when this interface return true, original string is truncated at 1024
*
* @return parsed and analyzed statement for Stale planner.
* an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
* an unresolved LogicalPlan wrapped with a LogicalPlanAdapter for Nereids.
*/
public StatementBase getParsedStmt() {
return parsedStmt;
@ -444,13 +449,16 @@ public class StmtExecutor {
public void execute(TUniqueId queryId) throws Exception {
SessionVariable sessionVariable = context.getSessionVariable();
Span executeSpan = context.getTracer().spanBuilder("execute").setParent(Context.current()).startSpan();
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(true);
}
try (Scope scope = executeSpan.makeCurrent()) {
if (parsedStmt instanceof LogicalPlanAdapter
|| (parsedStmt == null && sessionVariable.isEnableNereidsPlanner())) {
try {
executeByNereids(queryId);
} catch (NereidsException | ParseException e) {
if (context.getMinidump() != null) {
if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
MinidumpUtils.saveMinidumpString(context.getMinidump(), DebugUtil.printId(context.queryId()));
}
// try to fall back to legacy planner
@ -600,12 +608,23 @@ public class StmtExecutor {
}
if (statements.size() <= originStmt.idx) {
throw new ParseException("Nereids parse failed. Parser get " + statements.size() + " statements,"
+ " but we need at least " + originStmt.idx + " statements.");
+ " but we need at least " + originStmt.idx + " statements.");
}
parsedStmt = statements.get(originStmt.idx);
}
public void finalizeQuery() {
// The final profile report occurs after be returns the query data, and the profile cannot be
// received after unregisterQuery(), causing the instance profile to be lost, so we should wait
// for the profile before unregisterQuery().
updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
}
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
// queue query here
syncJournalIfNeeded();
QueueOfferToken offerRet = null;
@ -631,7 +650,7 @@ public class StmtExecutor {
try {
for (int i = 0; i < retryTime; i++) {
try {
//reset query id for each retry
// reset query id for each retry
if (i > 0) {
UUID uuid = UUID.randomUUID();
TUniqueId newQueryId = new TUniqueId(uuid.getMostSignificantBits(),
@ -646,17 +665,15 @@ public class StmtExecutor {
if (i == retryTime - 1) {
throw e;
}
if (!context.getMysqlChannel().isSend()) {
if (context.getConnectType().equals(ConnectType.MYSQL) && !context.getMysqlChannel().isSend()) {
LOG.warn("retry {} times. stmt: {}", (i + 1), parsedStmt.getOrigStmt().originStmt);
} else {
throw e;
}
} finally {
// The final profile report occurs after be returns the query data, and the profile cannot be
// received after unregisterQuery(), causing the instance profile to be lost, so we should wait
// for the profile before unregisterQuery().
updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
if (context.isReturnResultFromLocal()) {
finalizeQuery();
}
}
}
} finally {
@ -1361,9 +1378,11 @@ public class StmtExecutor {
// Process a select statement.
private void handleQueryStmt() throws Exception {
LOG.info("Handling query {} with query id {}",
originStmt.originStmt, DebugUtil.printId(context.queryId));
// Every time set no send flag and clean all data in buffer
context.getMysqlChannel().reset();
originStmt.originStmt, DebugUtil.printId(context.queryId));
if (context.getConnectType() == ConnectType.MYSQL) {
// Every time set no send flag and clean all data in buffer
context.getMysqlChannel().reset();
}
Queriable queryStmt = (Queriable) parsedStmt;
QueryDetail queryDetail = new QueryDetail(context.getStartTime(),
@ -1390,12 +1409,16 @@ public class StmtExecutor {
return;
}
MysqlChannel channel = context.getMysqlChannel();
MysqlChannel channel = null;
if (context.getConnectType().equals(ConnectType.MYSQL)) {
channel = context.getMysqlChannel();
}
boolean isOutfileQuery = queryStmt.hasOutFileClause();
// Sql and PartitionCache
CacheAnalyzer cacheAnalyzer = new CacheAnalyzer(context, parsedStmt, planner);
if (cacheAnalyzer.enableCache() && !isOutfileQuery
// TODO support arrow flight sql
if (channel != null && cacheAnalyzer.enableCache() && !isOutfileQuery
&& context.getSessionVariable().getSqlSelectLimit() < 0
&& context.getSessionVariable().getDefaultOrderByLimit() < 0) {
if (queryStmt instanceof QueryStmt || queryStmt instanceof LogicalPlanAdapter) {
@ -1406,7 +1429,8 @@ public class StmtExecutor {
}
// handle select .. from xx limit 0
if (parsedStmt instanceof SelectStmt) {
// TODO support arrow flight sql
if (channel != null && parsedStmt instanceof SelectStmt) {
SelectStmt parsedSelectStmt = (SelectStmt) parsedStmt;
if (parsedSelectStmt.getLimit() == 0) {
LOG.info("ignore handle limit 0 ,sql:{}", parsedSelectStmt.toSql());
@ -1469,6 +1493,22 @@ public class StmtExecutor {
}
}
if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
Preconditions.checkState(!context.isReturnResultFromLocal());
profile.getSummaryProfile().setTempStartTime();
if (coordBase.getInstanceTotalNum() > 1 && LOG.isDebugEnabled()) {
try {
LOG.debug("Finish to execute fragment. user: {}, db: {}, sql: {}, fragment instance num: {}",
context.getQualifiedUser(), context.getDatabase(),
parsedStmt.getOrigStmt().originStmt.replace("\n", " "),
coordBase.getInstanceTotalNum());
} catch (Exception e) {
LOG.warn("Fail to print fragment concurrency for Query.", e);
}
}
return;
}
Span fetchResultSpan = context.getTracer().spanBuilder("fetch result").setParent(Context.current()).startSpan();
try (Scope scope = fetchResultSpan.makeCurrent()) {
while (true) {
@ -1573,8 +1613,10 @@ public class StmtExecutor {
}
private void handleTransactionStmt() throws Exception {
// Every time set no send flag and clean all data in buffer
context.getMysqlChannel().reset();
if (context.getConnectType() == ConnectType.MYSQL) {
// Every time set no send flag and clean all data in buffer
context.getMysqlChannel().reset();
}
context.getState().setOk(0, 0, "");
// create plan
if (context.getTxnEntry() != null && context.getTxnEntry().getRowsInTransaction() == 0
@ -1774,8 +1816,8 @@ public class StmtExecutor {
// Process an insert statement.
private void handleInsertStmt() throws Exception {
// Every time set no send flag and clean all data in buffer
if (context.getMysqlChannel() != null) {
if (context.getConnectType() == ConnectType.MYSQL) {
// Every time set no send flag and clean all data in buffer
context.getMysqlChannel().reset();
}
InsertStmt insertStmt = (InsertStmt) parsedStmt;
@ -1989,8 +2031,7 @@ public class StmtExecutor {
*/
throwable = t;
} finally {
updateProfile(true);
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId());
finalizeQuery();
}
// Go here, which means:
@ -2059,7 +2100,9 @@ public class StmtExecutor {
}
private void handleUnsupportedStmt() {
context.getMysqlChannel().reset();
if (context.getConnectType() == ConnectType.MYSQL) {
context.getMysqlChannel().reset();
}
// do nothing
context.getState().setOk();
}
@ -2084,10 +2127,10 @@ public class StmtExecutor {
private void handlePrepareStmt() throws Exception {
// register prepareStmt
LOG.debug("add prepared statement {}, isBinaryProtocol {}",
prepareStmt.getName(), prepareStmt.isBinaryProtocol());
prepareStmt.getName(), prepareStmt.isBinaryProtocol());
context.addPreparedStmt(prepareStmt.getName(),
new PrepareStmtContext(prepareStmt,
context, planner, analyzer, prepareStmt.getName()));
context, planner, analyzer, prepareStmt.getName()));
if (prepareStmt.isBinaryProtocol()) {
sendStmtPrepareOK();
}
@ -2114,6 +2157,7 @@ public class StmtExecutor {
}
private void sendMetaData(ResultSetMetaData metaData) throws IOException {
Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
// sends how many columns
serializer.reset();
serializer.writeVInt(metaData.getColumnCount());
@ -2137,6 +2181,7 @@ public class StmtExecutor {
}
private void sendStmtPrepareOK() throws IOException {
Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_prepare.html#sect_protocol_com_stmt_prepare_response
serializer.reset();
// 0x00 OK
@ -2174,6 +2219,7 @@ public class StmtExecutor {
}
private void sendFields(List<String> colNames, List<Type> types) throws IOException {
Preconditions.checkState(context.getConnectType() == ConnectType.MYSQL);
// sends how many columns
serializer.reset();
serializer.writeVInt(colNames.size());
@ -2205,24 +2251,33 @@ public class StmtExecutor {
}
public void sendResultSet(ResultSet resultSet) throws IOException {
context.updateReturnRows(resultSet.getResultRows().size());
// Send meta data.
sendMetaData(resultSet.getMetaData());
if (context.getConnectType().equals(ConnectType.MYSQL)) {
context.updateReturnRows(resultSet.getResultRows().size());
// Send meta data.
sendMetaData(resultSet.getMetaData());
// Send result set.
for (List<String> row : resultSet.getResultRows()) {
serializer.reset();
for (String item : row) {
if (item == null || item.equals(FeConstants.null_string)) {
serializer.writeNull();
} else {
serializer.writeLenEncodedString(item);
// Send result set.
for (List<String> row : resultSet.getResultRows()) {
serializer.reset();
for (String item : row) {
if (item == null || item.equals(FeConstants.null_string)) {
serializer.writeNull();
} else {
serializer.writeLenEncodedString(item);
}
}
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
context.getState().setEof();
context.getState().setEof();
} else if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
context.updateReturnRows(resultSet.getResultRows().size());
context.getFlightSqlChannel()
.addResult(DebugUtil.printId(context.queryId()), context.getRunningQuery(), resultSet);
context.getState().setEof();
} else {
LOG.error("sendResultSet error connect type");
}
}
// Process show statement
@ -2248,6 +2303,7 @@ public class StmtExecutor {
}
public void handleExplainStmt(String result, boolean isNereids) throws IOException {
// TODO support arrow flight sql
ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
.addColumn(new Column("Explain String" + (isNereids ? "(Nereids Planner)" : "(Old Planner)"),
ScalarType.createVarchar(20)))
@ -2686,64 +2742,6 @@ public class StmtExecutor {
}
}
public void executeArrowFlightQuery(FlightStatementExecutor flightStatementExecutor) {
LOG.debug("ARROW FLIGHT QUERY: " + originStmt.toString());
try {
try {
if (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
try {
parseByNereids();
Preconditions.checkState(parsedStmt instanceof LogicalPlanAdapter,
"Nereids only process LogicalPlanAdapter,"
+ " but parsedStmt is " + parsedStmt.getClass().getName());
context.getState().setNereids(true);
context.getState().setIsQuery(true);
planner = new NereidsPlanner(statementContext);
planner.plan(parsedStmt, context.getSessionVariable().toThrift());
} catch (Exception e) {
LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e);
parsedStmt = null;
context.getState().setNereids(false);
analyzer = new Analyzer(context.getEnv(), context);
analyze(context.getSessionVariable().toThrift());
}
} else {
analyzer = new Analyzer(context.getEnv(), context);
analyze(context.getSessionVariable().toThrift());
}
} catch (Exception e) {
throw new RuntimeException("Failed to execute Arrow Flight SQL. " + Util.getRootCauseMessage(e), e);
}
coord = new Coordinator(context, analyzer, planner, context.getStatsErrorEstimator());
profile.addExecutionProfile(coord.getExecutionProfile());
try {
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(),
new QeProcessorImpl.QueryInfo(context, originStmt.originStmt, coord));
} catch (UserException e) {
throw new RuntimeException("Failed to execute Arrow Flight SQL. " + Util.getRootCauseMessage(e), e);
}
Span queryScheduleSpan = context.getTracer()
.spanBuilder("Arrow Flight SQL schedule").setParent(Context.current()).startSpan();
try (Scope scope = queryScheduleSpan.makeCurrent()) {
coord.exec();
} catch (Exception e) {
queryScheduleSpan.recordException(e);
LOG.warn("Failed to coord exec Arrow Flight SQL, because: {}", e.getMessage(), e);
throw new RuntimeException(e.getMessage() + Util.getRootCauseMessage(e), e);
} finally {
queryScheduleSpan.end();
}
} finally {
QeProcessorImpl.INSTANCE.unregisterQuery(context.queryId()); // TODO for query profile
}
flightStatementExecutor.setFinstId(coord.getFinstId());
flightStatementExecutor.setResultFlightServerAddr(coord.getResultFlightServerAddr());
flightStatementExecutor.setResultInternalServiceAddr(coord.getResultInternalServiceAddr());
flightStatementExecutor.setResultOutputExprs(coord.getResultOutputExprs());
}
private List<ResultRow> convertResultBatchToResultRows(TResultBatch batch) {
List<String> columns = parsedStmt.getColLabels();
List<ResultRow> resultRows = new ArrayList<>();

View File

@ -83,15 +83,18 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.qe.MysqlConnectProcessor;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.StatisticsCacheKey;
@ -1104,7 +1107,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
ConnectContext context = new ConnectContext();
// Set current connected FE to the client address, so that we can know where this request come from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());
ConnectProcessor processor = new ConnectProcessor(context);
ConnectProcessor processor = null;
if (context.getConnectType().equals(ConnectType.MYSQL)) {
processor = new MysqlConnectProcessor(context);
} else if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
processor = new FlightSqlConnectProcessor(context);
} else {
throw new TException("unknown ConnectType: " + context.getConnectType());
}
TMasterOpResult result = processor.proxyExecute(params);
if (QueryState.MysqlStateType.ERR.name().equalsIgnoreCase(result.getStatus())) {
context.getState().setError(result.getStatus());

View File

@ -24,8 +24,11 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import com.google.common.base.Preconditions;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@ -63,12 +66,15 @@ import org.apache.arrow.flight.sql.impl.FlightSql.TicketStatementQuery;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable {
private static final Logger LOG = LogManager.getLogger(DorisFlightSqlProducer.class);
@ -111,33 +117,72 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
ConnectContext connectContext = null;
try {
connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
// Only for ConnectContext check timeout.
connectContext.setCommand(MysqlCommand.COM_QUERY);
// After the previous query was executed, there was no getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
final String query = request.getQuery();
final FlightStatementExecutor flightStatementExecutor = new FlightStatementExecutor(query, connectContext);
final FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext);
flightStatementExecutor.executeQuery();
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(ByteString.copyFromUtf8(
DebugUtil.printId(flightStatementExecutor.getFinstId()) + ":" + query)).build();
final Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(flightStatementExecutor.getResultFlightServerAddr().hostname,
flightStatementExecutor.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
Schema schema;
schema = flightStatementExecutor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
throw new RuntimeException("after handleQuery");
}
if (connectContext.isReturnResultFromLocal()) {
// set/use etc. stmt returns an OK result by default.
if (connectContext.getFlightSqlChannel().resultNum() == 0) {
// a random query id and add empty results
String queryId = UUID.randomUUID().toString();
connectContext.getFlightSqlChannel().addEmptyResult(queryId, query);
final ByteString handle = ByteString.copyFromUtf8(context.peerIdentity() + ":" + queryId);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
}
// A Flight Sql request can only contain one statement that returns result,
// otherwise expected thrown exception during execution.
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement
// are different. So put the peerIdentity into the ticket and then getStreamStatement is used to find
// the correct ConnectContext.
// queryId is used to find query results.
final ByteString handle = ByteString.copyFromUtf8(
context.peerIdentity() + ":" + DebugUtil.printId(connectContext.queryId()));
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
.getVectorSchemaRoot().getSchema());
} else {
// Now only query stmt will pull results from BE.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(connectContext.getFinstId()) + ":" + query);
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null").toRuntimeException();
}
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
connectContext.setCommand(MysqlCommand.COM_SLEEP);
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
// TODO Set in BE callback after query end, Client client will not callback by default.
connectContext.setCommand(MysqlCommand.COM_SLEEP);
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
} catch (Exception e) {
if (null != connectContext) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(
e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
LOG.warn("get flight info statement failed, " + e.getMessage(), e);
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
@ -146,8 +191,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
@Override
public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQuery command,
final CallContext context,
final FlightDescriptor descriptor) {
final CallContext context, final FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("getFlightInfoPreparedStatement unimplemented")
.toRuntimeException();
}
@ -158,6 +202,42 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
throw CallStatus.UNIMPLEMENTED.withDescription("getSchemaStatement unimplemented").toRuntimeException();
}
@Override
public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context,
final ServerStreamListener listener) {
ConnectContext connectContext = null;
final String handle = ticketStatementQuery.getStatementHandle().toStringUtf8();
String[] handleParts = handle.split(":");
String executedPeerIdentity = handleParts[0];
String queryId = handleParts[1];
try {
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different.
connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(
connectContext.getFlightSqlChannel().getResult(queryId));
final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot();
listener.start(vectorSchemaRoot);
listener.putNext();
} catch (Exception e) {
listener.error(e);
if (null != connectContext) {
String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
LOG.warn("get stream statement failed, " + e.getMessage(), e);
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
} finally {
listener.completed();
if (null != connectContext) {
// The result has been sent, delete it.
connectContext.getFlightSqlChannel().invalidate(queryId);
}
}
}
@Override
public void close() throws Exception {
AutoCloseables.close(rootAllocator);
@ -180,8 +260,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
}
@Override
public Runnable acceptPutStatement(CommandStatementUpdate command,
CallContext context, FlightStream flightStream,
public Runnable acceptPutStatement(CommandStatementUpdate command, CallContext context, FlightStream flightStream,
StreamListener<PutResult> ackStream) {
throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutStatement unimplemented").toRuntimeException();
}
@ -219,8 +298,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
}
@Override
public void getStreamTypeInfo(CommandGetXdbcTypeInfo request, CallContext context,
ServerStreamListener listener) {
public void getStreamTypeInfo(CommandGetXdbcTypeInfo request, CallContext context, ServerStreamListener listener) {
throw CallStatus.UNIMPLEMENTED.withDescription("getStreamTypeInfo unimplemented").toRuntimeException();
}
@ -323,12 +401,6 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
throw CallStatus.UNIMPLEMENTED.withDescription("getStreamCrossReference unimplemented").toRuntimeException();
}
@Override
public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context,
final ServerStreamListener listener) {
throw CallStatus.UNIMPLEMENTED.withDescription("getStreamStatement unimplemented").toRuntimeException();
}
private <T extends Message> FlightInfo getFlightInfoForSchema(final T request, final FlightDescriptor descriptor,
final Schema schema) {
final Ticket ticket = new Ticket(Any.pack(request).toByteArray());

View File

@ -14,18 +14,19 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/arrow/blob/main/java/flight/flight-sql/src/test/java/org/apache/arrow/flight/sql/example/StatementContext.java
// and modified by Doris
package org.apache.doris.service.arrowflight;
import org.apache.doris.analysis.Expr;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
@ -33,112 +34,84 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public final class FlightStatementExecutor implements AutoCloseable {
private ConnectContext connectContext;
private final String query;
private TUniqueId queryId;
private TUniqueId finstId;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
/**
* Process one flgiht sql connection.
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class);
public FlightStatementExecutor(final String query, ConnectContext connectContext) {
this.query = query;
this.connectContext = connectContext;
connectContext.setThreadLocalInfo();
public FlightSqlConnectProcessor(ConnectContext context) {
super(context);
connectType = ConnectType.ARROW_FLIGHT_SQL;
context.setThreadLocalInfo();
context.setReturnResultFromLocal(true);
}
public void setQueryId(TUniqueId queryId) {
this.queryId = queryId;
}
public void prepare(MysqlCommand command) {
// set status of query to OK.
ctx.getState().reset();
executor = null;
public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}
public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
}
public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}
public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}
public String getQuery() {
return query;
}
public TUniqueId getQueryId() {
return queryId;
}
public TUniqueId getFinstId() {
return finstId;
}
public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
}
public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}
public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}
@Override
public boolean equals(final Object other) {
if (!(other instanceof FlightStatementExecutor)) {
return false;
if (command == null) {
ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unknown command(" + command.toString() + ")");
LOG.warn("Unknown command(" + command + ")");
return;
}
return this == other;
LOG.debug("arrow flight sql handle command {}", command);
ctx.setCommand(command);
ctx.setStartTime();
}
@Override
public int hashCode() {
return Objects.hash(this);
}
public void handleQuery(String query) {
MysqlCommand command = MysqlCommand.COM_QUERY;
prepare(command);
public void executeQuery() {
try {
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
setQueryId(queryId);
connectContext.setQueryId(queryId);
StmtExecutor stmtExecutor = new StmtExecutor(connectContext, getQuery());
connectContext.setExecutor(stmtExecutor);
stmtExecutor.executeArrowFlightQuery(this);
ctx.setRunningQuery(query);
ctx.initTracer("trace");
Span rootSpan = ctx.getTracer().spanBuilder("handleQuery").setNoParent().startSpan();
try (Scope scope = rootSpan.makeCurrent()) {
handleQuery(command, query);
} catch (Exception e) {
throw new RuntimeException("Failed to coord exec", e);
rootSpan.recordException(e);
throw e;
} finally {
rootSpan.end();
}
}
// TODO
// private void handleInitDb() {
// handleInitDb(fullDbName);
// }
// TODO
// private void handleFieldList() {
// handleFieldList(tableName);
// }
public Schema fetchArrowFlightSchema(int timeoutMs) {
TNetworkAddress address = getResultInternalServiceAddr();
TUniqueId tid = getFinstId();
ArrayList<Expr> resultOutputExprs = getResultOutputExprs();
TNetworkAddress address = ctx.getResultInternalServiceAddr();
TUniqueId tid = ctx.getFinstId();
ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
try {
InternalService.PFetchArrowFlightSchemaRequest request =
@ -156,7 +129,7 @@ public final class FlightStatementExecutor implements AutoCloseable {
}
TStatusCode code = TStatusCode.findByValue(pResult.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
Status status = null;
Status status = new Status();
status.setPstatus(pResult.getStatus());
throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s",
DebugUtil.printId(tid), status));
@ -204,6 +177,14 @@ public final class FlightStatementExecutor implements AutoCloseable {
@Override
public void close() throws Exception {
ctx.setCommand(MysqlCommand.COM_SLEEP);
// TODO support query profile
for (StmtExecutor asynExecutor : returnResultFromRemoteExecutor) {
asynExecutor.finalizeQuery();
}
returnResultFromRemoteExecutor.clear();
ConnectContext.remove();
}
}

View File

@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.service.arrowflight.results;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.ArrowType.Utf8;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class FlightSqlChannel {
private final Cache<String, FlightSqlResultCacheEntry> resultCache;
private final BufferAllocator allocator;
public FlightSqlChannel() {
// The Stmt result is not picked up by the Client within 10 minutes and will be deleted.
resultCache =
CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(10, TimeUnit.MINUTES)
.removalListener(new ResultRemovalListener())
.build();
allocator = new RootAllocator(Long.MAX_VALUE);
}
// TODO
public String getRemoteIp() {
return "0.0.0.0";
}
// TODO
public String getRemoteHostPortString() {
return "0.0.0.0:0";
}
public void addResult(String queryId, String runningQuery, ResultSet resultSet) {
List<Field> schemaFields = new ArrayList<>();
List<FieldVector> dataFields = new ArrayList<>();
List<List<String>> resultData = resultSet.getResultRows();
ResultSetMetaData metaData = resultSet.getMetaData();
// TODO: only support varchar type
for (Column col : metaData.getColumns()) {
schemaFields.add(new Field(col.getName(), FieldType.nullable(new Utf8()), null));
VarCharVector varCharVector = new VarCharVector(col.getName(), allocator);
varCharVector.allocateNew();
varCharVector.setValueCount(resultData.size());
dataFields.add(varCharVector);
}
for (int i = 0; i < resultData.size(); i++) {
List<String> row = resultData.get(i);
for (int j = 0; j < row.size(); j++) {
String item = row.get(j);
if (item == null || item.equals(FeConstants.null_string)) {
dataFields.get(j).setNull(i);
} else {
((VarCharVector) dataFields.get(j)).setSafe(i, item.getBytes());
}
}
}
VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(schemaFields, dataFields);
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new FlightSqlResultCacheEntry(vectorSchemaRoot,
runningQuery);
resultCache.put(queryId, flightSqlResultCacheEntry);
}
public void addEmptyResult(String queryId, String query) {
List<Field> schemaFields = new ArrayList<>();
List<FieldVector> dataFields = new ArrayList<>();
schemaFields.add(new Field("StatusResult", FieldType.nullable(new Utf8()), null));
VarCharVector varCharVector = new VarCharVector("StatusResult", allocator);
varCharVector.allocateNew();
varCharVector.setValueCount(1);
varCharVector.setSafe(0, "OK".getBytes());
dataFields.add(varCharVector);
VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(schemaFields, dataFields);
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new FlightSqlResultCacheEntry(vectorSchemaRoot,
query);
resultCache.put(queryId, flightSqlResultCacheEntry);
}
public FlightSqlResultCacheEntry getResult(String queryId) {
return resultCache.getIfPresent(queryId);
}
public void invalidate(String handle) {
resultCache.invalidate(handle);
}
public long resultNum() {
return resultCache.size();
}
public void reset() {
resultCache.invalidateAll();
}
public void close() {
reset();
}
private static class ResultRemovalListener implements RemovalListener<String, FlightSqlResultCacheEntry> {
@Override
public void onRemoval(@NotNull final RemovalNotification<String, FlightSqlResultCacheEntry> notification) {
try {
AutoCloseables.close(notification.getValue());
} catch (final Exception e) {
// swallow
}
}
}
}

View File

@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.service.arrowflight.results;
import org.apache.arrow.vector.VectorSchemaRoot;
import java.util.Objects;
public final class FlightSqlResultCacheEntry implements AutoCloseable {
private final VectorSchemaRoot vectorSchemaRoot;
private final String query;
public FlightSqlResultCacheEntry(final VectorSchemaRoot vectorSchemaRoot, final String query) {
this.vectorSchemaRoot = Objects.requireNonNull(vectorSchemaRoot, "result cannot be null.");
this.query = query;
}
public VectorSchemaRoot getVectorSchemaRoot() {
return vectorSchemaRoot;
}
public String getQuery() {
return query;
}
@Override
public void close() throws Exception {
vectorSchemaRoot.clear();
}
@Override
public boolean equals(final Object other) {
if (this == other) {
return true;
}
if (!(other instanceof VectorSchemaRoot)) {
return false;
}
final VectorSchemaRoot that = (VectorSchemaRoot) other;
return vectorSchemaRoot.equals(that);
}
@Override
public int hashCode() {
return Objects.hash(vectorSchemaRoot);
}
}

View File

@ -49,8 +49,8 @@ public interface FlightSessionsManager {
*/
ConnectContext createConnectContext(String peerIdentity);
public static ConnectContext buildConnectContext(String peerIdentity, UserIdentity userIdentity, String remoteIP) {
ConnectContext connectContext = new ConnectContext(peerIdentity);
static ConnectContext buildConnectContext(String peerIdentity, UserIdentity userIdentity, String remoteIP) {
ConnectContext connectContext = new FlightSqlConnectContext(peerIdentity);
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setStartTime();
connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);

View File

@ -58,6 +58,7 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager {
if (flightTokenDetails.getCreatedSession()) {
return null;
}
flightTokenDetails.setCreatedSession(true);
return FlightSessionsManager.buildConnectContext(peerIdentity, flightTokenDetails.getUserIdentity(),
flightTokenDetails.getRemoteIp());
} catch (IllegalArgumentException e) {

View File

@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.service.arrowflight.sessions;
import org.apache.doris.mysql.MysqlChannel;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
public class FlightSqlConnectContext extends ConnectContext {
private static final Logger LOG = LogManager.getLogger(FlightSqlConnectContext.class);
protected volatile FlightSqlChannel flightSqlChannel;
public FlightSqlConnectContext(String peerIdentity) {
this.connectType = ConnectType.ARROW_FLIGHT_SQL;
this.peerIdentity = peerIdentity;
mysqlChannel = null; // Use of MysqlChannel is not expected
flightSqlChannel = new FlightSqlChannel();
setResultSinkType(TResultSinkType.ARROW_FLIGHT_PROTOCAL);
init();
}
@Override
public FlightSqlChannel getFlightSqlChannel() {
return flightSqlChannel;
}
@Override
public MysqlChannel getMysqlChannel() {
throw new RuntimeException("getMysqlChannel not in mysql connection");
}
@Override
public String getClientIP() {
return flightSqlChannel.getRemoteHostPortString();
}
@Override
protected void closeChannel() {
if (flightSqlChannel != null) {
flightSqlChannel.close();
}
}
// kill operation with no protect.
@Override
public void kill(boolean killConnection) {
LOG.warn("kill query from {}, kill flight sql connection: {}", getRemoteHostPortString(), killConnection);
if (killConnection) {
isKilled = true;
closeChannel();
connectScheduler.unregisterConnection(this);
}
// Now, cancel running query.
cancelQuery();
}
@Override
public String getRemoteHostPortString() {
return getFlightSqlChannel().getRemoteHostPortString();
}
@Override
public void startAcceptQuery(ConnectProcessor connectProcessor) {
throw new RuntimeException("Flight Sql Not impl startAcceptQuery");
}
@Override
public void suspendAcceptQuery() {
throw new RuntimeException("Flight Sql Not impl suspendAcceptQuery");
}
@Override
public void resumeAcceptQuery() {
throw new RuntimeException("Flight Sql Not impl resumeAcceptQuery");
}
@Override
public void stopAcceptQuery() throws IOException {
throw new RuntimeException("Flight Sql Not impl stopAcceptQuery");
}
}