[feature-wip](arrow-flight)(step5) Support JDBC and PreparedStatement and Fix Bug (#27661)

This commit is contained in:
Xinyi Zou
2023-11-29 21:17:20 +08:00
committed by GitHub
parent 19ecb3a8a2
commit d96e2dfefb
18 changed files with 373 additions and 127 deletions

View File

@ -268,7 +268,6 @@ public class DistributedPlanner {
mergePlan.init(ctx.getRootAnalyzer());
Preconditions.checkState(mergePlan.hasValidStats());
PlanFragment fragment = new PlanFragment(ctx.getNextFragmentId(), mergePlan, DataPartition.UNPARTITIONED);
fragment.setResultSinkType(ctx.getRootAnalyzer().getContext().getResultSinkType());
inputFragment.setDestination(mergePlan);
return fragment;
}

View File

@ -257,6 +257,7 @@ public class OriginalPlanner extends Planner {
LOG.debug("substitute result Exprs {}", resExprs);
rootFragment.setOutputExprs(resExprs);
}
rootFragment.setResultSinkType(ConnectContext.get().getResultSinkType());
LOG.debug("finalize plan fragments");
for (PlanFragment fragment : fragments) {
fragment.finalize(queryStmt);

View File

@ -112,6 +112,7 @@ public class ConnectContext {
protected volatile long loginTime;
// for arrow flight
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
@ -611,6 +612,18 @@ public class ConnectContext {
this.loginTime = System.currentTimeMillis();
}
public void addPreparedQuery(String preparedStatementId, String preparedQuery) {
preparedQuerys.put(preparedStatementId, preparedQuery);
}
public String getPreparedQuery(String preparedStatementId) {
return preparedQuerys.get(preparedStatementId);
}
public void removePreparedQuery(String preparedStatementId) {
preparedQuerys.remove(preparedStatementId);
}
public void setRunningQuery(String runningQuery) {
this.runningQuery = runningQuery;
}

View File

@ -1687,7 +1687,7 @@ public class Coordinator implements CoordInterface {
if (backend.getArrowFlightSqlPort() < 0) {
return null;
}
return new TNetworkAddress(backend.getHost(), backend.getArrowFlightSqlPort());
return backend.getArrowFlightAddress();
}
// estimate if this fragment contains UnionNode

View File

@ -605,9 +605,6 @@ public class StmtExecutor {
}
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
// queue query here
syncJournalIfNeeded();
QueueOfferToken offerRet = null;
@ -642,6 +639,9 @@ public class StmtExecutor {
DebugUtil.printId(queryId), i, DebugUtil.printId(newQueryId));
context.setQueryId(newQueryId);
}
if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.setReturnResultFromLocal(false);
}
handleQueryStmt();
break;
} catch (RpcException e) {
@ -2305,18 +2305,23 @@ public class StmtExecutor {
}
public void handleExplainStmt(String result, boolean isNereids) throws IOException {
// TODO support arrow flight sql
ShowResultSetMetaData metaData = ShowResultSetMetaData.builder()
.addColumn(new Column("Explain String" + (isNereids ? "(Nereids Planner)" : "(Old Planner)"),
ScalarType.createVarchar(20)))
.build();
sendMetaData(metaData);
if (context.getConnectType() == ConnectType.MYSQL) {
sendMetaData(metaData);
// Send result set.
for (String item : result.split("\n")) {
serializer.reset();
serializer.writeLenEncodedString(item);
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
// Send result set.
for (String item : result.split("\n")) {
serializer.reset();
serializer.writeLenEncodedString(item);
context.getMysqlChannel().sendOnePacket(serializer.toByteBuffer());
}
} else if (context.getConnectType() == ConnectType.ARROW_FLIGHT_SQL) {
context.getFlightSqlChannel()
.addResult(DebugUtil.printId(context.queryId()), context.getRunningQuery(), metaData, result);
context.setReturnResultFromLocal(true);
}
context.getState().setEof();
}

View File

@ -47,6 +47,7 @@ import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.flight.sql.SqlInfoBuilder;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionClosePreparedStatementRequest;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementRequest;
import org.apache.arrow.flight.sql.impl.FlightSql.ActionCreatePreparedStatementResult;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCatalogs;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetCrossReference;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandGetDbSchemas;
@ -61,20 +62,31 @@ import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementQuery;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandPreparedStatementUpdate;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementQuery;
import org.apache.arrow.flight.sql.impl.FlightSql.CommandStatementUpdate;
import org.apache.arrow.flight.sql.impl.FlightSql.DoPutUpdateResult;
import org.apache.arrow.flight.sql.impl.FlightSql.SqlSupportedCaseSensitivity;
import org.apache.arrow.flight.sql.impl.FlightSql.TicketStatementQuery;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable {
private static final Logger LOG = LogManager.getLogger(DorisFlightSqlProducer.class);
@ -82,49 +94,97 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
private final BufferAllocator rootAllocator = new RootAllocator();
private final SqlInfoBuilder sqlInfoBuilder;
private final FlightSessionsManager flightSessionsManager;
private final ExecutorService executorService = Executors.newFixedThreadPool(100);
public DorisFlightSqlProducer(final Location location, FlightSessionsManager flightSessionsManager) {
this.location = location;
this.flightSessionsManager = flightSessionsManager;
sqlInfoBuilder = new SqlInfoBuilder();
sqlInfoBuilder.withFlightSqlServerName("DorisFE")
.withFlightSqlServerVersion("1.0")
.withFlightSqlServerArrowVersion("13.0")
.withFlightSqlServerReadOnly(false)
.withSqlIdentifierQuoteChar("`")
.withSqlDdlCatalog(true)
.withSqlDdlSchema(false)
.withSqlDdlTable(false)
sqlInfoBuilder.withFlightSqlServerName("DorisFE").withFlightSqlServerVersion("1.0")
.withFlightSqlServerArrowVersion("13.0").withFlightSqlServerReadOnly(false)
.withSqlIdentifierQuoteChar("`").withSqlDdlCatalog(true).withSqlDdlSchema(false).withSqlDdlTable(false)
.withSqlIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE)
.withSqlQuotedIdentifierCase(SqlSupportedCaseSensitivity.SQL_CASE_SENSITIVITY_CASE_INSENSITIVE);
}
private static ByteBuffer serializeMetadata(final Schema schema) {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try {
MessageSerializer.serialize(new WriteChannel(Channels.newChannel(outputStream)), schema);
return ByteBuffer.wrap(outputStream.toByteArray());
} catch (final IOException e) {
throw new RuntimeException("Failed to serialize arrow flight sql schema", e);
}
}
private void getStreamStatementResult(String handle, ServerStreamListener listener) {
String[] handleParts = handle.split(":");
String executedPeerIdentity = handleParts[0];
String queryId = handleParts[1];
ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
try {
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different.
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(
connectContext.getFlightSqlChannel().getResult(queryId));
final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot();
listener.start(vectorSchemaRoot);
listener.putNext();
} catch (Exception e) {
listener.error(e);
String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
} finally {
listener.completed();
// The result has been sent or sent failed, delete it.
connectContext.getFlightSqlChannel().invalidate(queryId);
}
}
@Override
public void getStreamPreparedStatement(final CommandPreparedStatementQuery command, final CallContext context,
final ServerStreamListener listener) {
throw CallStatus.UNIMPLEMENTED.withDescription("getStreamPreparedStatement unimplemented").toRuntimeException();
getStreamStatementResult(command.getPreparedStatementHandle().toStringUtf8(), listener);
}
@Override
public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context,
final ServerStreamListener listener) {
getStreamStatementResult(ticketStatementQuery.getStatementHandle().toStringUtf8(), listener);
}
@Override
public void closePreparedStatement(final ActionClosePreparedStatementRequest request, final CallContext context,
final StreamListener<Result> listener) {
throw CallStatus.UNIMPLEMENTED.withDescription("closePreparedStatement unimplemented").toRuntimeException();
executorService.submit(() -> {
try {
String[] handleParts = request.getPreparedStatementHandle().toStringUtf8().split(":");
String executedPeerIdentity = handleParts[0];
String preparedStatementId = handleParts[1];
flightSessionsManager.getConnectContext(executedPeerIdentity).removePreparedQuery(preparedStatementId);
} catch (final Exception e) {
listener.onError(e);
return;
}
listener.onCompleted();
});
}
@Override
public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context,
private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext connectContext, String query,
final FlightDescriptor descriptor) {
ConnectContext connectContext = null;
Preconditions.checkState(null != connectContext);
Preconditions.checkState(!query.isEmpty());
try {
connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
// After the previous query was executed, there was no getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
final String query = request.getQuery();
final FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext);
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
throw new RuntimeException("after handleQuery");
throw new RuntimeException("after executeQueryStatement handleQuery");
}
if (connectContext.isReturnResultFromLocal()) {
@ -132,30 +192,30 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
if (connectContext.getFlightSqlChannel().resultNum() == 0) {
// a random query id and add empty results
String queryId = UUID.randomUUID().toString();
connectContext.getFlightSqlChannel().addEmptyResult(queryId, query);
connectContext.getFlightSqlChannel().addOKResult(queryId, query);
final ByteString handle = ByteString.copyFromUtf8(context.peerIdentity() + ":" + queryId);
final ByteString handle = ByteString.copyFromUtf8(peerIdentity + ":" + queryId);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(queryId).getVectorSchemaRoot().getSchema());
} else {
// A Flight Sql request can only contain one statement that returns result,
// otherwise expected thrown exception during execution.
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement
// are different. So put the peerIdentity into the ticket and then getStreamStatement is used to
// find the correct ConnectContext.
// queryId is used to find query results.
final ByteString handle = ByteString.copyFromUtf8(
peerIdentity + ":" + DebugUtil.printId(connectContext.queryId()));
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
.getVectorSchemaRoot().getSchema());
}
// A Flight Sql request can only contain one statement that returns result,
// otherwise expected thrown exception during execution.
Preconditions.checkState(connectContext.getFlightSqlChannel().resultNum() == 1);
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement
// are different. So put the peerIdentity into the ticket and then getStreamStatement is used to find
// the correct ConnectContext.
// queryId is used to find query results.
final ByteString handle = ByteString.copyFromUtf8(
context.peerIdentity() + ":" + DebugUtil.printId(connectContext.queryId()));
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
return getFlightInfoForSchema(ticketStatement, descriptor,
connectContext.getFlightSqlChannel().getResult(DebugUtil.printId(connectContext.queryId()))
.getVectorSchemaRoot().getSchema());
} else {
// Now only query stmt will pull results from BE.
final ByteString handle = ByteString.copyFromUtf8(
@ -176,24 +236,31 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
} catch (Exception e) {
if (null != connectContext) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(
e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
LOG.warn("get flight info statement failed, " + e.getMessage(), e);
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
}
@Override
public FlightInfo getFlightInfoStatement(final CommandStatementQuery request, final CallContext context,
final FlightDescriptor descriptor) {
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
return executeQueryStatement(context.peerIdentity(), connectContext, request.getQuery(), descriptor);
}
@Override
public FlightInfo getFlightInfoPreparedStatement(final CommandPreparedStatementQuery command,
final CallContext context, final FlightDescriptor descriptor) {
throw CallStatus.UNIMPLEMENTED.withDescription("getFlightInfoPreparedStatement unimplemented")
.toRuntimeException();
String[] handleParts = command.getPreparedStatementHandle().toStringUtf8().split(":");
String executedPeerIdentity = handleParts[0];
String preparedStatementId = handleParts[1];
ConnectContext connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
return executeQueryStatement(executedPeerIdentity, connectContext,
connectContext.getPreparedQuery(preparedStatementId), descriptor);
}
@Override
@ -202,42 +269,6 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
throw CallStatus.UNIMPLEMENTED.withDescription("getSchemaStatement unimplemented").toRuntimeException();
}
@Override
public void getStreamStatement(final TicketStatementQuery ticketStatementQuery, final CallContext context,
final ServerStreamListener listener) {
ConnectContext connectContext = null;
final String handle = ticketStatementQuery.getStatementHandle().toStringUtf8();
String[] handleParts = handle.split(":");
String executedPeerIdentity = handleParts[0];
String queryId = handleParts[1];
try {
// The tokens used for authentication between getStreamStatement and getFlightInfoStatement are different.
connectContext = flightSessionsManager.getConnectContext(executedPeerIdentity);
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = Objects.requireNonNull(
connectContext.getFlightSqlChannel().getResult(queryId));
final VectorSchemaRoot vectorSchemaRoot = flightSqlResultCacheEntry.getVectorSchemaRoot();
listener.start(vectorSchemaRoot);
listener.putNext();
} catch (Exception e) {
listener.error(e);
if (null != connectContext) {
String errMsg = "get stream statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
}
LOG.warn("get stream statement failed, " + e.getMessage(), e);
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
} finally {
listener.completed();
if (null != connectContext) {
// The result has been sent, delete it.
connectContext.getFlightSqlChannel().invalidate(queryId);
}
}
}
@Override
public void close() throws Exception {
AutoCloseables.close(rootAllocator);
@ -248,10 +279,54 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
throw CallStatus.UNIMPLEMENTED.withDescription("listFlights unimplemented").toRuntimeException();
}
private ActionCreatePreparedStatementResult buildCreatePreparedStatementResult(ByteString handle,
Schema parameterSchema, Schema metaData) {
Preconditions.checkState(!Objects.isNull(metaData));
final ByteString bytes = Objects.isNull(parameterSchema) ? ByteString.EMPTY
: ByteString.copyFrom(serializeMetadata(parameterSchema));
return ActionCreatePreparedStatementResult.newBuilder()
.setDatasetSchema(ByteString.copyFrom(serializeMetadata(metaData)))
.setParameterSchema(bytes)
.setPreparedStatementHandle(handle).build();
}
@Override
public void createPreparedStatement(final ActionCreatePreparedStatementRequest request, final CallContext context,
final StreamListener<Result> listener) {
throw CallStatus.UNIMPLEMENTED.withDescription("createPreparedStatement unimplemented").toRuntimeException();
// TODO can only execute complete SQL, not support SQL parameters.
executorService.submit(() -> {
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
try {
final String query = request.getQuery();
String preparedStatementId = UUID.randomUUID().toString();
final ByteString handle = ByteString.copyFromUtf8(context.peerIdentity() + ":" + preparedStatementId);
connectContext.addPreparedQuery(preparedStatementId, query);
VectorSchemaRoot emptyVectorSchemaRoot = new VectorSchemaRoot(new ArrayList<>(), new ArrayList<>());
final Schema parameterSchema = emptyVectorSchemaRoot.getSchema();
// TODO FE does not have the ability to convert root fragment output expr into arrow schema.
// However, the metaData schema returned by createPreparedStatement is usually not used by the client,
// but it cannot be empty, otherwise it will be mistaken by the client as an updata statement.
// see: https://github.com/apache/arrow/issues/38911
Schema metaData = connectContext.getFlightSqlChannel()
.createOneOneSchemaRoot("ResultMeta", "UNIMPLEMENTED").getSchema();
listener.onNext(new Result(
Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData))
.toByteArray()));
} catch (Exception e) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "create prepared statement failed, " + e.getMessage() + ", "
+ Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode()
+ ", error msg: " + connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
listener.onError(CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException());
return;
} catch (final Throwable t) {
listener.onError(CallStatus.INTERNAL.withDescription("Unknown error: " + t).toRuntimeException());
return;
}
listener.onCompleted();
});
}
@Override
@ -268,8 +343,22 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
@Override
public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate command, CallContext context,
FlightStream flightStream, StreamListener<PutResult> ackStream) {
throw CallStatus.UNIMPLEMENTED.withDescription("acceptPutPreparedStatementUpdate unimplemented")
.toRuntimeException();
return () -> {
while (flightStream.next()) {
final VectorSchemaRoot root = flightStream.getRoot();
final int rowCount = root.getRowCount();
// TODO support update
Preconditions.checkState(rowCount == 0);
final int recordCount = -1;
final DoPutUpdateResult build = DoPutUpdateResult.newBuilder().setRecordCount(recordCount).build();
try (final ArrowBuf buffer = rootAllocator.buffer(build.getSerializedSize())) {
buffer.writeBytes(build.toByteArray());
ackStream.onNext(PutResult.metadata(buffer));
}
}
ackStream.onCompleted();
};
}
@Override

