[Enhancement](Load) Stream Load using SQL (#22509)

This PR was originally #16940, but it had not been updated by the original author @Cai-Yao for a long time. For now, we will merge part of that code into master first.

Thanks to @Cai-Yao and @yiguolei.
zzzzzzzs
2023-08-08 13:49:04 +08:00
committed by GitHub
parent c4def9db5c
commit 66784cef71
46 changed files with 4110 additions and 12 deletions


@@ -85,6 +85,38 @@ public class LoadAction extends RestBaseController {
        return executeWithoutPassword(request, response, db, table, true);
    }

    @RequestMapping(path = "/api/_stream_load_with_sql", method = RequestMethod.PUT)
    public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse response) {
        String sql = request.getHeader("sql");
        LOG.info("streaming load sql={}", sql);
        executeCheckPassword(request, response);
        try {
            // A 'Load' request must carry a 100-continue header
            if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) {
                return new RestBaseResult("There is no 100-continue header");
            }

            final String clusterName = ConnectContext.get().getClusterName();
            if (Strings.isNullOrEmpty(clusterName)) {
                return new RestBaseResult("No cluster selected.");
            }

            String label = request.getHeader(LABEL_KEY);
            TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);
            LOG.info("redirect load action to destination={}, label: {}", redirectAddr.toString(), label);
            return redirectTo(request, redirectAddr);
        } catch (Exception e) {
            return new RestBaseResult(e.getMessage());
        }
    }

    @RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
    public Object streamLoad2PC(HttpServletRequest request,
            HttpServletResponse response,
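For reference, a minimal client for this endpoint could look as follows. This is a sketch, not part of the PR: the host, port, database, table, label, and file name are hypothetical, the load SQL follows the stream() table-valued function introduced further down, and the label header name is assumed to match LABEL_KEY ("label").

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
import java.util.Base64;

public class StreamLoadWithSqlClient {
    public static void main(String[] args) throws Exception {
        // Basic auth for executeCheckPassword(); "root" with an empty password is an assumption.
        String auth = Base64.getEncoder().encodeToString("root:".getBytes());
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://fe_host:8030/api/_stream_load_with_sql"))
                .header("Authorization", "Basic " + auth)
                // The load SQL travels in the "sql" header and reads the request body
                // through the stream() table-valued function.
                .header("sql", "insert into db1.tbl1 select * from stream(\"format\" = \"csv\")")
                .header("label", "stream_load_sql_example_1")
                .expectContinue(true) // the handler rejects requests without Expect: 100-continue
                .PUT(HttpRequest.BodyPublishers.ofFile(Path.of("data.csv")))
                .build();
        HttpResponse<String> response = HttpClient.newBuilder()
                // Follow the FE's redirect to the selected BE, as with curl --location-trusted.
                .followRedirects(HttpClient.Redirect.ALWAYS)
                .build()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}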


@@ -40,6 +40,7 @@ import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.resource.Tag;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionStatus;
@@ -75,6 +76,11 @@ public class ConnectContext {
    protected volatile long stmtId;
    protected volatile long forwardedStmtId;

    // set for stream load with sql
    protected volatile TUniqueId loadId;
    protected volatile long backendId;
    protected volatile LoadTaskInfo streamLoadInfo;

    protected volatile TUniqueId queryId;
    protected volatile String traceId;
    // id for this connection
@@ -320,6 +326,30 @@ public class ConnectContext {
        return stmtId;
    }

    public long getBackendId() {
        return backendId;
    }

    public void setBackendId(long backendId) {
        this.backendId = backendId;
    }

    public TUniqueId getLoadId() {
        return loadId;
    }

    public void setLoadId(TUniqueId loadId) {
        this.loadId = loadId;
    }

    public void setStreamLoadInfo(LoadTaskInfo streamLoadInfo) {
        this.streamLoadInfo = streamLoadInfo;
    }

    public LoadTaskInfo getStreamLoadInfo() {
        return streamLoadInfo;
    }

    public void setStmtId(long stmtId) {
        this.stmtId = stmtId;
    }


@@ -187,6 +187,8 @@ public class Coordinator {
    // Once this is set to true, errors from remote fragments are ignored.
    private boolean returnedAllResults;

    private List<RuntimeProfile> fragmentProfile;

    // populated in computeFragmentExecParams()
    private final Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = Maps.newHashMap();

@@ -252,6 +254,7 @@ public class Coordinator {
    public List<RuntimeFilter> assignedRuntimeFilters = new ArrayList<>();
    // Runtime filter ID to the builder instance number
    public Map<RuntimeFilterId, Integer> ridToBuilderNum = Maps.newHashMap();

    private ConnectContext context;

    private boolean isPointQuery = false;
    private PointQueryExec pointExec = null;

@@ -282,6 +285,7 @@ public class Coordinator {
    // Used for query/insert/test
    public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) {
        this.context = context;
        this.isBlockQuery = planner.isBlockQuery();
        this.queryId = context.queryId();
        this.fragments = planner.getFragments();

@@ -392,6 +396,10 @@ public class Coordinator {
        this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
    }

    public ConnectContext getConnectContext() {
        return context;
    }

    public long getJobId() {
        return jobId;
    }


@@ -142,6 +142,11 @@ public class BackendServiceClient {
        return stub.getColumnIdsByTabletIds(request);
    }

    public Future<InternalService.PReportStreamLoadStatusResponse> reportStreamLoadStatus(
            InternalService.PReportStreamLoadStatusRequest request) {
        return stub.reportStreamLoadStatus(request);
    }

    public void shutdown() {
        if (!channel.isShutdown()) {
            channel.shutdown();


@@ -269,6 +269,18 @@ public class BackendServiceProxy {
        }
    }

    public Future<InternalService.PReportStreamLoadStatusResponse> reportStreamLoadStatus(
            TNetworkAddress address, InternalService.PReportStreamLoadStatusRequest request) throws RpcException {
        try {
            final BackendServiceClient client = getProxy(address);
            return client.reportStreamLoadStatus(request);
        } catch (Throwable e) {
            LOG.warn("report stream load status caught an exception, address={}:{}",
                    address.getHostname(), address.getPort(), e);
            throw new RpcException(address.hostname, e.getMessage());
        }
    }

    public Future<InternalService.PCacheResponse> updateCache(
            TNetworkAddress address, InternalService.PUpdateCacheRequest request) throws RpcException {
        try {
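A hedged sketch of a call site for the new proxy method follows. The BE address is a placeholder, the request is assumed to be built elsewhere, and getInstance() is the proxy's usual accessor in this codebase:

// Sketch only: ask a BE for the status of a running stream load.
static InternalService.PReportStreamLoadStatusResponse fetchStreamLoadStatus(
        InternalService.PReportStreamLoadStatusRequest request) throws Exception {
    TNetworkAddress beAddress = new TNetworkAddress("be_host", 8060); // hypothetical brpc address
    Future<InternalService.PReportStreamLoadStatusResponse> future =
            BackendServiceProxy.getInstance().reportStreamLoadStatus(beAddress, request);
    return future.get(); // blocks until the BE answers; throws on RPC failure
}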


@@ -21,9 +21,13 @@ import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.AddColumnsClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.analysis.UserIdentity;
@@ -58,11 +62,15 @@ import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.annotation.LogException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
@@ -71,10 +79,13 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.StatisticsCacheKey;
@@ -83,6 +94,7 @@ import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.tablefunction.MetadataGenerator;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.task.StreamLoadTask;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.FrontendServiceVersion;
@@ -151,7 +163,9 @@ import org.apache.doris.thrift.TPrivilegeCtrl;
import org.apache.doris.thrift.TPrivilegeHier;
import org.apache.doris.thrift.TPrivilegeStatus;
import org.apache.doris.thrift.TPrivilegeType;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryStatsResult;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReplicaInfo;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
@@ -169,9 +183,12 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TStreamLoadWithLoadStatusRequest;
import org.apache.doris.thrift.TStreamLoadWithLoadStatusResult;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableQueryStats;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
@@ -181,6 +198,7 @@ import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TxnCommitAttachment;
import com.google.common.base.Preconditions;
@@ -192,6 +210,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.StringReader;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
@@ -1812,6 +1831,46 @@ public class FrontendServiceImpl implements FrontendService.Iface {
        return result;
    }

    private void streamLoadPutWithSqlImpl(TStreamLoadPutRequest request) throws UserException {
        LOG.info("receive stream load put request");
        String loadSql = request.getLoadSql();
        ConnectContext ctx = new ConnectContext(null);
        ctx.setEnv(Env.getCurrentEnv());
        ctx.setQueryId(request.getLoadId());
        ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
        ctx.setCurrentUserIdentity(UserIdentity.ROOT);
        ctx.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
        ctx.setThreadLocalInfo();
        ctx.setBackendId(request.getBackendId());
        StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(request);
        ctx.setStreamLoadInfo(streamLoadTask);
        ctx.setLoadId(request.getLoadId());
        SqlScanner input = new SqlScanner(new StringReader(loadSql), ctx.getSessionVariable().getSqlMode());
        SqlParser parser = new SqlParser(input);
        try {
            StatementBase parsedStmt = SqlParserUtils.getFirstStmt(parser);
            parsedStmt.setOrigStmt(new OriginStatement(loadSql, 0));
            parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
            StmtExecutor executor = new StmtExecutor(ctx, parsedStmt);
            ctx.setExecutor(executor);
            TQueryOptions tQueryOptions = ctx.getSessionVariable().toThrift();
            executor.analyze(tQueryOptions);
            Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
            Coordinator coord = new Coordinator(ctx, analyzer, executor.planner());
            coord.setLoadMemLimit(request.getExecMemLimit());
            coord.setQueryType(TQueryType.LOAD);
            QeProcessorImpl.INSTANCE.registerQuery(request.getLoadId(), coord);
            coord.exec();
        } catch (UserException e) {
            LOG.warn("failed to execute load sql: {}", e.getMessage());
            throw new UserException("failed to execute load sql: " + e.getMessage());
        } catch (Throwable e) {
            LOG.warn("unknown error when executing load sql", e);
            throw new UserException("unknown error when executing load sql");
        }
    }

    private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request) throws UserException {
        String cluster = request.getCluster();
        if (Strings.isNullOrEmpty(cluster)) {
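To make the input of this path concrete, here is a sketch of the thrift request it consumes. All values are hypothetical, and the setters simply mirror the getters used above:

// Sketch only: building the request streamLoadPutWithSqlImpl() consumes.
static TStreamLoadPutRequest buildExamplePutRequest() {
    TStreamLoadPutRequest request = new TStreamLoadPutRequest();
    request.setLoadSql("insert into db1.tbl1 select * from stream(\"format\" = \"csv\")");
    request.setLoadId(new TUniqueId(1L, 2L)); // load id generated by the BE
    request.setBackendId(10001L);             // BE that accepted the HTTP request
    request.setExecMemLimit(2147483648L);     // 2 GB load memory limit
    return request;
}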
@@ -1928,6 +1987,93 @@ public class FrontendServiceImpl implements FrontendService.Iface {
        }
    }

    // TODO: this function needs to be improved
    @Override
    public TStreamLoadWithLoadStatusResult streamLoadWithLoadStatus(TStreamLoadWithLoadStatusRequest request) {
        TStreamLoadWithLoadStatusResult result = new TStreamLoadWithLoadStatusResult();
        TUniqueId loadId = request.getLoadId();
        Coordinator coord = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
        long totalRows = 0;
        long loadedRows = 0;
        int filteredRows = 0;
        int unselectedRows = 0;
        long txnId = -1;
        Throwable throwable = null;
        String label = "";
        if (coord == null) {
            result.setStatus(new TStatus(TStatusCode.RUNTIME_ERROR));
            LOG.info("runtime error, query {} does not exist", DebugUtil.printId(loadId));
            return result;
        }
        ConnectContext context = coord.getConnectContext();
        StmtExecutor exec = context.getExecutor();
        InsertStmt insertStmt = (InsertStmt) exec.getParsedStmt();
        label = insertStmt.getLabel();
        txnId = insertStmt.getTransactionId();
        result.setTxnId(txnId);
        TransactionStatus txnStatus = TransactionStatus.ABORTED;
        if (coord.getExecStatus().ok()) {
            if (coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) {
                totalRows = Long.parseLong(coord.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL));
            }
            if (coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL) != null) {
                filteredRows = Integer.parseInt(coord.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL));
            }
            if (coord.getLoadCounters().get(LoadJob.UNSELECTED_ROWS) != null) {
                unselectedRows = Integer.parseInt(coord.getLoadCounters().get(LoadJob.UNSELECTED_ROWS));
            }
            loadedRows = totalRows - filteredRows - unselectedRows;
            try {
                if (Env.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
                        insertStmt.getDbObj(), Lists.newArrayList(insertStmt.getTargetTable()),
                        insertStmt.getTransactionId(),
                        TabletCommitInfo.fromThrift(coord.getCommitInfos()),
                        context.getSessionVariable().getInsertVisibleTimeoutMs())) {
                    txnStatus = TransactionStatus.VISIBLE;
                } else {
                    txnStatus = TransactionStatus.COMMITTED;
                }
            } catch (Throwable t) {
                // if any throwable is thrown during the insert operation, abort this txn first
                LOG.warn("handle insert stmt fail: {}", label, t);
                try {
                    Env.getCurrentGlobalTransactionMgr().abortTransaction(
                            insertStmt.getDbObj().getId(), insertStmt.getTransactionId(),
                            t.getMessage() == null ? "unknown reason" : t.getMessage());
                } catch (Exception abortTxnException) {
                    // just log if aborting the txn failed; this failure does not need to be
                    // passed to the user, who only cares about why the txn was aborted
                    LOG.warn("errors when abort txn", abortTxnException);
                }
                throwable = t;
            } finally {
                QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
            }
            try {
                context.getEnv().getLoadManager()
                        .recordFinishedLoadJob(label, txnId, insertStmt.getDbName(),
                                insertStmt.getTargetTable().getId(),
                                EtlJobType.INSERT, System.currentTimeMillis(),
                                throwable == null ? "" : throwable.getMessage(),
                                coord.getTrackingUrl(), insertStmt.getUserInfo());
            } catch (MetaNotFoundException e) {
                LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
            }
            context.setOrUpdateInsertResult(txnId, label, insertStmt.getDbName(), insertStmt.getTbl(),
                    txnStatus, loadedRows, filteredRows);
            context.updateReturnRows((int) loadedRows);
            result.setStatus(new TStatus(TStatusCode.OK));
            result.setTotalRows(totalRows);
            result.setLoadedRows(loadedRows);
            result.setFilteredRows(filteredRows);
            result.setUnselectedRows(unselectedRows);
        } else {
            QeProcessorImpl.INSTANCE.unregisterQuery(loadId);
            result.setStatus(new TStatus(TStatusCode.CANCELLED));
        }
        return result;
    }

    @Override
    public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException {
        if (Env.getCurrentEnv().getBackupHandler().report(request.getTaskType(), request.getJobId(),
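For orientation, a hedged sketch of how a caller (in practice the BE) could poll this RPC. feClient stands for a thrift-generated FrontendService client obtained elsewhere, and loadId is the id the load was registered under:

// Sketch only: poll the FE for the final status of a stream load with SQL.
static void printLoadStatus(FrontendService.Client feClient, TUniqueId loadId) throws TException {
    TStreamLoadWithLoadStatusRequest statusReq = new TStreamLoadWithLoadStatusRequest();
    statusReq.setLoadId(loadId); // same TUniqueId passed to registerQuery()
    TStreamLoadWithLoadStatusResult status = feClient.streamLoadWithLoadStatus(statusReq);
    if (status.getStatus().getStatusCode() == TStatusCode.OK) {
        System.out.printf("txn=%d loaded=%d filtered=%d unselected=%d%n",
                status.getTxnId(), status.getLoadedRows(),
                status.getFilteredRows(), status.getUnselectedRows());
    }
}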


@@ -44,6 +44,7 @@ import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
import org.apache.doris.proto.Types.PScalarType;
import org.apache.doris.proto.Types.PTypeDesc;
import org.apache.doris.proto.Types.PTypeNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
@@ -470,6 +471,8 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf {
        fileScanRangeParams.setFormatType(fileFormatType);
        fileScanRangeParams.setProperties(locationProperties);
        fileScanRangeParams.setFileAttributes(getFileAttributes());

        ConnectContext ctx = ConnectContext.get();
        fileScanRangeParams.setLoadId(ctx.getLoadId());

        if (getTFileType() == TFileType.FILE_HDFS) {
            THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
            String fsNmae = getLocationProperties().get(HdfsResource.HADOOP_FS_NAME);


@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.thrift.TFileType;

import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Map;

/**
 * Implementation of the table-valued function
 * stream("FORMAT" = "csv").
 */
public class StreamTableValuedFunction extends ExternalFileTableValuedFunction {
    private static final Logger LOG = LogManager.getLogger(StreamTableValuedFunction.class);
    public static final String NAME = "stream";

    public StreamTableValuedFunction(Map<String, String> params) throws AnalysisException {
        Map<String, String> validParams = new CaseInsensitiveMap();
        for (String key : params.keySet()) {
            if (!FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) {
                throw new AnalysisException(key + " is an invalid property");
            }
            validParams.put(key, params.get(key));
        }
        parseProperties(validParams);
    }

    // =========== implement abstract methods of ExternalFileTableValuedFunction =================

    @Override
    public TFileType getTFileType() {
        switch (getTFileFormatType()) {
            case FORMAT_PARQUET:
            case FORMAT_ORC:
                return TFileType.FILE_LOCAL;
            default:
                return TFileType.FILE_STREAM;
        }
    }

    @Override
    public String getFilePath() {
        return null;
    }

    @Override
    public BrokerDesc getBrokerDesc() {
        return null;
    }

    // =========== implement abstract methods of TableValuedFunctionIf =================

    @Override
    public String getTableName() {
        return "StreamTableValuedFunction";
    }
}
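As a usage note, the constructor validates property keys case-insensitively against FILE_FORMAT_PROPERTIES. A small sketch, assuming "format" and "column_separator" are among the accepted file-format properties:

// Sketch only: hypothetical construction of the stream TVF.
static StreamTableValuedFunction buildExampleStreamTvf() throws AnalysisException {
    Map<String, String> params = new HashMap<>(); // java.util.HashMap
    params.put("FORMAT", "csv");          // keys are matched case-insensitively
    params.put("column_separator", ",");
    return new StreamTableValuedFunction(params);
}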


@@ -49,6 +49,8 @@ public abstract class TableValuedFunctionIf {
                return new S3TableValuedFunction(params);
            case HdfsTableValuedFunction.NAME:
                return new HdfsTableValuedFunction(params);
            case StreamTableValuedFunction.NAME:
                return new StreamTableValuedFunction(params);
            case IcebergTableValuedFunction.NAME:
                return new IcebergTableValuedFunction(params);
            case BackendsTableValuedFunction.NAME: