From 8e2146f48cfcd77af72d43caa5be8256c024f122 Mon Sep 17 00:00:00 2001
From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com>
Date: Thu, 20 Apr 2023 17:27:04 +0800
Subject: [PATCH] [Enhancement](Export) support export with outfile syntax
 (#18325)

The `Export` syntax provides an asynchronous export function, but `Export` is
not vectorized. The `Outfile` syntax provides a synchronous, vectorized export
function. So we can reimplement the `Export` syntax on top of the `Outfile`
syntax.
---
 be/src/vec/runtime/vfile_result_writer.cpp    |   9 +
 .../Manipulation/EXPORT.md                    |   2 +-
 .../apache/doris/common/FeMetaVersion.java    |   4 +-
 .../org/apache/doris/analysis/ExportStmt.java |  81 +-
 .../apache/doris/analysis/OutFileClause.java  |  18 +-
 .../org/apache/doris/analysis/SelectStmt.java |   2 +-
 .../java/org/apache/doris/catalog/Env.java    |   6 +-
 .../doris/common/proc/ExportProcNode.java     |   2 +-
 .../apache/doris/journal/JournalEntity.java   |   3 +-
 .../org/apache/doris/load/ExportChecker.java  | 139 ---
 .../org/apache/doris/load/ExportFailMsg.java  |   4 +
 .../java/org/apache/doris/load/ExportJob.java | 788 ++++++++----------
 .../java/org/apache/doris/load/ExportMgr.java |  99 ++-
 .../org/apache/doris/persist/EditLog.java     |   2 +-
 .../org/apache/doris/qe/ConnectContext.java   |  14 +
 .../org/apache/doris/qe/StmtExecutor.java     |   1 +
 .../doris/task/ExportExportingTask.java       | 324 +++----
 .../apache/doris/task/ExportPendingTask.java  | 133 ---
 .../doris/analysis/CancelExportStmtTest.java  |   2 +-
 .../doris/load/loadv2/ExportMgrTest.java      |  10 +-
 .../apache/doris/qe/SessionVariablesTest.java |  11 +-
 gensrc/thrift/Data.thrift                     |   2 +
 22 files changed, 643 insertions(+), 1013 deletions(-)
 delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/ExportChecker.java
 delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java

diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 8820250eae..c94c7f9752 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -500,6 +500,15 @@ Status VFileResultWriter::_send_result() {
     std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
     result->result_batch.rows.resize(1);
     result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
+
+    std::map<std::string, std::string> attach_infos;
+    attach_infos.insert(std::make_pair("FileNumber", std::to_string(_file_idx)));
+    attach_infos.insert(
+            std::make_pair("TotalRows", std::to_string(_written_rows_counter->value())));
+    attach_infos.insert(std::make_pair("FileSize", std::to_string(_written_data_bytes->value())));
+    attach_infos.insert(std::make_pair("URL", file_url));
+
+    result->result_batch.__set_attached_infos(attach_infos);
     RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
     return Status::OK();
 }
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
index 4f14e79889..a64e0034ee 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/Manipulation/EXPORT.md
@@ -176,7 +176,7 @@ EXPORT TABLE testTbl TO "file:///home/data/a" PROPERTIES ("columns" = "k1,v1");
 8.
将 testTbl 表中的所有数据导出到 s3 上,以不可见字符 "\x07" 作为列或者行分隔符。 ```sql -EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c" +EXPORT TABLE testTbl TO "s3://hdfs_host:port/a/b/c" PROPERTIES ( "column_separator"="\\x07", "line_delimiter" = "\\x07" diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index e220b64fb0..34b1d7eecf 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -59,8 +59,10 @@ public final class FeMetaVersion { // TablePropertyInfo add db id public static final int VERSION_119 = 119; + public static final int VERSION_120 = 120; + // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_119; + public static final int VERSION_CURRENT = VERSION_120; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 7a51444984..d9a7e59554 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -35,6 +35,7 @@ import org.apache.doris.common.util.URI; import org.apache.doris.common.util.Util; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -75,6 +76,17 @@ public class ExportStmt extends StatementBase { private TableRef tableRef; + private String format; + + private String label; + + private String maxFileSize; + private SessionVariable sessionVariables; + + private String qualifiedUser; + + private UserIdentity userIdentity; + public ExportStmt(TableRef tableRef, Expr whereExpr, String path, Map properties, BrokerDesc brokerDesc) { this.tableRef = tableRef; @@ -87,6 +99,7 @@ public class ExportStmt extends StatementBase { this.columnSeparator = DEFAULT_COLUMN_SEPARATOR; this.lineDelimiter = DEFAULT_LINE_DELIMITER; this.columns = DEFAULT_COLUMNS; + this.sessionVariables = ConnectContext.get().getSessionVariable(); } public String getColumns() { @@ -113,10 +126,6 @@ public class ExportStmt extends StatementBase { return brokerDesc; } - public Map getProperties() { - return properties; - } - public String getColumnSeparator() { return this.columnSeparator; } @@ -125,6 +134,30 @@ public class ExportStmt extends StatementBase { return this.lineDelimiter; } + public TableRef getTableRef() { + return this.tableRef; + } + + public String getFormat() { + return format; + } + + public String getLabel() { + return label; + } + + public SessionVariable getSessionVariables() { + return sessionVariables; + } + + public String getQualifiedUser() { + return qualifiedUser; + } + + public UserIdentity getUserIdentity() { + return this.userIdentity; + } + @Override public boolean needAuditEncryption() { if (brokerDesc != null) { @@ -162,6 +195,8 @@ public class ExportStmt extends StatementBase { ConnectContext.get().getRemoteIP(), tblName.getDb() + ": " + tblName.getTbl()); } + qualifiedUser = ConnectContext.get().getQualifiedUser(); + userIdentity = ConnectContext.get().getCurrentUserIdentity(); // check table 
&& partitions whether exist checkTable(analyzer.getEnv()); @@ -171,8 +206,6 @@ public class ExportStmt extends StatementBase { brokerDesc = new BrokerDesc("local", StorageBackend.StorageType.LOCAL, null); } - // where expr will be checked in export job - // check path is valid path = checkPath(path, brokerDesc.getStorageType()); if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) { @@ -261,7 +294,6 @@ public class ExportStmt extends StatementBase { throw new AnalysisException( "Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path."); } - path = path.substring(OutFileClause.LOCAL_FILE_PREFIX.length() - 1); } return path; } @@ -271,31 +303,26 @@ public class ExportStmt extends StatementBase { properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR)); this.lineDelimiter = Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter( properties, ExportStmt.DEFAULT_LINE_DELIMITER)); - this.columns = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS); - // exec_mem_limit - if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) { - try { - Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)); - } catch (NumberFormatException e) { - throw new DdlException("Invalid exec_mem_limit value: " + e.getMessage()); - } - } else { - // use session variables - properties.put(LoadStmt.EXEC_MEM_LIMIT, - String.valueOf(ConnectContext.get().getSessionVariable().getMaxExecMemByte())); - } + this.columns = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, DEFAULT_COLUMNS); + // timeout if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { try { - Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY)); + Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY)); } catch (NumberFormatException e) { throw new DdlException("Invalid timeout value: " + e.getMessage()); } } else { - // use session variables properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(Config.export_task_default_timeout_second)); } + // format + if (properties.containsKey(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE)) { + this.format = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE).toLowerCase(); + } else { + this.format = "csv"; + } + // tablet num per task if (properties.containsKey(TABLET_NUMBER_PER_TASK_PROP)) { try { @@ -308,13 +335,17 @@ public class ExportStmt extends StatementBase { properties.put(TABLET_NUMBER_PER_TASK_PROP, String.valueOf(Config.export_tablet_num_per_task)); } + // max_file_size + this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, ""); + if (properties.containsKey(LABEL)) { FeNameFormat.checkLabel(properties.get(LABEL)); } else { // generate a random label - String label = "export_" + UUID.randomUUID().toString(); + String label = "export_" + UUID.randomUUID(); properties.put(LABEL, label); } + label = properties.get(LABEL); } @Override @@ -361,4 +392,8 @@ public class ExportStmt extends StatementBase { public String toString() { return toSql(); } + + public String getMaxFileSize() { + return maxFileSize; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 1ec20fd625..df4dbacf55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -68,12 +68,16 @@ public class OutFileClause { public static final Map PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap(); public static final Map PARQUET_VERSION_MAP = Maps.newHashMap(); public static 
final Set ORC_DATA_TYPE = Sets.newHashSet(); + public static final String FILE_NUMBER = "FileNumber"; + public static final String TOTAL_ROWS = "TotalRows"; + public static final String FILE_SIZE = "FileSize"; + public static final String URL = "URL"; static { - RESULT_COL_NAMES.add("FileNumber"); - RESULT_COL_NAMES.add("TotalRows"); - RESULT_COL_NAMES.add("FileSize"); - RESULT_COL_NAMES.add("URL"); + RESULT_COL_NAMES.add(FILE_NUMBER); + RESULT_COL_NAMES.add(TOTAL_ROWS); + RESULT_COL_NAMES.add(FILE_SIZE); + RESULT_COL_NAMES.add(URL); RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT)); RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT)); @@ -122,9 +126,9 @@ public class OutFileClause { private static final String HADOOP_PROP_PREFIX = "hadoop."; private static final String BROKER_PROP_PREFIX = "broker."; private static final String PROP_BROKER_NAME = "broker.name"; - private static final String PROP_COLUMN_SEPARATOR = "column_separator"; - private static final String PROP_LINE_DELIMITER = "line_delimiter"; - private static final String PROP_MAX_FILE_SIZE = "max_file_size"; + public static final String PROP_COLUMN_SEPARATOR = "column_separator"; + public static final String PROP_LINE_DELIMITER = "line_delimiter"; + public static final String PROP_MAX_FILE_SIZE = "max_file_size"; private static final String PROP_SUCCESS_FILE_NAME = "success_file_name"; private static final String PARQUET_PROP_PREFIX = "parquet."; private static final String ORC_PROP_PREFIX = "orc."; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index b461d8d6ac..86771edd5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -143,7 +143,7 @@ public class SelectStmt extends QueryStmt { this.colLabels = Lists.newArrayList(); } - SelectStmt( + public SelectStmt( SelectList selectList, FromClause fromClause, Expr wherePredicate, diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 10c6e88bd0..71f20483f1 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -149,7 +149,6 @@ import org.apache.doris.journal.JournalCursor; import org.apache.doris.journal.JournalEntity; import org.apache.doris.journal.bdbje.Timestamp; import org.apache.doris.load.DeleteHandler; -import org.apache.doris.load.ExportChecker; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.Load; @@ -1402,9 +1401,8 @@ public class Env { loadJobScheduler.start(); loadEtlChecker.start(); loadLoadingChecker.start(); - // Export checker - ExportChecker.init(Config.export_checker_interval_second * 1000L); - ExportChecker.startAll(); + // export task + exportMgr.start(); // Tablet checker and scheduler tabletChecker.start(); tabletScheduler.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java index 1a154c8fd9..ef2dff1d1d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ExportProcNode.java @@ -32,7 +32,7 @@ public class ExportProcNode implements ProcNodeInterface { 
.add("JobId").add("Label").add("State").add("Progress") .add("TaskInfo").add("Path") .add("CreateTime").add("StartTime").add("FinishTime") - .add("Timeout").add("ErrorMsg") + .add("Timeout").add("ErrorMsg").add("OutfileInfo") .build(); // label and state column index of result diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java index ccbbd764a3..46c1d28f72 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java @@ -312,8 +312,7 @@ public class JournalEntity implements Writable { isRead = true; break; case OperationType.OP_EXPORT_UPDATE_STATE: - data = new ExportJob.StateTransfer(); - ((ExportJob.StateTransfer) data).readFields(in); + data = ExportJob.StateTransfer.read(in); isRead = true; break; case OperationType.OP_FINISH_DELETE: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportChecker.java deleted file mode 100644 index 78a90b5c42..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportChecker.java +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.load; - -import org.apache.doris.catalog.Env; -import org.apache.doris.common.Config; -import org.apache.doris.common.util.MasterDaemon; -import org.apache.doris.load.ExportJob.JobState; -import org.apache.doris.task.ExportExportingTask; -import org.apache.doris.task.ExportPendingTask; -import org.apache.doris.task.MasterTask; -import org.apache.doris.task.MasterTaskExecutor; - -import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Map; - -public final class ExportChecker extends MasterDaemon { - private static final Logger LOG = LogManager.getLogger(ExportChecker.class); - - // checkers for running job state - private static Map checkers = Maps.newHashMap(); - // executors for pending tasks - private static Map executors = Maps.newHashMap(); - private JobState jobState; - - private ExportChecker(JobState jobState, long intervalMs) { - super("export checker " + jobState.name().toLowerCase(), intervalMs); - this.jobState = jobState; - } - - public static void init(long intervalMs) { - checkers.put(JobState.PENDING, new ExportChecker(JobState.PENDING, intervalMs)); - checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs)); - - int poolSize = Config.export_running_job_num_limit == 0 ? 
5 : Config.export_running_job_num_limit; - MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export-pending-job", poolSize, true); - executors.put(JobState.PENDING, pendingTaskExecutor); - - MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export-exporting-job", poolSize, true); - executors.put(JobState.EXPORTING, exportingTaskExecutor); - } - - public static void startAll() { - for (ExportChecker exportChecker : checkers.values()) { - exportChecker.start(); - } - for (MasterTaskExecutor masterTaskExecutor : executors.values()) { - masterTaskExecutor.start(); - } - } - - @Override - protected void runAfterCatalogReady() { - LOG.debug("start check export jobs. job state: {}", jobState.name()); - switch (jobState) { - case PENDING: - runPendingJobs(); - break; - case EXPORTING: - runExportingJobs(); - break; - default: - LOG.warn("wrong export job state: {}", jobState.name()); - break; - } - } - - private void runPendingJobs() { - ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr(); - List pendingJobs = exportMgr.getExportJobs(JobState.PENDING); - - // check to limit running etl job num - int runningJobNumLimit = Config.export_running_job_num_limit; - if (runningJobNumLimit > 0 && !pendingJobs.isEmpty()) { - // pending executor running + exporting state - int runningJobNum = executors.get(JobState.PENDING).getTaskNum() - + executors.get(JobState.EXPORTING).getTaskNum(); - if (runningJobNum >= runningJobNumLimit) { - LOG.info("running export job num {} exceeds system limit {}", runningJobNum, runningJobNumLimit); - return; - } - - int remain = runningJobNumLimit - runningJobNum; - if (pendingJobs.size() > remain) { - pendingJobs = pendingJobs.subList(0, remain); - } - } - - LOG.debug("pending export job num: {}", pendingJobs.size()); - - for (ExportJob job : pendingJobs) { - try { - MasterTask task = new ExportPendingTask(job); - if (executors.get(JobState.PENDING).submit(task)) { - LOG.info("run pending export job. job: {}", job); - } - } catch (Exception e) { - LOG.warn("run pending export job error", e); - } - } - } - - private void runExportingJobs() { - List jobs = Env.getCurrentEnv().getExportMgr().getExportJobs(JobState.EXPORTING); - LOG.debug("exporting export job num: {}", jobs.size()); - for (ExportJob job : jobs) { - try { - MasterTask task = new ExportExportingTask(job); - if (executors.get(JobState.EXPORTING).submit(task)) { - LOG.info("run exporting export job. job: {}", job); - } else { - LOG.info("fail to submit exporting job to executor. 
job: {}", job); - - } - } catch (Exception e) { - LOG.warn("run export exporting job error", e); - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportFailMsg.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportFailMsg.java index d4a320f5ac..591abe462d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportFailMsg.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportFailMsg.java @@ -20,6 +20,8 @@ package org.apache.doris.load; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import com.google.gson.annotations.SerializedName; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -33,7 +35,9 @@ public class ExportFailMsg implements Writable { UNKNOWN } + @SerializedName("cancelType") private CancelType cancelType; + @SerializedName("msg") private String msg; public ExportFailMsg() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index ab6ae0480c..53dd160a27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -17,35 +17,30 @@ package org.apache.doris.load; -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.BaseTableRef; import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.FromClause; +import org.apache.doris.analysis.LimitElement; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.OutFileClause; -import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.SelectList; +import org.apache.doris.analysis.SelectListItem; +import org.apache.doris.analysis.SelectStmt; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; -import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.analysis.StorageBackend.StorageType; import org.apache.doris.analysis.TableName; import org.apache.doris.analysis.TableRef; -import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.MysqlTable; -import org.apache.doris.catalog.OdbcTable; -import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Type; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; -import org.apache.doris.common.FeConstants; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.Pair; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; @@ -54,39 +49,29 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.planner.DataPartition; -import org.apache.doris.planner.ExportSink; -import org.apache.doris.planner.JdbcScanNode; -import org.apache.doris.planner.MysqlScanNode; -import 
org.apache.doris.planner.OdbcScanNode; -import org.apache.doris.planner.OlapScanNode; -import org.apache.doris.planner.PlanFragment; -import org.apache.doris.planner.PlanFragmentId; -import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.planner.ScanNode; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.Coordinator; import org.apache.doris.qe.OriginStatement; -import org.apache.doris.qe.QeProcessorImpl; import org.apache.doris.qe.SessionVariable; -import org.apache.doris.qe.SqlModeHelper; -import org.apache.doris.rewrite.ExprRewriter; import org.apache.doris.system.Backend; import org.apache.doris.task.AgentClient; import org.apache.doris.thrift.TAgentResult; -import org.apache.doris.thrift.TFileType; import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloScanRange; +import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; +import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.thrift.TypesConstants; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import com.google.gson.annotations.SerializedName; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -95,14 +80,9 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.StringReader; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Map.Entry; // NOTE: we must be carefully if we send next request // as soon as receiving one instance's report from one BE, @@ -110,72 +90,96 @@ import java.util.concurrent.atomic.AtomicInteger; public class ExportJob implements Writable { private static final Logger LOG = LogManager.getLogger(ExportJob.class); + private static final String BROKER_PROPERTY_PREFIXES = "broker."; + public enum JobState { PENDING, + IN_QUEUE, EXPORTING, FINISHED, CANCELLED, } + @SerializedName("id") private long id; + @SerializedName("queryId") private String queryId; + @SerializedName("label") private String label; - private String user; + @SerializedName("dbId") private long dbId; + @SerializedName("tableId") private long tableId; + @SerializedName("brokerDesc") private BrokerDesc brokerDesc; - private Expr whereExpr; + @SerializedName("exportPath") private String exportPath; + @SerializedName("columnSeparator") private String columnSeparator; + @SerializedName("lineDelimiter") private String lineDelimiter; - private Map properties = Maps.newHashMap(); + @SerializedName("partitions") private List partitions; + @SerializedName("tableName") private TableName tableName; - private String sql = ""; + @SerializedName("state") private JobState state; - // If set to true, the profile of export job with be pushed to ProfileManager - private volatile boolean enableProfile = false; + @SerializedName("createTimeMs") private long createTimeMs; - private long startTimeMs; - private long finishTimeMs; + // this 
is the origin stmt of the ExportStmt; we use it to persist the where expr of the export job,
+    // because we cannot serialize the Expressions contained in the job.
+    @SerializedName("origStmt")
+    private OriginStatement origStmt;
+    @SerializedName("qualifiedUser")
+    private String qualifiedUser;
+    @SerializedName("userIdentity")
+    private UserIdentity userIdentity;
+    @SerializedName("columns")
+    private String columns;
+    @SerializedName("format")
+    private String format;
+    @SerializedName("timeoutSecond")
+    private int timeoutSecond;
+    @SerializedName("maxFileSize")
+    private String maxFileSize;
     // progress has two functions at EXPORTING stage:
     // 1. when progress < 100, it indicates exporting
     // 2. set progress = 100 ONLY when exporting progress is completely done
     private int progress;
+    private long startTimeMs;
+    private long finishTimeMs;
     private ExportFailMsg failMsg;
-    private Set<String> exportedFiles = Sets.newConcurrentHashSet();
+    private String outfileInfo;
-    // descriptor used to register all column and table need
-    private final DescriptorTable desc;
-    private TupleDescriptor exportTupleDesc;
-    private ExportSink exportSink;
+    private TableRef tableRef;
+
+    private Expr whereExpr;
+
+    private String sql = "";
+
+    // If set to true, the profile of the export job will be pushed to ProfileManager
+    private volatile boolean enableProfile = false;
+
+    // Each selectStmt is a 'select ... into outfile ...' statement
+    @Getter
+    private List<SelectStmt> selectStmtList = Lists.newArrayList();
+
+    private List<String> exportColumns = Lists.newArrayList();
-    private Analyzer analyzer;
     private Table exportTable;
-    private List<Coordinator> coordList = Lists.newArrayList();
-
-    private AtomicInteger nextId = new AtomicInteger(0);
-
     // when set to true, means this job instance is created by replay thread(FE restarted or master changed)
     private boolean isReplayed = false;
+    private SessionVariable sessionVariables;
+
     private Thread doExportingThread;
     private List<TScanRangeLocations> tabletLocations = Lists.newArrayList();
     // backend_address => snapshot path
     private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
-    // this is the origin stmt of ExportStmt, we use it to persist where expr of Export job,
-    // because we can not serialize the Expressions contained in job.
-    private OriginStatement origStmt;
-    protected Map<String, String> sessionVariables = Maps.newHashMap();
-
-    private List<String> exportColumns = Lists.newArrayList();
-    private String columns;
-
     public ExportJob() {
         this.id = -1;
         this.queryId = "";
@@ -187,13 +191,11 @@ public class ExportJob implements Writable {
         this.startTimeMs = -1;
         this.finishTimeMs = -1;
         this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
-        this.analyzer = new Analyzer(Env.getCurrentEnv(), null);
-        this.desc = analyzer.getDescTbl();
+        this.outfileInfo = "";
         this.exportPath = "";
         this.columnSeparator = "\t";
         this.lineDelimiter = "\n";
         this.columns = "";
-        this.user = "";
     }

     public ExportJob(long jobId) {
@@ -208,19 +210,25 @@ public class ExportJob implements Writable {
         this.brokerDesc = stmt.getBrokerDesc();
         this.columnSeparator = stmt.getColumnSeparator();
         this.lineDelimiter = stmt.getLineDelimiter();
-        this.properties = stmt.getProperties();
-        this.label = this.properties.get(ExportStmt.LABEL);
+        this.label = stmt.getLabel();
         this.queryId = ConnectContext.get() != null ? DebugUtil.printId(ConnectContext.get().queryId()) : "N/A";
-        this.user = ConnectContext.get() != null ?
ConnectContext.get().getQualifiedUser() : "N/A"; String path = stmt.getPath(); Preconditions.checkArgument(!Strings.isNullOrEmpty(path)); this.whereExpr = stmt.getWhereExpr(); this.exportPath = path; + this.sessionVariables = stmt.getSessionVariables(); + this.timeoutSecond = sessionVariables.getQueryTimeoutS(); + this.enableProfile = sessionVariables.enableProfile(); + this.qualifiedUser = stmt.getQualifiedUser(); + this.userIdentity = stmt.getUserIdentity(); + this.format = stmt.getFormat(); + this.maxFileSize = stmt.getMaxFileSize(); this.partitions = stmt.getPartitions(); this.exportTable = db.getTableOrDdlException(stmt.getTblName().getTbl()); this.columns = stmt.getColumns(); + this.tableRef = stmt.getTableRef(); if (!Strings.isNullOrEmpty(this.columns)) { Splitter split = Splitter.on(',').trimResults().omitEmptyStrings(); this.exportColumns = split.splitToList(stmt.getColumns().toLowerCase()); @@ -230,281 +238,71 @@ public class ExportJob implements Writable { this.dbId = db.getId(); this.tableId = exportTable.getId(); this.tableName = stmt.getTblName(); - genExecFragment(); + if (selectStmtList.isEmpty()) { + // This scenario is used for 'EXPORT TABLE tbl INTO PATH' + // we need generate Select Statement + generateQueryStmt(); + } } finally { exportTable.readUnlock(); } - this.sql = stmt.toSql(); this.origStmt = stmt.getOrigStmt(); - if (ConnectContext.get() != null) { - SessionVariable var = ConnectContext.get().getSessionVariable(); - this.sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); - this.enableProfile = var.enableProfile(); - } else { - this.sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); - } } - private void genExecFragment() throws UserException { - registerToDesc(); - String tmpExportPathStr = getExportPath(); - // broker will upload file to tp path and than rename to the final file - if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) { - tmpExportPathStr = tmpExportPathStr + "/__doris_export_tmp_" + id + "/"; - } - try { - URI uri = new URI(tmpExportPathStr); - tmpExportPathStr = uri.normalize().toString(); - } catch (URISyntaxException e) { - throw new DdlException("Invalid export path: " + getExportPath()); - } - String headerStr = genHeader(this.properties); - exportSink = new ExportSink(tmpExportPathStr, getColumnSeparator(), getLineDelimiter(), brokerDesc, headerStr); - plan(); - } - - - private String genNames() { - String names = ""; - for (SlotDescriptor slot : exportTupleDesc.getSlots()) { - names = names + slot.getColumn().getName() + getColumnSeparator(); - } - names = names.substring(0, names.length() - getColumnSeparator().length()); - names = names + getLineDelimiter(); - return names; - } - - private String genTypes() { - String types = ""; - for (SlotDescriptor slot : exportTupleDesc.getSlots()) { - types = types + slot.getColumn().getType().toString() + getColumnSeparator(); - } - types = types.substring(0, types.length() - getColumnSeparator().length()); - types = types + getLineDelimiter(); - return types; - } - - private String genHeader(Map properties) { - String header = ""; - if (properties.containsKey("format")) { - String headerType = properties.get("format"); - if (headerType.equals(FeConstants.csv_with_names)) { - header = genNames(); - } else if (headerType.equals(FeConstants.csv_with_names_and_types)) { - header = genNames(); - header += genTypes(); - } - } - return header; - } - - private void registerToDesc() throws UserException { - TableRef 
ref = new TableRef(tableName, null, partitions == null ? null : new PartitionNames(false, partitions)); - BaseTableRef tableRef = new BaseTableRef(ref, exportTable, tableName); - analyzer.registerTableRef(tableRef); - exportTupleDesc = desc.createTupleDescriptor(); - exportTupleDesc.setTable(exportTable); - exportTupleDesc.setRef(tableRef); - exportTupleDesc.setAliases(tableRef.getAliases(), tableRef.hasExplicitAlias()); + private void generateQueryStmt() { + SelectList list = new SelectList(); if (exportColumns.isEmpty()) { - for (Column column : exportTable.getBaseSchema()) { - SlotDescriptor slot = desc.addSlotDescriptor(exportTupleDesc); - slot.setIsMaterialized(true); - slot.setColumn(column); - slot.setIsNullable(column.isAllowNull()); - } + list.addItem(SelectListItem.createStarItem(this.tableName)); } else { for (Column column : exportTable.getBaseSchema()) { String colName = column.getName().toLowerCase(); if (exportColumns.contains(colName)) { - SlotDescriptor slot = desc.addSlotDescriptor(exportTupleDesc); - slot.setIsMaterialized(true); - slot.setColumn(column); - slot.setIsNullable(column.isAllowNull()); + SlotRef slotRef = new SlotRef(this.tableName, colName); + SelectListItem selectListItem = new SelectListItem(slotRef, null); + list.addItem(selectListItem); } } } - desc.computeStatAndMemLayout(); + + List tableRefList = Lists.newArrayList(); + tableRefList.add(this.tableRef); + FromClause fromClause = new FromClause(tableRefList); + + SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null, + null, null, LimitElement.NO_LIMIT); + // generate outfile clause + OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties()); + selectStmt.setOutFileClause(outfile); + selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0)); + selectStmtList.add(selectStmt); } - private void plan() throws UserException { - List fragments = Lists.newArrayList(); - List scanNodes = Lists.newArrayList(); + private Map convertOutfileProperties() { + Map outfileProperties = Maps.newHashMap(); - // analyze where expr - analyzeWhereExpr(); - // only for - if (exportTable.getType() != Table.TableType.OLAP) { - // not olap scan node - ScanNode scanNode = genScanNode(); - PlanFragment fragment = genPlanFragment(exportTable.getType(), scanNode); - scanNodes.add(scanNode); - fragments.add(fragment); + // file properties + if (format.equals("csv") || format.equals("csv_with_names") || format.equals("csv_with_names_and_types")) { + outfileProperties.put(OutFileClause.PROP_COLUMN_SEPARATOR, columnSeparator); + outfileProperties.put(OutFileClause.PROP_LINE_DELIMITER, lineDelimiter); + } + if (!maxFileSize.isEmpty()) { + outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize); + } + + // broker properties + // outfile clause's broker properties need 'broker.' prefix + if (brokerDesc.getStorageType() == StorageType.BROKER) { + outfileProperties.put(BROKER_PROPERTY_PREFIXES + "name", brokerDesc.getName()); + for (Entry kv : brokerDesc.getProperties().entrySet()) { + outfileProperties.put(BROKER_PROPERTY_PREFIXES + kv.getKey(), kv.getValue()); + } } else { - // The function of this scan node is only to get the tabletlocation. 
- ScanNode tmpOlapScanNode = genScanNode(); - tabletLocations = tmpOlapScanNode.getScanRangeLocations(0); - for (TScanRangeLocations tablet : tabletLocations) { - List locations = tablet.getLocations(); - Collections.shuffle(locations); - tablet.setLocations(locations.subList(0, 1)); - } - - int size = tabletLocations.size(); - int tabletNum = getTabletNumberPerTask(); - for (int i = 0; i < size; i += tabletNum) { - OlapScanNode olapScanNode = null; - if (i + tabletNum <= size) { - olapScanNode = genOlapScanNodeByLocation(tabletLocations.subList(i, i + tabletNum)); - } else { - olapScanNode = genOlapScanNodeByLocation(tabletLocations.subList(i, size)); - } - PlanFragment fragment = genPlanFragment(exportTable.getType(), olapScanNode); - - fragments.add(fragment); - scanNodes.add(olapScanNode); - } - LOG.info("total {} tablets of export job {}, and assign them to {} coordinators", - size, id, fragments.size()); - } - - // add conjunct - if (whereExpr != null) { - for (ScanNode scanNode : scanNodes) { - scanNode.addConjuncts(whereExpr.getConjuncts()); + for (Entry kv : brokerDesc.getProperties().entrySet()) { + outfileProperties.put(kv.getKey(), kv.getValue()); } } - - genCoordinators(fragments, scanNodes); - } - - private void analyzeWhereExpr() throws UserException { - if (whereExpr == null) { - return; - } - whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer, ExprRewriter.ClauseType.WHERE_CLAUSE); - - // analyze where slot ref - Map dstDescMap = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - for (SlotDescriptor slotDescriptor : exportTupleDesc.getSlots()) { - dstDescMap.put(slotDescriptor.getColumn().getName(), slotDescriptor); - } - List slots = Lists.newArrayList(); - whereExpr.collect(SlotRef.class, slots); - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - for (SlotRef slot : slots) { - SlotDescriptor slotDesc = dstDescMap.get(slot.getColumnName()); - if (slotDesc == null) { - throw new UserException("unknown column reference in where statement, reference=" - + slot.getColumnName()); - } - smap.getLhs().add(slot); - smap.getRhs().add(new SlotRef(slotDesc)); - } - whereExpr = whereExpr.clone(smap); - - whereExpr.analyze(analyzer); - if (!whereExpr.getType().equals(Type.BOOLEAN)) { - throw new UserException("where statement is not a valid statement return bool"); - } - } - - private ScanNode genScanNode() throws UserException { - ScanNode scanNode = null; - switch (exportTable.getType()) { - case OLAP: - scanNode = new OlapScanNode(new PlanNodeId(0), exportTupleDesc, "OlapScanNodeForExport"); - ((OlapScanNode) scanNode).closePreAggregation("This an export operation"); - ((OlapScanNode) scanNode).selectBestRollupByRollupSelector(analyzer); - break; - case ODBC: - scanNode = new OdbcScanNode(new PlanNodeId(0), exportTupleDesc, (OdbcTable) this.exportTable); - break; - case MYSQL: - scanNode = new MysqlScanNode(new PlanNodeId(0), exportTupleDesc, (MysqlTable) this.exportTable); - break; - case JDBC: - scanNode = new JdbcScanNode(new PlanNodeId(0), exportTupleDesc, false); - break; - default: - break; - } - if (scanNode != null) { - scanNode.init(analyzer); - scanNode.finalize(analyzer); - } - - return scanNode; - } - - private OlapScanNode genOlapScanNodeByLocation(List locations) { - OlapScanNode olapScanNode = OlapScanNode.createOlapScanNodeByLocation( - new PlanNodeId(nextId.getAndIncrement()), - exportTupleDesc, - "OlapScanNodeForExport", - locations); - - return olapScanNode; - } - - private PlanFragment genPlanFragment(Table.TableType type, ScanNode 
scanNode) throws UserException { - PlanFragment fragment = null; - switch (exportTable.getType()) { - case OLAP: - fragment = new PlanFragment( - new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.RANDOM); - break; - case ODBC: - case JDBC: - case MYSQL: - fragment = new PlanFragment( - new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.UNPARTITIONED); - break; - default: - break; - } - fragment.setOutputExprs(createOutputExprs()); - - scanNode.setFragmentId(fragment.getFragmentId()); - fragment.setSink(exportSink); - try { - fragment.finalize(null); - } catch (Exception e) { - LOG.info("Fragment finalize failed. e= {}", e); - throw new UserException("Fragment finalize failed"); - } - - return fragment; - } - - private List createOutputExprs() { - List outputExprs = Lists.newArrayList(); - for (int i = 0; i < exportTupleDesc.getSlots().size(); ++i) { - SlotDescriptor slotDesc = exportTupleDesc.getSlots().get(i); - SlotRef slotRef = new SlotRef(slotDesc); - if (slotDesc.getType().getPrimitiveType() == PrimitiveType.CHAR) { - slotRef.setType(Type.CHAR); - } - outputExprs.add(slotRef); - } - - return outputExprs; - } - - private void genCoordinators(List fragments, List nodes) { - UUID uuid = UUID.randomUUID(); - for (int i = 0; i < fragments.size(); ++i) { - PlanFragment fragment = fragments.get(i); - ScanNode scanNode = nodes.get(i); - TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits() + i, uuid.getLeastSignificantBits()); - Coordinator coord = new Coordinator( - id, queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), - TimeUtils.DEFAULT_TIME_ZONE, true); - coord.setExecMemoryLimit(getExecMemLimit()); - this.coordList.add(coord); - } - LOG.info("create {} coordinators for export job: {}", coordList.size(), id); + return outfileProperties; } public String getColumns() { @@ -543,16 +341,6 @@ public class ExportJob implements Writable { return exportPath; } - public String getShowExportPath() { - if (brokerDesc.getFileType() == TFileType.FILE_LOCAL) { - StringBuilder sb = new StringBuilder(); - sb.append(OutFileClause.LOCAL_FILE_PREFIX.substring(0, OutFileClause.LOCAL_FILE_PREFIX.length() - 1)); - sb.append(exportPath); - return sb.toString(); - } - return exportPath; - } - public String getColumnSeparator() { return this.columnSeparator; } @@ -561,26 +349,24 @@ public class ExportJob implements Writable { return this.lineDelimiter; } - public long getExecMemLimit() { - return Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)); - } - public int getTimeoutSecond() { - if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) { - return Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY)); - } else { - // for compatibility, some export job in old version does not have this property. use default. - return Config.export_task_default_timeout_second; - } + return timeoutSecond; } - public int getTabletNumberPerTask() { - if (properties.containsKey(ExportStmt.TABLET_NUMBER_PER_TASK_PROP)) { - return Integer.parseInt(properties.get(ExportStmt.TABLET_NUMBER_PER_TASK_PROP)); - } else { - // for compatibility, some export job in old version does not have this property. use default. 
- return Config.export_tablet_num_per_task; - } + public String getFormat() { + return format; + } + + public String getMaxFileSize() { + return maxFileSize; + } + + public String getQualifiedUser() { + return qualifiedUser; + } + + public UserIdentity getUserIdentity() { + return userIdentity; } public List getPartitions() { @@ -595,10 +381,6 @@ public class ExportJob implements Writable { this.progress = progress; } - public void setFailMsg(ExportFailMsg failMsg) { - this.failMsg = failMsg; - } - public long getCreateTimeMs() { return createTimeMs; } @@ -607,23 +389,35 @@ public class ExportJob implements Writable { return startTimeMs; } + public void setStartTimeMs(long startTimeMs) { + this.startTimeMs = startTimeMs; + } + public long getFinishTimeMs() { return finishTimeMs; } + public void setFinishTimeMs(long finishTimeMs) { + this.finishTimeMs = finishTimeMs; + } + public ExportFailMsg getFailMsg() { return failMsg; } - public Set getExportedFiles() { - return this.exportedFiles; + public void setFailMsg(ExportFailMsg failMsg) { + this.failMsg = failMsg; } - public synchronized void addExportedFiles(List files) { - exportedFiles.addAll(files); - LOG.debug("exported files: {}", this.exportedFiles); + public String getOutfileInfo() { + return outfileInfo; } + public void setOutfileInfo(String outfileInfo) { + this.outfileInfo = outfileInfo; + } + + public synchronized Thread getDoExportingThread() { return doExportingThread; } @@ -632,10 +426,6 @@ public class ExportJob implements Writable { this.doExportingThread = isExportingThread; } - public List getCoordList() { - return coordList; - } - public List getTabletLocations() { return tabletLocations; } @@ -656,55 +446,70 @@ public class ExportJob implements Writable { return tableName; } + public SessionVariable getSessionVariables() { + return sessionVariables; + } + public synchronized void cancel(ExportFailMsg.CancelType type, String msg) { if (msg != null) { failMsg = new ExportFailMsg(type, msg); } if (updateState(ExportJob.JobState.CANCELLED, false)) { - // cancel all running coordinators, so that the scheduler's worker thread will be released - for (Coordinator coordinator : coordList) { - Coordinator registeredCoordinator = QeProcessorImpl.INSTANCE.getCoordinator(coordinator.getQueryId()); - if (registeredCoordinator != null) { - registeredCoordinator.cancel(); - } - } - // release snapshot - Status releaseSnapshotStatus = releaseSnapshotPaths(); - if (!releaseSnapshotStatus.ok()) { - // snapshot will be removed by GC thread on BE, finally. - LOG.warn("failed to release snapshot for export job: {}. err: {}", id, - releaseSnapshotStatus.getErrorMsg()); - } + // Status releaseSnapshotStatus = releaseSnapshotPaths(); + // if (!releaseSnapshotStatus.ok()) { + // // snapshot will be removed by GC thread on BE, finally. + // LOG.warn("failed to release snapshot for export job: {}. err: {}", id, + // releaseSnapshotStatus.getErrorMsg()); + // } } } + public synchronized boolean finish(List outfileInfoList) { + outfileInfo = GsonUtils.GSON.toJson(outfileInfoList); + if (updateState(ExportJob.JobState.FINISHED)) { + return true; + } + return false; + } + public synchronized boolean updateState(ExportJob.JobState newState) { return this.updateState(newState, false); } public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) { - if (isFinalState()) { + // We do not persist EXPORTING state in new version of metadata, + // but EXPORTING state may still exist in older versions of metadata. 
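+        // (such state logs may have been written by an FE older than FeMetaVersion.VERSION_120
+        // and can still be replayed after an upgrade).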
+ // So if isReplay == true and newState == EXPORTING, we just ignore this update. + if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) { return false; } state = newState; switch (newState) { case PENDING: + case IN_QUEUE: progress = 0; break; case EXPORTING: - startTimeMs = System.currentTimeMillis(); + // if isReplay == true, startTimeMs will be read from log + if (!isReplay) { + startTimeMs = System.currentTimeMillis(); + } break; case FINISHED: case CANCELLED: - finishTimeMs = System.currentTimeMillis(); + // if isReplay == true, finishTimeMs will be read from log + if (!isReplay) { + finishTimeMs = System.currentTimeMillis(); + } progress = 100; break; default: Preconditions.checkState(false, "wrong job state: " + newState.name()); break; } - if (!isReplay) { + // we only persist Pending/Cancel/Finish state + if (!isReplay && newState != JobState.IN_QUEUE && newState != JobState.EXPORTING) { Env.getCurrentEnv().getEditLog().logExportUpdateState(id, newState); } return true; @@ -714,6 +519,52 @@ public class ExportJob implements Writable { return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED; } + private Status makeSnapshots() { + List tabletLocations = getTabletLocations(); + if (tabletLocations == null) { + return Status.OK; + } + for (TScanRangeLocations tablet : tabletLocations) { + TScanRange scanRange = tablet.getScanRange(); + if (!scanRange.isSetPaloScanRange()) { + continue; + } + TPaloScanRange paloScanRange = scanRange.getPaloScanRange(); + List locations = tablet.getLocations(); + for (TScanRangeLocation location : locations) { + TNetworkAddress address = location.getServer(); + String host = address.getHostname(); + int port = address.getPort(); + Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(host, port); + if (backend == null) { + return Status.CANCELLED; + } + long backendId = backend.getId(); + if (!Env.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) { + return Status.CANCELLED; + } + TSnapshotRequest snapshotRequest = new TSnapshotRequest(); + snapshotRequest.setTabletId(paloScanRange.getTabletId()); + snapshotRequest.setSchemaHash(Integer.parseInt(paloScanRange.getSchemaHash())); + snapshotRequest.setVersion(Long.parseLong(paloScanRange.getVersion())); + snapshotRequest.setTimeout(getTimeoutSecond()); + snapshotRequest.setPreferredSnapshotVersion(TypesConstants.TPREFER_SNAPSHOT_REQ_VERSION); + + AgentClient client = new AgentClient(host, port); + TAgentResult result = client.makeSnapshot(snapshotRequest); + if (result == null || result.getStatus().getStatusCode() != TStatusCode.OK) { + String err = "snapshot for tablet " + paloScanRange.getTabletId() + " failed on backend " + + address.toString() + ". reason: " + + (result == null ? 
"unknown" : result.getStatus().error_msgs); + LOG.warn("{}, export job: {}", err, id); + return new Status(TStatusCode.CANCELLED, err); + } + addSnapshotPath(Pair.of(address, result.getSnapshotPath())); + } + } + return Status.OK; + } + public Status releaseSnapshotPaths() { List> snapshotPaths = getSnapshotPaths(); LOG.debug("snapshotPaths:{}", snapshotPaths); @@ -753,10 +604,6 @@ public class ExportJob implements Writable { return queryId; } - public String getUser() { - return user; - } - public boolean getEnableProfile() { return enableProfile; } @@ -775,68 +622,28 @@ public class ExportJob implements Writable { + ", exportStartTimeMs=" + TimeUtils.longToTimeString(startTimeMs) + ", exportFinishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs) + ", failMsg=" + failMsg - + ", files=(" + StringUtils.join(exportedFiles, ",") + ")" + "]"; } public static ExportJob read(DataInput in) throws IOException { - ExportJob job = new ExportJob(); - job.readFields(in); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) { + ExportJob job = new ExportJob(); + job.readFields(in); + return job; + } + String json = Text.readString(in); + ExportJob job = GsonUtils.GSON.fromJson(json, ExportJob.class); + job.isReplayed = true; return job; } @Override public void write(DataOutput out) throws IOException { - // base infos - out.writeLong(id); - out.writeLong(dbId); - out.writeLong(tableId); - Text.writeString(out, exportPath); - Text.writeString(out, columnSeparator); - Text.writeString(out, lineDelimiter); - out.writeInt(properties.size()); - for (Map.Entry property : properties.entrySet()) { - Text.writeString(out, property.getKey()); - Text.writeString(out, property.getValue()); - } - - // partitions - boolean hasPartition = (partitions != null); - if (hasPartition) { - out.writeBoolean(true); - int partitionSize = partitions.size(); - out.writeInt(partitionSize); - for (String partitionName : partitions) { - Text.writeString(out, partitionName); - } - } else { - out.writeBoolean(false); - } - - // task info - Text.writeString(out, state.name()); - out.writeLong(createTimeMs); - out.writeLong(startTimeMs); - out.writeLong(finishTimeMs); - out.writeInt(progress); - failMsg.write(out); - - if (brokerDesc == null) { - out.writeBoolean(false); - } else { - out.writeBoolean(true); - brokerDesc.write(out); - } - tableName.write(out); - - origStmt.write(out); - out.writeInt(sessionVariables.size()); - for (Map.Entry entry : sessionVariables.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } + @Deprecated private void readFields(DataInput in) throws IOException { isReplayed = true; id = in.readLong(); @@ -846,11 +653,13 @@ public class ExportJob implements Writable { columnSeparator = Text.readString(in); lineDelimiter = Text.readString(in); + // properties + Map properties = Maps.newHashMap(); int count = in.readInt(); for (int i = 0; i < count; i++) { String propertyKey = Text.readString(in); String propertyValue = Text.readString(in); - this.properties.put(propertyKey, propertyValue); + properties.put(propertyKey, propertyValue); } // Because before 0.15, export does not contain label information. // So for compatibility, a label will be added for historical jobs. @@ -858,7 +667,7 @@ public class ExportJob implements Writable { // the label from being different each time. 
properties.putIfAbsent(ExportStmt.LABEL, "export_" + id); this.label = properties.get(ExportStmt.LABEL); - this.columns = this.properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS); + this.columns = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS); if (!Strings.isNullOrEmpty(this.columns)) { Splitter split = Splitter.on(',').trimResults().omitEmptyStrings(); this.exportColumns = split.splitToList(this.columns.toLowerCase()); @@ -887,11 +696,13 @@ public class ExportJob implements Writable { tableName = new TableName(); tableName.readFields(in); origStmt = OriginStatement.read(in); + + Map tmpSessionVariables = Maps.newHashMap(); int size = in.readInt(); for (int i = 0; i < size; i++) { String key = Text.readString(in); String value = Text.readString(in); - sessionVariables.put(key, value); + tmpSessionVariables.put(key, value); } if (origStmt.originStmt.isEmpty()) { @@ -899,7 +710,7 @@ public class ExportJob implements Writable { } // parse the origin stmt to get where expr SqlParser parser = new SqlParser(new SqlScanner(new StringReader(origStmt.originStmt), - Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE)))); + Long.valueOf(tmpSessionVariables.get(SessionVariable.SQL_MODE)))); ExportStmt stmt = null; try { stmt = (ExportStmt) SqlParserUtils.getStmt(parser, origStmt.idx); @@ -934,17 +745,36 @@ public class ExportJob implements Writable { // for only persist op when switching job state. public static class StateTransfer implements Writable { + @SerializedName("jobId") long jobId; + @SerializedName("state") JobState state; + @SerializedName("startTimeMs") + private long startTimeMs; + @SerializedName("finishTimeMs") + private long finishTimeMs; + @SerializedName("failMsg") + private ExportFailMsg failMsg; + @SerializedName("outFileInfo") + private String outFileInfo; + // used for reading from one log public StateTransfer() { this.jobId = -1; this.state = JobState.CANCELLED; + this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, ""); + this.outFileInfo = ""; } + // used for persisting one log public StateTransfer(long jobId, JobState state) { this.jobId = jobId; this.state = state; + ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId); + this.startTimeMs = job.getStartTimeMs(); + this.finishTimeMs = job.getFinishTimeMs(); + this.failMsg = job.getFailMsg(); + this.outFileInfo = job.getOutfileInfo(); } public long getJobId() { @@ -957,13 +787,83 @@ public class ExportJob implements Writable { @Override public void write(DataOutput out) throws IOException { - out.writeLong(jobId); - Text.writeString(out, state.name()); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } - public void readFields(DataInput in) throws IOException { + public static StateTransfer read(DataInput in) throws IOException { + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) { + StateTransfer transfer = new StateTransfer(); + transfer.readFields(in); + return transfer; + } + String json = Text.readString(in); + StateTransfer transfer = GsonUtils.GSON.fromJson(json, ExportJob.StateTransfer.class); + return transfer; + } + + private void readFields(DataInput in) throws IOException { jobId = in.readLong(); state = JobState.valueOf(Text.readString(in)); } + + public long getStartTimeMs() { + return startTimeMs; + } + + public long getFinishTimeMs() { + return finishTimeMs; + } + + public String getOutFileInfo() { + return outFileInfo; + } + + public ExportFailMsg getFailMsg() { + return failMsg; + } + } + + public static class OutfileInfo { + 
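+        // Per-outfile statistics attached by the BE to each outfile result (see the
+        // attach_infos map filled in vfile_result_writer.cpp): number of files written,
+        // total rows, file size in bytes and the result URL. ExportJob.finish()
+        // serializes a list of these objects to JSON and stores it in outfileInfo,
+        // which backs the new "OutfileInfo" column of the export job listing.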
@SerializedName("fileNumber") + private String fileNumber; + @SerializedName("totalRows") + private String totalRows; + @SerializedName("fileSize") + private String fileSize; + @SerializedName("url") + private String url; + + public String getUrl() { + return url; + } + + public void setUrl(String url) { + this.url = url; + } + + public String getFileNumber() { + return fileNumber; + } + + public void setFileNumber(String fileNumber) { + this.fileNumber = fileNumber; + } + + public String getTotalRows() { + return totalRows; + } + + public void setTotalRows(String totalRows) { + this.totalRows = totalRows; + } + + public String getFileSize() { + return fileSize; + } + + public void setFileSize(String fileSize) { + this.fileSize = fileSize; + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index 5b7210741f..aa064226dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -32,10 +32,15 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.util.ListComparator; +import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.ExportJob.JobState; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.task.ExportExportingTask; +import org.apache.doris.task.MasterTask; +import org.apache.doris.task.MasterTaskExecutor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -57,7 +62,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; -public class ExportMgr { +public class ExportMgr extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(ExportJob.class); // lock for export job @@ -67,7 +72,11 @@ public class ExportMgr { private Map idToJob = Maps.newHashMap(); // exportJobId to exportJob private Map labelToJobId = Maps.newHashMap(); + private MasterTaskExecutor exportingExecutor; + public ExportMgr() { + int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit; + exportingExecutor = new MasterTaskExecutor("export-exporting-job", poolSize, true); } public void readLock() { @@ -86,6 +95,53 @@ public class ExportMgr { lock.writeLock().unlock(); } + @Override + public synchronized void start() { + super.start(); + exportingExecutor.start(); + } + + @Override + protected void runAfterCatalogReady() { + List pendingJobs = getExportJobs(JobState.PENDING); + List newInQueueJobs = Lists.newArrayList(); + for (ExportJob job : pendingJobs) { + if (handlePendingJobs(job)) { + newInQueueJobs.add(job); + } + } + LOG.debug("new IN_QUEUE export job num: {}", newInQueueJobs.size()); + for (ExportJob job : newInQueueJobs) { + try { + MasterTask task = new ExportExportingTask(job); + if (exportingExecutor.submit(task)) { + LOG.info("success to submit IN_QUEUE export job. job: {}", job); + } else { + LOG.info("fail to submit IN_QUEUE job to executor. 
job: {}", job); + + } + } catch (Exception e) { + LOG.warn("run export exporting job {}.", job, e); + } + } + } + + private boolean handlePendingJobs(ExportJob job) { + if (job.isReplayed()) { + // If the job is created from replay thread, all plan info will be lost. + // so the job has to be cancelled. + String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled."; + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); + return false; + } + + if (job.updateState(JobState.IN_QUEUE)) { + LOG.info("Exchange pending status to in_queue status success. job: {}", job); + return true; + } + return false; + } + public List getJobs() { return Lists.newArrayList(idToJob.values()); } @@ -169,6 +225,17 @@ public class ExportMgr { return job; } + public ExportJob getJob(long jobId) { + ExportJob job = null; + readLock(); + try { + job = idToJob.get(jobId); + } finally { + readUnlock(); + } + return job; + } + public List getExportJobs(ExportJob.JobState state) { List result = Lists.newArrayList(); readLock(); @@ -185,6 +252,7 @@ public class ExportMgr { return result; } + // used for `show export` statement // NOTE: jobid and states may both specified, or only one of them, or neither public List> getExportJobInfosByIdOrState( long dbId, long jobId, String label, boolean isLabelUseLike, Set states, @@ -341,15 +409,15 @@ public class ExportMgr { } infoMap.put("partitions", partitions); infoMap.put("broker", job.getBrokerDesc().getName()); - infoMap.put("column separator", job.getColumnSeparator()); - infoMap.put("line delimiter", job.getLineDelimiter()); - infoMap.put("exec mem limit", job.getExecMemLimit()); + infoMap.put("column_separator", job.getColumnSeparator()); + infoMap.put("format", job.getFormat()); + infoMap.put("line_delimiter", job.getLineDelimiter()); infoMap.put("columns", job.getColumns()); - infoMap.put("coord num", job.getCoordList().size()); - infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size()); + infoMap.put("tablet_num", job.getTabletLocations() == null ? 
-1 : job.getTabletLocations().size());
+            infoMap.put("max_file_size", job.getMaxFileSize());
             jobInfo.add(new Gson().toJson(infoMap));
             // path
-            jobInfo.add(job.getShowExportPath());
+            jobInfo.add(job.getExportPath());
 
             jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
             jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
@@ -364,6 +432,13 @@ public class ExportMgr {
                 jobInfo.add(FeConstants.null_string);
             }
 
+            // outfileInfo
+            if (job.getState() == JobState.FINISHED) {
+                jobInfo.add(job.getOutfileInfo());
+            } else {
+                jobInfo.add(FeConstants.null_string);
+            }
+
             return jobInfo;
         }
 
@@ -397,11 +472,15 @@ public class ExportMgr {
             }
         }
 
-    public void replayUpdateJobState(long jobId, ExportJob.JobState newState) {
+    public void replayUpdateJobState(ExportJob.StateTransfer stateTransfer) {
         readLock();
         try {
-            ExportJob job = idToJob.get(jobId);
-            job.updateState(newState, true);
+            ExportJob job = idToJob.get(stateTransfer.getJobId());
+            job.updateState(stateTransfer.getState(), true);
+            job.setStartTimeMs(stateTransfer.getStartTimeMs());
+            job.setFinishTimeMs(stateTransfer.getFinishTimeMs());
+            job.setFailMsg(stateTransfer.getFailMsg());
+            job.setOutfileInfo(stateTransfer.getOutFileInfo());
         } finally {
             readUnlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 692937a486..9ca0894d24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -354,7 +354,7 @@ public class EditLog {
             case OperationType.OP_EXPORT_UPDATE_STATE:
                 ExportJob.StateTransfer op = (ExportJob.StateTransfer) journal.getData();
                 ExportMgr exportMgr = env.getExportMgr();
-                exportMgr.replayUpdateJobState(op.getJobId(), op.getState());
+                exportMgr.replayUpdateJobState(op);
                 break;
             case OperationType.OP_FINISH_DELETE: {
                 DeleteInfo info = (DeleteInfo) journal.getData();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
index f25bb5e4f8..47f42a18c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java
@@ -156,6 +156,8 @@ public class ConnectContext {
 
     private StatsErrorEstimator statsErrorEstimator;
 
+    private Map<String, String> resultAttachedInfo;
+
     public void setUserQueryTimeout(int queryTimeout) {
         if (queryTimeout > 0) {
             sessionVariable.setQueryTimeoutS(queryTimeout);
@@ -366,6 +368,10 @@ public class ConnectContext {
         return sessionVariable;
     }
 
+    public void setSessionVariable(SessionVariable sessionVariable) {
+        this.sessionVariable = sessionVariable;
+    }
+
     public ConnectScheduler getConnectScheduler() {
         return connectScheduler;
     }
@@ -651,6 +657,14 @@ public class ConnectContext {
         }
     }
 
+    public void setResultAttachedInfo(Map<String, String> resultAttachedInfo) {
+        this.resultAttachedInfo = resultAttachedInfo;
+    }
+
+    public Map<String, String> getResultAttachedInfo() {
+        return resultAttachedInfo;
+    }
+
     public class ThreadInfo {
         public boolean isFull;
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 454012875d..39300235b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1365,6 +1365,7 @@ public class StmtExecutor implements ProfileWriter {
                 }
                 plannerProfile.freshWriteResultConsumeTime();
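+                // Keep any outfile summary (FileNumber/TotalRows/FileSize/URL) the BE
+                // attached to this batch so ExportExportingTask can read it back later.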
                context.updateReturnRows(batch.getBatch().getRows().size());
+                context.setResultAttachedInfo(batch.getBatch().getAttachedInfos());
             }
             if (batch.isEos()) {
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
index 2a6bed2d9f..08af959e22 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java
@@ -17,13 +17,10 @@
 
 package org.apache.doris.task;
 
-import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.OutFileClause;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.FsBroker;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.ClientPool;
-import org.apache.doris.common.Status;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.ProfileManager;
@@ -31,40 +28,32 @@
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.load.ExportFailMsg;
 import org.apache.doris.load.ExportJob;
-import org.apache.doris.qe.Coordinator;
-import org.apache.doris.qe.QeProcessorImpl;
-import org.apache.doris.service.FrontendOptions;
-import org.apache.doris.thrift.TBrokerOperationStatus;
-import org.apache.doris.thrift.TBrokerOperationStatusCode;
-import org.apache.doris.thrift.TBrokerRenamePathRequest;
-import org.apache.doris.thrift.TBrokerVersion;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TPaloBrokerService;
-import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.load.ExportJob.JobState;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.QueryState.MysqlStateType;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.system.SystemInfoService;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.common.collect.Lists;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.apache.thrift.TException;
 
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 import java.util.UUID;
 
 public class ExportExportingTask extends MasterTask {
     private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class);
-    private static final int RETRY_NUM = 2;
 
     protected final ExportJob job;
-    private boolean isCancelled = false;
-    private Status failStatus = Status.OK;
-    private ExportFailMsg.CancelType cancelType = ExportFailMsg.CancelType.UNKNOWN;
-
     private RuntimeProfile profile = new RuntimeProfile("Export");
     private List<RuntimeProfile> fragmentProfiles = Lists.newArrayList();
 
+    private StmtExecutor stmtExecutor;
+
     public ExportExportingTask(ExportJob job) {
         this.job = job;
         this.signature = job.getId();
@@ -72,6 +61,10 @@ public class ExportExportingTask extends MasterTask {
 
     @Override
     protected void exec() {
+        if (job.getState() == JobState.IN_QUEUE) {
+            handleInQueueState();
+        }
+
         if (job.getState() != ExportJob.JobState.EXPORTING) {
             return;
         }
@@ -85,93 +78,60 @@ public class ExportExportingTask extends MasterTask {
             job.setDoExportingThread(Thread.currentThread());
         }
 
-        if (job.isReplayed()) {
-            // If the job is created from replay thread, all plan info will be lost.
-            // so the job has to be cancelled.
-            String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
-            job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
-            return;
-        }
-
-        // if one instance finished, we send request to BE to exec next instance
-        List<Coordinator> coords = job.getCoordList();
-        int coordSize = coords.size();
-        for (int i = 0; i < coordSize; i++) {
-            if (isCancelled) {
-                break;
-            }
-            Coordinator coord = coords.get(i);
-            for (int j = 0; j < RETRY_NUM; ++j) {
-                execOneCoord(coord);
-                if (coord.getExecStatus().ok()) {
+        List<QueryStmt> selectStmtList = job.getSelectStmtList();
+        boolean isFailed = false;
+        ExportFailMsg errorMsg = null;
+        int completeTaskNum = 0;
+        List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
+        // begin exporting
+        for (int i = 0; i < selectStmtList.size(); ++i) {
+            try (AutoCloseConnectContext r = buildConnectContext()) {
+                this.stmtExecutor = new StmtExecutor(r.connectContext, selectStmtList.get(i));
+                this.stmtExecutor.execute();
+                if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
+                    errorMsg = new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
+                            r.connectContext.getState().getErrorMessage());
+                    isFailed = true;
                     break;
                 }
-                if (j < RETRY_NUM - 1) {
-                    TUniqueId queryId = coord.getQueryId();
-                    coord.clearExportStatus();
-
-                    // generate one new queryId here, to avoid being rejected by BE,
-                    // because the request is considered as a repeat request.
-                    // we make the high part of query id unchanged to facilitate tracing problem by log.
-                    UUID uuid = UUID.randomUUID();
-                    TUniqueId newQueryId = new TUniqueId(queryId.hi, uuid.getLeastSignificantBits());
-                    coord.setQueryId(newQueryId);
-                    LOG.warn("export exporting job fail. err: {}. query_id: {}, job: {}. retry. {}, new query id: {}",
-                            coord.getExecStatus().getErrorMsg(), DebugUtil.printId(queryId), job.getId(), j,
-                            DebugUtil.printId(newQueryId));
-                }
+                ExportJob.OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
+                outfileInfoList.add(outfileInfo);
+                ++completeTaskNum;
+            } catch (Exception e) {
+                errorMsg = new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
+                isFailed = true;
+                break;
+            } finally {
+                this.stmtExecutor.addProfileToSpan();
             }
-
-            if (!coord.getExecStatus().ok()) {
-                onFailed(coord);
-            } else {
-                int progress = (int) (i + 1) * 100 / coordSize;
-                if (progress >= 100) {
-                    progress = 99;
-                }
-                job.setProgress(progress);
-                LOG.info("finish coordinator with query id {}, export job: {}.
progress: {}", - DebugUtil.printId(coord.getQueryId()), job.getId(), progress); - } - - RuntimeProfile queryProfile = coord.getQueryProfile(); - if (queryProfile != null) { - queryProfile.getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(job.getStartTimeMs())); - } - coord.endProfile(); - fragmentProfiles.add(coord.getQueryProfile()); } - if (isCancelled) { - job.cancel(cancelType, null /* error msg is already set */); + int progress = completeTaskNum * 100 / selectStmtList.size(); + if (progress >= 100) { + progress = 99; + } + job.setProgress(progress); + LOG.info("Exporting task progress is {}%, export job: {}", progress, job.getId()); + + if (isFailed) { registerProfile(); + job.cancel(errorMsg.getCancelType(), errorMsg.getMsg()); + LOG.warn("Exporting task failed because Exception: {}", errorMsg.getMsg()); return; } - if (job.getBrokerDesc().getStorageType() == StorageBackend.StorageType.BROKER) { - // move tmp file to final destination - Status mvStatus = moveTmpFiles(); - if (!mvStatus.ok()) { - String failMsg = "move tmp file to final destination fail."; - failMsg += mvStatus.getErrorMsg(); - job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); - LOG.warn("move tmp file to final destination fail. job:{}", job); - registerProfile(); - return; - } - } - - if (job.updateState(ExportJob.JobState.FINISHED)) { - LOG.warn("export job success. job: {}", job); - registerProfile(); + registerProfile(); + if (job.finish(outfileInfoList)) { + LOG.info("export job success. job: {}", job); + // TODO(ftw): when we implement exporting tablet one by one, we should release snapshot here // release snapshot - Status releaseSnapshotStatus = job.releaseSnapshotPaths(); - if (!releaseSnapshotStatus.ok()) { - // even if release snapshot failed, do not cancel this job. - // snapshot will be removed by GC thread on BE, finally. - LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(), - releaseSnapshotStatus.getErrorMsg()); - } + // Status releaseSnapshotStatus = job.releaseSnapshotPaths(); + // if (!releaseSnapshotStatus.ok()) { + // // even if release snapshot failed, do not cancel this job. + // // snapshot will be removed by GC thread on BE, finally. + // LOG.warn("failed to release snapshot for export job: {}. 
err: {}", job.getId(), + // releaseSnapshotStatus.getErrorMsg()); + // } } synchronized (this) { @@ -179,74 +139,28 @@ public class ExportExportingTask extends MasterTask { } } - private Status execOneCoord(Coordinator coord) { - TUniqueId queryId = coord.getQueryId(); - boolean needUnregister = false; - try { - QeProcessorImpl.INSTANCE.registerQuery(queryId, coord); - needUnregister = true; - actualExecCoord(queryId, coord); - } catch (UserException e) { - LOG.warn("export exporting internal error, job: {}", job.getId(), e); - return new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()); - } finally { - if (needUnregister) { - QeProcessorImpl.INSTANCE.unregisterQuery(queryId); - } - } - return Status.OK; + private AutoCloseConnectContext buildConnectContext() { + ConnectContext connectContext = new ConnectContext(); + connectContext.setSessionVariable(job.getSessionVariables()); + connectContext.setEnv(Env.getCurrentEnv()); + connectContext.setDatabase(job.getTableName().getDb()); + connectContext.setQualifiedUser(job.getQualifiedUser()); + connectContext.setCurrentUserIdentity(job.getUserIdentity()); + UUID uuid = UUID.randomUUID(); + TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + connectContext.setQueryId(queryId); + connectContext.setStartTime(); + connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER); + return new AutoCloseConnectContext(connectContext); } - private void actualExecCoord(TUniqueId queryId, Coordinator coord) { - int leftTimeSecond = getLeftTimeSecond(); - if (leftTimeSecond <= 0) { - onTimeout(); - return; - } - - try { - coord.setTimeout(leftTimeSecond); - coord.exec(); - } catch (Exception e) { - LOG.warn("export Coordinator execute failed. job: {}", job.getId(), e); - } - - if (coord.join(leftTimeSecond)) { - Status status = coord.getExecStatus(); - if (status.ok()) { - onSubTaskFinished(coord.getExportFiles()); - } - } else { - coord.cancel(); - } - } - - private int getLeftTimeSecond() { - return (int) (job.getTimeoutSecond() - (System.currentTimeMillis() - job.getCreateTimeMs()) / 1000); - } - - private synchronized void onSubTaskFinished(List exportFiles) { - job.addExportedFiles(exportFiles); - } - - private synchronized void onFailed(Coordinator coordinator) { - isCancelled = true; - this.failStatus = coordinator.getExecStatus(); - cancelType = ExportFailMsg.CancelType.RUN_FAIL; - String failMsg = "export exporting job fail. query id: " + DebugUtil.printId(coordinator.getQueryId()) - + ", "; - failMsg += failStatus.getErrorMsg(); - job.setFailMsg(new ExportFailMsg(cancelType, failMsg)); - LOG.warn("export exporting job fail. err: {}. job: {}", failMsg, job); - } - - public synchronized void onTimeout() { - isCancelled = true; - this.failStatus = new Status(TStatusCode.TIMEOUT, "timeout"); - cancelType = ExportFailMsg.CancelType.TIMEOUT; - String failMsg = "export exporting job timeout."; - job.setFailMsg(new ExportFailMsg(cancelType, failMsg)); - LOG.warn("export exporting job timeout. 
job: {}", job); + private ExportJob.OutfileInfo getOutFileInfo(Map resultAttachedInfo) { + ExportJob.OutfileInfo outfileInfo = new ExportJob.OutfileInfo(); + outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER)); + outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS)); + outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes"); + outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL)); + return outfileInfo; } private void initProfile() { @@ -264,7 +178,7 @@ public class ExportExportingTask extends MasterTask { summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Export"); summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString()); summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION); - summaryProfile.addInfoString(ProfileManager.USER, job.getUser()); + summaryProfile.addInfoString(ProfileManager.USER, job.getQualifiedUser()); summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, String.valueOf(job.getDbId())); summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, job.getSql()); profile.addChild(summaryProfile); @@ -281,67 +195,23 @@ public class ExportExportingTask extends MasterTask { ProfileManager.getInstance().pushProfile(profile); } - private Status moveTmpFiles() { - FsBroker broker = null; - try { - String localIP = FrontendOptions.getLocalHostAddress(); - broker = Env.getCurrentEnv().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), localIP); - } catch (AnalysisException e) { - String failMsg = "get broker failed. export job: " + job.getId() + ". msg: " + e.getMessage(); - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); - } - TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); - TPaloBrokerService.Client client = null; - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e) { - try { - client = ClientPool.brokerPool.borrowObject(address); - } catch (Exception e1) { - String failMsg = "create connection to broker(" + address + ") failed"; - LOG.warn(failMsg); - return new Status(TStatusCode.CANCELLED, failMsg); - } - } - boolean failed = false; - Set exportedFiles = job.getExportedFiles(); - List newFiles = Lists.newArrayList(); - String exportPath = job.getExportPath(); - for (String exportedFile : exportedFiles) { - // move exportPath/__doris_tmp/file to exportPath/file - String file = exportedFile.substring(exportedFile.lastIndexOf("/") + 1); - String destPath = exportPath + "/" + file; - LOG.debug("rename {} to {}, export job: {}", exportedFile, destPath, job.getId()); - String failMsg = ""; - try { - TBrokerRenamePathRequest request = new TBrokerRenamePathRequest( - TBrokerVersion.VERSION_ONE, exportedFile, destPath, job.getBrokerDesc().getProperties()); - TBrokerOperationStatus tBrokerOperationStatus = null; - tBrokerOperationStatus = client.renamePath(request); - if (tBrokerOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) { - failed = true; - failMsg = "Broker renamePath failed. srcPath=" + exportedFile + ", destPath=" + destPath - + ", broker=" + address + ", msg=" + tBrokerOperationStatus.getMessage(); - return new Status(TStatusCode.CANCELLED, failMsg); - } else { - newFiles.add(destPath); - } - } catch (TException e) { - failed = true; - failMsg = "Broker renamePath failed. 
srcPath=" + exportedFile + ", destPath=" + destPath - + ", broker=" + address + ", msg=" + e.getMessage(); - return new Status(TStatusCode.CANCELLED, failMsg); - } finally { - if (failed) { - ClientPool.brokerPool.invalidateObject(address, client); - } - } + private void handleInQueueState() { + long dbId = job.getDbId(); + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db == null) { + job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist"); + return; } - exportedFiles.clear(); - job.addExportedFiles(newFiles); - ClientPool.brokerPool.returnObject(address, client); - return Status.OK; + // TODO(ftw): when we implement exporting tablet one by one, we should makeSnapshots here + // Status snapshotStatus = job.makeSnapshots(); + // if (!snapshotStatus.ok()) { + // job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg()); + // return; + // } + + if (job.updateState(ExportJob.JobState.EXPORTING)) { + LOG.info("Exchange pending status to exporting status success. job: {}", job); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java deleted file mode 100644 index e2248cf7e3..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportPendingTask.java +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.task; - -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.common.Pair; -import org.apache.doris.common.Status; -import org.apache.doris.load.ExportFailMsg; -import org.apache.doris.load.ExportJob; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TAgentResult; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TPaloScanRange; -import org.apache.doris.thrift.TScanRange; -import org.apache.doris.thrift.TScanRangeLocation; -import org.apache.doris.thrift.TScanRangeLocations; -import org.apache.doris.thrift.TSnapshotRequest; -import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TypesConstants; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; - -public class ExportPendingTask extends MasterTask { - private static final Logger LOG = LogManager.getLogger(ExportPendingTask.class); - - protected final ExportJob job; - protected Database db; - - public ExportPendingTask(ExportJob job) { - super(); - this.job = job; - this.signature = job.getId(); - } - - @Override - protected void exec() { - if (job.getState() != ExportJob.JobState.PENDING) { - return; - } - - long dbId = job.getDbId(); - db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist"); - return; - } - - if (job.isReplayed()) { - // If the job is created from replay thread, all plan info will be lost. - // so the job has to be cancelled. - String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled."; - job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); - return; - } - - // make snapshots - Status snapshotStatus = makeSnapshots(); - if (!snapshotStatus.ok()) { - job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg()); - return; - } - - if (job.updateState(ExportJob.JobState.EXPORTING)) { - LOG.info("submit pending export job success. 
job: {}", job); - return; - } - } - - private Status makeSnapshots() { - List tabletLocations = job.getTabletLocations(); - if (tabletLocations == null) { - return Status.OK; - } - for (TScanRangeLocations tablet : tabletLocations) { - TScanRange scanRange = tablet.getScanRange(); - if (!scanRange.isSetPaloScanRange()) { - continue; - } - TPaloScanRange paloScanRange = scanRange.getPaloScanRange(); - List locations = tablet.getLocations(); - for (TScanRangeLocation location : locations) { - TNetworkAddress address = location.getServer(); - String host = address.getHostname(); - int port = address.getPort(); - Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(host, port); - if (backend == null) { - return Status.CANCELLED; - } - long backendId = backend.getId(); - if (!Env.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) { - return Status.CANCELLED; - } - TSnapshotRequest snapshotRequest = new TSnapshotRequest(); - snapshotRequest.setTabletId(paloScanRange.getTabletId()); - snapshotRequest.setSchemaHash(Integer.parseInt(paloScanRange.getSchemaHash())); - snapshotRequest.setVersion(Long.parseLong(paloScanRange.getVersion())); - snapshotRequest.setTimeout(job.getTimeoutSecond()); - snapshotRequest.setPreferredSnapshotVersion(TypesConstants.TPREFER_SNAPSHOT_REQ_VERSION); - - AgentClient client = new AgentClient(host, port); - TAgentResult result = client.makeSnapshot(snapshotRequest); - if (result == null || result.getStatus().getStatusCode() != TStatusCode.OK) { - String err = "snapshot for tablet " + paloScanRange.getTabletId() + " failed on backend " - + address.toString() + ". reason: " - + (result == null ? "unknown" : result.getStatus().error_msgs); - LOG.warn("{}, export job: {}", err, job.getId()); - return new Status(TStatusCode.CANCELLED, err); - } - job.addSnapshotPath(Pair.of(address, result.getSnapshotPath())); - } - } - return Status.OK; - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java index 30be49e031..b8e54bf86d 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -156,7 +156,7 @@ public class CancelExportStmtTest extends TestWithFeService { ExportJob job2 = new ExportJob(); job2.updateState(ExportJob.JobState.CANCELLED, true); ExportJob job3 = new ExportJob(); - job3.updateState(ExportJob.JobState.EXPORTING, true); + job3.updateState(ExportJob.JobState.EXPORTING, false); ExportJob job4 = new ExportJob(); exportJobList1.add(job1); exportJobList1.add(job2); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java index 1385dfda19..448e7608a7 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/ExportMgrTest.java @@ -18,7 +18,6 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; @@ -27,7 +26,6 @@ import org.apache.doris.load.ExportMgr; import org.apache.doris.mysql.privilege.AccessControllerManager; import org.apache.doris.mysql.privilege.MockedAuth; -import 
com.google.common.collect.Maps;
 import mockit.Mocked;
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,7 +33,6 @@
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 public class ExportMgrTest {
     private final ExportMgr exportMgr = new ExportMgr();
@@ -89,12 +86,7 @@ public class ExportMgrTest {
         BrokerDesc bd = new BrokerDesc("broker", new HashMap<>());
         Deencapsulation.setField(job1, "brokerDesc", bd);
 
-        Map<String, String> properties = Maps.newHashMap();
-        properties.put(LoadStmt.EXEC_MEM_LIMIT, "-1");
-        properties.put(LoadStmt.TIMEOUT_PROPERTY, "-1");
-        Deencapsulation.setField(job1, "properties", properties);
-
-
+        Deencapsulation.setField(job1, "timeoutSecond", -1);
         return job1;
     }
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
index 4bce75b92a..eea4788f5d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java
@@ -34,7 +34,6 @@
 import org.apache.doris.task.ExportExportingTask;
 import org.apache.doris.thrift.TQueryOptions;
 import org.apache.doris.utframe.TestWithFeService;
 
-import com.google.common.collect.Lists;
 import mockit.Expectations;
 import org.junit.Assert;
 import org.junit.jupiter.api.Assertions;
@@ -71,6 +70,7 @@ public class SessionVariablesTest extends TestWithFeService {
 
     @Test
     public void testExperimentalSessionVariables() throws Exception {
+        connectContext.setThreadLocalInfo();
         // 1. set without experimental
         SessionVariable sessionVar = connectContext.getSessionVariable();
         boolean enableNereids = sessionVar.isEnableNereidsPlanner();
@@ -179,10 +179,6 @@ public class SessionVariablesTest extends TestWithFeService {
                 job.getState();
                 minTimes = 0;
                 result = ExportJob.JobState.EXPORTING;
-
-                job.getCoordList();
-                minTimes = 0;
-                result = Lists.newArrayList();
             }
         };
 
@@ -207,6 +203,7 @@ public class SessionVariablesTest extends TestWithFeService {
     @Test
     public void testDisableProfile() {
         try {
+            connectContext.setThreadLocalInfo();
             SetStmt setStmt = (SetStmt) parseAndAnalyzeStmt("set enable_profile=false", connectContext);
             SetExecutor setExecutor = new SetExecutor(connectContext, setStmt);
             setExecutor.execute();
@@ -221,10 +218,6 @@ public class SessionVariablesTest extends TestWithFeService {
                 job.getState();
                 minTimes = 0;
                 result = ExportJob.JobState.EXPORTING;
-
-                job.getCoordList();
-                minTimes = 0;
-                result = Lists.newArrayList();
             }
         };
 
diff --git a/gensrc/thrift/Data.thrift b/gensrc/thrift/Data.thrift
index 151ae71b57..d116382107 100644
--- a/gensrc/thrift/Data.thrift
+++ b/gensrc/thrift/Data.thrift
@@ -75,4 +75,6 @@ struct TResultBatch {
 
     // packet seq used to check if there has packet lost
     3: required i64 packet_seq
+
+    4: optional map<string, string> attached_infos
}
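Since VERSION_120, ExportJob and its StateTransfer are persisted as GSON JSON strings rather than hand-written binary fields, and older journal images still go through the deprecated readFields path. A minimal round-trip sketch of the new format, assuming the GsonUtils helper used throughout this patch; this is illustrative and not part of the diff:

```java
import org.apache.doris.load.ExportJob;
import org.apache.doris.persist.gson.GsonUtils;

public class StateTransferRoundTrip {
    public static void main(String[] args) {
        // The no-arg constructor initializes jobId = -1 and state = CANCELLED (see above).
        ExportJob.StateTransfer op = new ExportJob.StateTransfer();
        // write() stores exactly this JSON behind Text.writeString; read() parses it back on replay.
        String json = GsonUtils.GSON.toJson(op);
        ExportJob.StateTransfer replayed =
                GsonUtils.GSON.fromJson(json, ExportJob.StateTransfer.class);
        System.out.println(json + " -> jobId=" + replayed.getJobId());
    }
}
```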