View File

@ -121,7 +121,10 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
Status status = new Status();
status.setPstatus(pResult.getStatus());
throw new RuntimeException(String.format("fetch arrow flight schema failed, finstId: %s, errmsg: %s",
DebugUtil.printId(tid), status));
DebugUtil.printId(tid), status.getErrorMsg()));
}
if (pResult.hasBeArrowFlightIp()) {
ctx.getResultFlightServerAddr().hostname = pResult.getBeArrowFlightIp().toStringUtf8();
}
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);

View File

@ -47,12 +47,8 @@ public class FlightSqlChannel {
public FlightSqlChannel() {
// The Stmt result is not picked up by the Client within 10 minutes and will be deleted.
resultCache =
CacheBuilder.newBuilder()
.maximumSize(100)
.expireAfterWrite(10, TimeUnit.MINUTES)
.removalListener(new ResultRemovalListener())
.build();
resultCache = CacheBuilder.newBuilder().maximumSize(100).expireAfterWrite(10, TimeUnit.MINUTES)
.removalListener(new ResultRemovalListener()).build();
allocator = new RootAllocator(Long.MAX_VALUE);
}
@ -98,19 +94,53 @@ public class FlightSqlChannel {
resultCache.put(queryId, flightSqlResultCacheEntry);
}
public void addEmptyResult(String queryId, String query) {
public void addResult(String queryId, String runningQuery, ResultSetMetaData metaData, String result) {
List<Field> schemaFields = new ArrayList<>();
List<FieldVector> dataFields = new ArrayList<>();
schemaFields.add(new Field("StatusResult", FieldType.nullable(new Utf8()), null));
VarCharVector varCharVector = new VarCharVector("StatusResult", allocator);
varCharVector.allocateNew();
varCharVector.setValueCount(1);
varCharVector.setSafe(0, "OK".getBytes());
dataFields.add(varCharVector);
// TODO: only support varchar type
for (Column col : metaData.getColumns()) {
schemaFields.add(new Field(col.getName(), FieldType.nullable(new Utf8()), null));
VarCharVector varCharVector = new VarCharVector(col.getName(), allocator);
varCharVector.allocateNew();
varCharVector.setValueCount(result.split("\n").length);
dataFields.add(varCharVector);
}
int rowNum = 0;
for (String item : result.split("\n")) {
if (item == null || item.equals(FeConstants.null_string)) {
dataFields.get(0).setNull(rowNum);
} else {
((VarCharVector) dataFields.get(0)).setSafe(rowNum, item.getBytes());
}
rowNum += 1;
}
VectorSchemaRoot vectorSchemaRoot = new VectorSchemaRoot(schemaFields, dataFields);
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new FlightSqlResultCacheEntry(vectorSchemaRoot,
query);
runningQuery);
resultCache.put(queryId, flightSqlResultCacheEntry);
}
/**
* Create a SchemaRoot with one row and one column.
*/
public VectorSchemaRoot createOneOneSchemaRoot(String colName, String colValue) {
List<Field> schemaFields = new ArrayList<>();
List<FieldVector> dataFields = new ArrayList<>();
schemaFields.add(new Field(colName, FieldType.nullable(new Utf8()), null));
VarCharVector varCharVector = new VarCharVector(colName, allocator);
varCharVector.allocateNew();
varCharVector.setValueCount(1);
varCharVector.setSafe(0, colValue.getBytes());
dataFields.add(varCharVector);
return new VectorSchemaRoot(schemaFields, dataFields);
}
public void addOKResult(String queryId, String query) {
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new FlightSqlResultCacheEntry(
createOneOneSchemaRoot("StatusResult", "OK"), query);
resultCache.put(queryId, flightSqlResultCacheEntry);
}

View File

@ -66,8 +66,7 @@ public interface FlightSessionsManager {
connectContext.setConnectScheduler(ExecuteEnv.getInstance().getScheduler());
if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS,
"Reach limit of connections");
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections");
throw CallStatus.UNAUTHENTICATED.withDescription("Reach limit of connections").toRuntimeException();
}
return connectContext;

View File

@ -17,6 +17,7 @@
package org.apache.doris.service.arrowflight.sessions;
import org.apache.doris.common.util.Util;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.tokens.FlightTokenDetails;
@ -37,18 +38,23 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager {
@Override
public ConnectContext getConnectContext(String peerIdentity) {
ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity);
if (null == connectContext) {
connectContext = createConnectContext(peerIdentity);
try {
ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity);
if (null == connectContext) {
flightTokenManager.invalidateToken(peerIdentity);
String err = "UserSession expire after access, need reauthorize.";
LOG.error(err);
throw CallStatus.UNAUTHENTICATED.withDescription(err).toRuntimeException();
connectContext = createConnectContext(peerIdentity);
if (null == connectContext) {
flightTokenManager.invalidateToken(peerIdentity);
String err = "UserSession expire after access, need reauthorize.";
LOG.error(err);
throw CallStatus.UNAUTHENTICATED.withDescription(err).toRuntimeException();
}
return connectContext;
}
return connectContext;
} catch (Exception e) {
LOG.warn("getConnectContext failed, " + e.getMessage(), e);
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
}
return connectContext;
}
@Override