[pick](branch-2.1) pick #44286 (#45055)

This commit is contained in:
Xinyi Zou
2024-12-06 14:33:47 +08:00
committed by GitHub
parent d3c10f01e3
commit 2ed306d0b1
5 changed files with 207 additions and 143 deletions

View File

@ -19,7 +19,6 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FloatLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
@ -56,10 +55,10 @@ import org.apache.doris.plsql.executor.PlSqlOperation;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.arrowflight.results.FlightSqlChannel;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultSinkType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
@ -76,7 +75,6 @@ import org.json.JSONObject;
import org.xnio.StreamConnection;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -120,10 +118,7 @@ public class ConnectContext {
protected volatile String peerIdentity;
private final Map<String, String> preparedQuerys = new HashMap<>();
private String runningQuery;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private ArrayList<Expr> resultOutputExprs;
private TUniqueId finstId;
private final List<FlightSqlEndpointsLocation> flightSqlEndpointsLocations = Lists.newArrayList();
private boolean returnResultFromLocal = true;
// mysql net
protected volatile MysqlChannel mysqlChannel;
@ -713,36 +708,16 @@ public class ConnectContext {
return runningQuery;
}
public void setResultFlightServerAddr(TNetworkAddress resultFlightServerAddr) {
this.resultFlightServerAddr = resultFlightServerAddr;
public void addFlightSqlEndpointsLocation(FlightSqlEndpointsLocation flightSqlEndpointsLocation) {
this.flightSqlEndpointsLocations.add(flightSqlEndpointsLocation);
}
public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
public List<FlightSqlEndpointsLocation> getFlightSqlEndpointsLocations() {
return flightSqlEndpointsLocations;
}
public void setResultInternalServiceAddr(TNetworkAddress resultInternalServiceAddr) {
this.resultInternalServiceAddr = resultInternalServiceAddr;
}
public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}
public void setResultOutputExprs(ArrayList<Expr> resultOutputExprs) {
this.resultOutputExprs = resultOutputExprs;
}
public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}
public void setFinstId(TUniqueId finstId) {
this.finstId = finstId;
}
public TUniqueId getFinstId() {
return finstId;
public void clearFlightSqlEndpointsLocations() {
flightSqlEndpointsLocations.clear();
}
public void setReturnResultFromLocal(boolean returnResultFromLocal) {

View File

@ -81,6 +81,7 @@ import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.LoadEtlTask;
@ -741,16 +742,15 @@ public class Coordinator implements CoordInterface {
this.timeoutDeadline = System.currentTimeMillis() + queryOptions.getExecutionTimeout() * 1000L;
if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
if (!context.isReturnResultFromLocal()) {
if (context.isReturnResultFromLocal()) {
receiver = new ResultReceiver(queryId, topParams.instanceExecParams.get(0).instanceId,
addressToBackendID.get(execBeAddr), toBrpcHost(execBeAddr), this.timeoutDeadline,
context.getSessionVariable().getMaxMsgSizeOfResultReceiver());
} else {
Preconditions.checkState(context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL));
context.setFinstId(topParams.instanceExecParams.get(0).instanceId);
context.setResultFlightServerAddr(toArrowFlightHost(execBeAddr));
context.setResultInternalServiceAddr(toBrpcHost(execBeAddr));
context.setResultOutputExprs(fragments.get(0).getOutputExprs());
TUniqueId finstId = topParams.instanceExecParams.get(0).instanceId;
context.addFlightSqlEndpointsLocation(new FlightSqlEndpointsLocation(finstId,
toArrowFlightHost(execBeAddr), toBrpcHost(execBeAddr), fragments.get(0).getOutputExprs()));
}
LOG.info("dispatch result sink of query {} to {}", DebugUtil.printId(queryId),
@ -761,7 +761,8 @@ public class Coordinator implements CoordInterface {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
.getBroker(topResultFileSink.getBrokerName(),
topParams.instanceExecParams.get(0).host.getHostname());
topResultFileSink.setBrokerAddr(broker.host, broker.port);
}
} else {

View File

@ -25,11 +25,13 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.mysql.MysqlCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.service.arrowflight.results.FlightSqlResultCacheEntry;
import org.apache.doris.service.arrowflight.sessions.FlightSessionsManager;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@ -187,6 +189,7 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
Preconditions.checkState(!query.isEmpty());
// After the previous query was executed, there was no getStreamStatement to take away the result.
connectContext.getFlightSqlChannel().reset();
connectContext.clearFlightSqlEndpointsLocations();
try (FlightSqlConnectProcessor flightSQLConnectProcessor = new FlightSqlConnectProcessor(connectContext)) {
flightSQLConnectProcessor.handleQuery(query);
if (connectContext.getState().getStateType() == MysqlStateType.ERR) {
@ -225,46 +228,52 @@ public class DorisFlightSqlProducer implements FlightSqlProducer, AutoCloseable
}
} else {
// Now only query stmt will pull results from BE.
Schema schema = flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (schema == null) {
flightSQLConnectProcessor.fetchArrowFlightSchema(5000);
if (flightSQLConnectProcessor.getArrowSchema() == null) {
throw CallStatus.INTERNAL.withDescription("fetch arrow flight schema is null")
.toRuntimeException();
}
TUniqueId queryId = connectContext.getFinstId();
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(queryId) + "&" + connectContext.getResultInternalServiceAddr().hostname
+ "&" + connectContext.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder().setStatementHandle(handle)
.build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
// TODO Support multiple endpoints.
Location location;
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (flightSQLConnectProcessor.getPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
flightSQLConnectProcessor.getPublicAccessAddr().port);
List<FlightEndpoint> endpoints = Lists.newArrayList();
for (FlightSqlEndpointsLocation endpointLoc : connectContext.getFlightSqlEndpointsLocations()) {
TUniqueId tid = endpointLoc.getFinstId();
// Ticket contains the IP and Brpc Port of the Doris BE node where the query result is located.
final ByteString handle = ByteString.copyFromUtf8(
DebugUtil.printId(tid) + "&" + endpointLoc.getResultInternalServiceAddr().hostname + "&"
+ endpointLoc.getResultInternalServiceAddr().port + "&" + query);
TicketStatementQuery ticketStatement = TicketStatementQuery.newBuilder()
.setStatementHandle(handle).build();
Ticket ticket = new Ticket(Any.pack(ticketStatement).toByteArray());
Location location;
if (endpointLoc.getResultPublicAccessAddr().isSetHostname()) {
// In a production environment, it is often inconvenient to expose Doris BE nodes
// to the external network.
// However, a reverse proxy (such as nginx) can be added to all Doris BE nodes,
// and the external client will be randomly routed to a Doris BE node when connecting
// to nginx.
// The query results of Arrow Flight SQL will be randomly saved on a Doris BE node.
// If it is different from the Doris BE node randomly routed by nginx,
// data forwarding needs to be done inside the Doris BE node.
if (endpointLoc.getResultPublicAccessAddr().isSetPort()) {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultPublicAccessAddr().port);
} else {
location = Location.forGrpcInsecure(endpointLoc.getResultPublicAccessAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(
flightSQLConnectProcessor.getPublicAccessAddr().hostname,
connectContext.getResultFlightServerAddr().port);
location = Location.forGrpcInsecure(endpointLoc.getResultFlightServerAddr().hostname,
endpointLoc.getResultFlightServerAddr().port);
}
} else {
location = Location.forGrpcInsecure(connectContext.getResultFlightServerAddr().hostname,
connectContext.getResultFlightServerAddr().port);
// By default, the query results of all BE nodes will be aggregated to one BE node.
// ADBC Client will only receive one endpoint and pull data from the BE node
// corresponding to this endpoint.
// `set global enable_parallel_result_sink=true;` to allow each BE to return query results
// separately. ADBC Client will receive multiple endpoints and pull data from each endpoint.
endpoints.add(new FlightEndpoint(ticket, location));
}
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
return new FlightInfo(flightSQLConnectProcessor.getArrowSchema(), descriptor, endpoints, -1, -1);
}
}
} catch (Exception e) {

View File

@ -31,6 +31,7 @@ import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.service.arrowflight.results.FlightSqlEndpointsLocation;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
@ -58,7 +59,7 @@ import java.util.concurrent.TimeoutException;
*/
public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(FlightSqlConnectProcessor.class);
private TNetworkAddress publicAccessAddr = new TNetworkAddress();
private Schema arrowSchema;
public FlightSqlConnectProcessor(ConnectContext context) {
super(context);
@ -67,8 +68,8 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
context.setReturnResultFromLocal(true);
}
public TNetworkAddress getPublicAccessAddr() {
return publicAccessAddr;
public Schema getArrowSchema() {
return arrowSchema;
}
public void prepare(MysqlCommand command) {
@ -107,74 +108,87 @@ public class FlightSqlConnectProcessor extends ConnectProcessor implements AutoC
// handleFieldList(tableName);
// }
public Schema fetchArrowFlightSchema(int timeoutMs) {
TNetworkAddress address = ctx.getResultInternalServiceAddr();
TUniqueId tid = ctx.getFinstId();
ArrayList<Expr> resultOutputExprs = ctx.getResultOutputExprs();
Types.PUniqueId finstId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
try {
InternalService.PFetchArrowFlightSchemaRequest request =
InternalService.PFetchArrowFlightSchemaRequest.newBuilder()
.setFinstId(finstId)
.build();
public void fetchArrowFlightSchema(int timeoutMs) {
if (ctx.getFlightSqlEndpointsLocations().isEmpty()) {
throw new RuntimeException("fetch arrow flight schema failed, no FlightSqlEndpointsLocations.");
}
for (FlightSqlEndpointsLocation endpointLoc : ctx.getFlightSqlEndpointsLocations()) {
TNetworkAddress address = endpointLoc.getResultInternalServiceAddr();
TUniqueId tid = endpointLoc.getFinstId();
ArrayList<Expr> resultOutputExprs = endpointLoc.getResultOutputExprs();
Types.PUniqueId queryId = Types.PUniqueId.newBuilder().setHi(tid.hi).setLo(tid.lo).build();
try {
InternalService.PFetchArrowFlightSchemaRequest request
= InternalService.PFetchArrowFlightSchemaRequest.newBuilder().setFinstId(queryId).build();
Future<InternalService.PFetchArrowFlightSchemaResult> future
= BackendServiceProxy.getInstance().fetchArrowFlightSchema(address, request);
InternalService.PFetchArrowFlightSchemaResult pResult;
pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (pResult == null) {
throw new RuntimeException(String.format("fetch arrow flight schema timeout, finstId: %s",
DebugUtil.printId(tid)));
}
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
throw new RuntimeException(String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus));
}
if (pResult.hasBeArrowFlightIp()) {
publicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
}
if (pResult.hasBeArrowFlightPort()) {
publicAccessAddr.setPort(pResult.getBeArrowFlightPort());
}
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
ArrowStreamReader arrowStreamReader = new ArrowStreamReader(
new ByteArrayInputStream(pResult.getSchema().toByteArray()),
rootAllocator
);
try {
VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
List<FieldVector> fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != resultOutputExprs.size()) {
throw new RuntimeException(String.format(
"Schema size %s' is not equal to arrow field size %s, finstId: %s.",
fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid)));
}
return root.getSchema();
} catch (Exception e) {
throw new RuntimeException("Read Arrow Flight Schema failed.", e);
Future<InternalService.PFetchArrowFlightSchemaResult> future = BackendServiceProxy.getInstance()
.fetchArrowFlightSchema(address, request);
InternalService.PFetchArrowFlightSchemaResult pResult;
pResult = future.get(timeoutMs, TimeUnit.MILLISECONDS);
if (pResult == null) {
throw new RuntimeException(
String.format("fetch arrow flight schema timeout, queryId: %s", DebugUtil.printId(tid)));
}
} else {
throw new RuntimeException(String.format("get empty arrow flight schema, finstId: %s",
DebugUtil.printId(tid)));
Status resultStatus = new Status(pResult.getStatus());
if (resultStatus.getErrorCode() != TStatusCode.OK) {
throw new RuntimeException(
String.format("fetch arrow flight schema failed, queryId: %s, errmsg: %s",
DebugUtil.printId(tid), resultStatus));
}
TNetworkAddress resultPublicAccessAddr = new TNetworkAddress();
if (pResult.hasBeArrowFlightIp()) {
resultPublicAccessAddr.setHostname(pResult.getBeArrowFlightIp().toStringUtf8());
}
if (pResult.hasBeArrowFlightPort()) {
resultPublicAccessAddr.setPort(pResult.getBeArrowFlightPort());
}
endpointLoc.setResultPublicAccessAddr(resultPublicAccessAddr);
if (pResult.hasSchema() && pResult.getSchema().size() > 0) {
RootAllocator rootAllocator = new RootAllocator(Integer.MAX_VALUE);
ArrowStreamReader arrowStreamReader = new ArrowStreamReader(
new ByteArrayInputStream(pResult.getSchema().toByteArray()), rootAllocator);
try {
Schema schema;
VectorSchemaRoot root = arrowStreamReader.getVectorSchemaRoot();
List<FieldVector> fieldVectors = root.getFieldVectors();
if (fieldVectors.size() != resultOutputExprs.size()) {
throw new RuntimeException(
String.format("Schema size %s' is not equal to arrow field size %s, queryId: %s.",
fieldVectors.size(), resultOutputExprs.size(), DebugUtil.printId(tid)));
}
schema = root.getSchema();
if (arrowSchema == null) {
arrowSchema = schema;
} else if (!arrowSchema.equals(schema)) {
throw new RuntimeException(String.format(
"The schema returned by results BE is different, first schema: %s, "
+ "new schema: %s, queryId: %s,backend: %s", arrowSchema, schema,
DebugUtil.printId(tid), address));
}
} catch (Exception e) {
throw new RuntimeException("Read Arrow Flight Schema failed.", e);
}
} else {
throw new RuntimeException(
String.format("get empty arrow flight schema, queryId: %s", DebugUtil.printId(tid)));
}
} catch (RpcException e) {
throw new RuntimeException(
String.format("arrow flight schema fetch catch rpc exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (InterruptedException e) {
throw new RuntimeException(
String.format("arrow flight schema future get interrupted exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (ExecutionException e) {
throw new RuntimeException(
String.format("arrow flight schema future get execution exception, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format("arrow flight schema fetch timeout, queryId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
}
} catch (RpcException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch catch rpc exception, finstId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (InterruptedException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get interrupted exception, finstId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (ExecutionException e) {
throw new RuntimeException(String.format(
"arrow flight schema future get execution exception, finstId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
} catch (TimeoutException e) {
throw new RuntimeException(String.format(
"arrow flight schema fetch timeout, finstId: %s,backend: %s",
DebugUtil.printId(tid), address), e);
}
}

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.service.arrowflight.results;
import org.apache.doris.analysis.Expr;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUniqueId;
import java.util.ArrayList;
public class FlightSqlEndpointsLocation {
private TUniqueId finstId;
private TNetworkAddress resultFlightServerAddr;
private TNetworkAddress resultInternalServiceAddr;
private TNetworkAddress resultPublicAccessAddr;
private ArrayList<Expr> resultOutputExprs;
public FlightSqlEndpointsLocation(TUniqueId finstId, TNetworkAddress resultFlightServerAddr,
TNetworkAddress resultInternalServiceAddr, ArrayList<Expr> resultOutputExprs) {
this.finstId = finstId;
this.resultFlightServerAddr = resultFlightServerAddr;
this.resultInternalServiceAddr = resultInternalServiceAddr;
this.resultPublicAccessAddr = new TNetworkAddress();
this.resultOutputExprs = resultOutputExprs;
}
public TUniqueId getFinstId() {
return finstId;
}
public TNetworkAddress getResultFlightServerAddr() {
return resultFlightServerAddr;
}
public TNetworkAddress getResultInternalServiceAddr() {
return resultInternalServiceAddr;
}
public void setResultPublicAccessAddr(TNetworkAddress resultPublicAccessAddr) {
this.resultPublicAccessAddr = resultPublicAccessAddr;
}
public TNetworkAddress getResultPublicAccessAddr() {
return resultPublicAccessAddr;
}
public ArrayList<Expr> getResultOutputExprs() {
return resultOutputExprs;
}
}