[feature-wip](arrow-flight)(step6) Support regression test (#27847)
Design Documentation Linked to #25514 Regression test add a new group: arrow_flight_sql, ./run-regression-test.sh -g arrow_flight_sql to run regression-test, can use jdbc:arrow-flight-sql to run all Suites whose group contains arrow_flight_sql. ./run-regression-test.sh -g p0,arrow_flight_sql to run regression-test, can use jdbc:arrow-flight-sql to run all Suites whose group contains arrow_flight_sql, and use jdbc:mysql to run other Suites whose group contains p0 but does not contain arrow_flight_sql. Requires attention, the formats of jdbc:arrow-flight-sql and jdbc:mysql and mysql client query results are different, for example: Datatime field type: jdbc:mysql returns 2010-01-02T05:09:06, mysql client returns 2010-01-02 05:09:06, jdbc:arrow-flight-sql also returns 2010-01-02 05:09 :06. Array and Map field types: jdbc:mysql returns ["ab", "efg", null], {"f1": 1, "f2": "a"}, jdbc:arrow-flight-sql returns ["ab ","efg",null], {"f1":1,"f2":"a"}, which is missing spaces. Float field type: jdbc:mysql and mysql client returns 6.333, jdbc:arrow-flight-sql returns 6.333000183105469, in query_p0/subquery/test_subquery.groovy. If the query result is empty, jdbc:arrow-flight-sql returns empty and jdbc:mysql returns \N. use database; and query should be divided into two SQL executions as much as possible. otherwise the results may not be as expected. For example: USE information_schema; select cast ("0.0101031417" as datetime) The result is 2000-01-01 03:14:1 (constant fold), select cast ("0.0101031417" as datetime) The result is null (no constant fold), In addition, doris jdbc:arrow-flight-sql still has unfinished parts, such as: Unsupported data type: Decimal256. INVALID_ARGUMENT: [INTERNAL_ERROR]Fail to convert block data to arrow data, error: [E3] write_column_to_arrow with type Decimal256 Unsupported null value of map key. INVALID_ARGUMENT: [INTERNAL_ERROR]Fail to convert block data to arrow data, error: [E33] Can not write null value of map key to arrow. Unsupported data type: ARRAY<MAP<TEXT,TEXT>> jdbc:arrow-flight-sql not support connecting to specify DB name, such asjdbc:arrow-flight-sql://127.0.0.1:9090/{db_name}", In order to be compatible with regression-test, use db_nameis added before all SQLs whenjdbc:arrow-flight-sql` runs regression test. select timediff("2010-01-01 01:00:00", "2010-01-02 01:00:00");, error java.lang.NumberFormatException: For input string: "-24:00:00"
This commit is contained in:
@ -1417,11 +1417,14 @@ public class StmtExecutor {
|
||||
}
|
||||
|
||||
// handle selects that fe can do without be, so we can make sql tools happy, especially the setup step.
|
||||
Optional<ResultSet> resultSet = planner.handleQueryInFe(parsedStmt);
|
||||
if (resultSet.isPresent()) {
|
||||
sendResultSet(resultSet.get());
|
||||
LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
|
||||
return;
|
||||
// TODO FE not support doris field type conversion to arrow field type.
|
||||
if (!context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
|
||||
Optional<ResultSet> resultSet = planner.handleQueryInFe(parsedStmt);
|
||||
if (resultSet.isPresent()) {
|
||||
sendResultSet(resultSet.get());
|
||||
LOG.info("Query {} finished", DebugUtil.printId(context.queryId));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
MysqlChannel channel = null;
|
||||
|
||||
@ -88,6 +88,12 @@ import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* Implementation of Arrow Flight SQL service
|
||||
*
|
||||
* All methods must catch all possible Exceptions, print and throw CallStatus,
|
||||
* otherwise error message will be discarded.
|
||||
*/
|
||||
public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable {
|
||||
private static final Logger LOG = LogManager.getLogger(DorisFlightSqlProducer.class);
|
||||
private final Location location;
|
||||
@ -175,9 +181,9 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
|
||||
|
||||
private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext connectContext, String query,
|
||||
final FlightDescriptor descriptor) {
|
||||
Preconditions.checkState(null != connectContext);
|
||||
Preconditions.checkState(!query.isEmpty());
|
||||
try {
|
||||
Preconditions.checkState(null != connectContext);
|
||||
Preconditions.checkState(!query.isEmpty());
|
||||
// After the previous query was executed, there was no getStreamStatement to take away the result.
|
||||
connectContext.getFlightSqlChannel().reset();
|
||||
final FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext);
|
||||
@ -294,6 +300,9 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
|
||||
public void createPreparedStatement(final ActionCreatePreparedStatementRequest request, final CallContext context,
|
||||
final StreamListener<Result> listener) {
|
||||
// TODO can only execute complete SQL, not support SQL parameters.
|
||||
// For Python: the Python code will try to create a prepared statement (this is to fit DBAPI, IIRC) and
|
||||
// if the server raises any error except for NotImplemented it will fail. (If it gets NotImplemented,
|
||||
// it will ignore and execute without a prepared statement.) see: https://github.com/apache/arrow/issues/38786
|
||||
executorService.submit(() -> {
|
||||
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
|
||||
try {
|
||||
@ -344,20 +353,27 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
|
||||
public Runnable acceptPutPreparedStatementUpdate(CommandPreparedStatementUpdate command, CallContext context,
|
||||
FlightStream flightStream, StreamListener<PutResult> ackStream) {
|
||||
return () -> {
|
||||
while (flightStream.next()) {
|
||||
final VectorSchemaRoot root = flightStream.getRoot();
|
||||
final int rowCount = root.getRowCount();
|
||||
// TODO support update
|
||||
Preconditions.checkState(rowCount == 0);
|
||||
try {
|
||||
while (flightStream.next()) {
|
||||
final VectorSchemaRoot root = flightStream.getRoot();
|
||||
final int rowCount = root.getRowCount();
|
||||
// TODO support update
|
||||
Preconditions.checkState(rowCount == 0);
|
||||
|
||||
final int recordCount = -1;
|
||||
final DoPutUpdateResult build = DoPutUpdateResult.newBuilder().setRecordCount(recordCount).build();
|
||||
try (final ArrowBuf buffer = rootAllocator.buffer(build.getSerializedSize())) {
|
||||
buffer.writeBytes(build.toByteArray());
|
||||
ackStream.onNext(PutResult.metadata(buffer));
|
||||
final int recordCount = -1;
|
||||
final DoPutUpdateResult build = DoPutUpdateResult.newBuilder().setRecordCount(recordCount).build();
|
||||
try (final ArrowBuf buffer = rootAllocator.buffer(build.getSerializedSize())) {
|
||||
buffer.writeBytes(build.toByteArray());
|
||||
ackStream.onNext(PutResult.metadata(buffer));
|
||||
}
|
||||
}
|
||||
ackStream.onCompleted();
|
||||
} catch (Exception e) {
|
||||
String errMsg = "acceptPutPreparedStatementUpdate failed, " + e.getMessage() + ", "
|
||||
+ Util.getRootCauseMessage(e);
|
||||
LOG.warn(errMsg, e);
|
||||
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
|
||||
}
|
||||
ackStream.onCompleted();
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.service.arrowflight;
|
||||
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.service.arrowflight.auth2.FlightBearerTokenAuthenticator;
|
||||
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
|
||||
import org.apache.doris.service.arrowflight.sessions.FlightSessionsWithTokenManager;
|
||||
@ -39,13 +40,13 @@ import java.io.IOException;
|
||||
public class DorisFlightSqlService {
|
||||
private static final Logger LOG = LogManager.getLogger(DorisFlightSqlService.class);
|
||||
private final FlightServer flightServer;
|
||||
private volatile boolean running;
|
||||
private final FlightTokenManager flightTokenManager;
|
||||
private final FlightSessionsManager flightSessionsManager;
|
||||
private volatile boolean running;
|
||||
|
||||
public DorisFlightSqlService(int port) {
|
||||
BufferAllocator allocator = new RootAllocator();
|
||||
Location location = Location.forGrpcInsecure("0.0.0.0", port);
|
||||
Location location = Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port);
|
||||
this.flightTokenManager = new FlightTokenManagerImpl(Config.arrow_flight_token_cache_size,
|
||||
Config.arrow_flight_token_alive_time);
|
||||
this.flightSessionsManager = new FlightSessionsWithTokenManager(flightTokenManager);
|
||||
|
||||
@ -140,7 +140,7 @@ public class FlightSqlChannel {
|
||||
|
||||
public void addOKResult(String queryId, String query) {
|
||||
final FlightSqlResultCacheEntry flightSqlResultCacheEntry = new FlightSqlResultCacheEntry(
|
||||
createOneOneSchemaRoot("StatusResult", "OK"), query);
|
||||
createOneOneSchemaRoot("StatusResult", "0"), query);
|
||||
resultCache.put(queryId, flightSqlResultCacheEntry);
|
||||
}
|
||||
|
||||
|
||||
@ -20,13 +20,10 @@ package org.apache.doris.service.arrowflight.sessions;
|
||||
|
||||
import org.apache.doris.analysis.UserIdentity;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.service.ExecuteEnv;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
|
||||
import org.apache.arrow.flight.CallStatus;
|
||||
|
||||
/**
|
||||
* Manages Flight User Session ConnectContext.
|
||||
*/
|
||||
@ -65,10 +62,6 @@ public interface FlightSessionsManager {
|
||||
connectContext.getEnv().getAuth().getInsertTimeout(connectContext.getQualifiedUser()));
|
||||
|
||||
connectContext.setConnectScheduler(ExecuteEnv.getInstance().getScheduler());
|
||||
if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
|
||||
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections");
|
||||
throw CallStatus.UNAUTHENTICATED.withDescription("Reach limit of connections").toRuntimeException();
|
||||
}
|
||||
return connectContext;
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.service.arrowflight.sessions;
|
||||
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.service.ExecuteEnv;
|
||||
@ -27,13 +28,17 @@ import org.apache.arrow.flight.CallStatus;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class FlightSessionsWithTokenManager implements FlightSessionsManager {
|
||||
private static final Logger LOG = LogManager.getLogger(FlightSessionsWithTokenManager.class);
|
||||
|
||||
private final FlightTokenManager flightTokenManager;
|
||||
private final AtomicInteger nextConnectionId;
|
||||
|
||||
public FlightSessionsWithTokenManager(FlightTokenManager flightTokenManager) {
|
||||
this.flightTokenManager = flightTokenManager;
|
||||
this.nextConnectionId = new AtomicInteger(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -65,8 +70,16 @@ public class FlightSessionsWithTokenManager implements FlightSessionsManager {
|
||||
return null;
|
||||
}
|
||||
flightTokenDetails.setCreatedSession(true);
|
||||
return FlightSessionsManager.buildConnectContext(peerIdentity, flightTokenDetails.getUserIdentity(),
|
||||
flightTokenDetails.getRemoteIp());
|
||||
ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity,
|
||||
flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp());
|
||||
connectContext.setConnectionId(nextConnectionId.getAndAdd(1));
|
||||
connectContext.resetLoginTime();
|
||||
if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
|
||||
connectContext.getState()
|
||||
.setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections");
|
||||
throw CallStatus.UNAUTHENTICATED.withDescription("Reach limit of connections").toRuntimeException();
|
||||
}
|
||||
return connectContext;
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Bearer token validation failed.", e);
|
||||
throw CallStatus.UNAUTHENTICATED.toRuntimeException();
|
||||
|
||||
Reference in New Issue
Block a user