[Enhancement](Export) Support export with outfile syntax (#18325)
The `Export` syntax provides an asynchronous export function, but its implementation is not vectorized. The `Outfile` syntax provides a synchronous export function that is vectorized. So we can reimplement the `Export` syntax on top of the `Outfile` syntax.
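For illustration, a minimal sketch of the intended behavior, using placeholder table, path, and broker names: the `EXPORT` statement keeps its asynchronous interface, while the job internally runs one or more `SELECT ... INTO OUTFILE` statements through the vectorized outfile writer.

```sql
-- Asynchronous export job submitted by the user (placeholder names).
EXPORT TABLE testTbl
TO "hdfs://hdfs_host:port/export/dir/"
PROPERTIES
(
    "column_separator" = ",",
    "line_delimiter" = "\n",
    "max_file_size" = "512MB"
)
WITH BROKER "my_broker";

-- Roughly equivalent synchronous statement the job now drives internally
-- via the vectorized outfile writer.
SELECT * FROM testTbl
INTO OUTFILE "hdfs://hdfs_host:port/export/dir/export_"
FORMAT AS CSV
PROPERTIES
(
    "broker.name" = "my_broker",
    "column_separator" = ",",
    "line_delimiter" = "\n",
    "max_file_size" = "512MB"
);

-- Job progress and the new OutfileInfo column can be checked with:
SHOW EXPORT;
```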
@@ -500,6 +500,15 @@ Status VFileResultWriter::_send_result() {
    std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
    result->result_batch.rows.resize(1);
    result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());

    std::map<std::string, string> attach_infos;
    attach_infos.insert(std::make_pair("FileNumber", std::to_string(_file_idx)));
    attach_infos.insert(
            std::make_pair("TotalRows", std::to_string(_written_rows_counter->value())));
    attach_infos.insert(std::make_pair("FileSize", std::to_string(_written_data_bytes->value())));
    attach_infos.insert(std::make_pair("URL", file_url));

    result->result_batch.__set_attached_infos(attach_infos);
    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
    return Status::OK();
}
@@ -176,7 +176,7 @@ EXPORT TABLE testTbl TO "file:///home/data/a" PROPERTIES ("columns" = "k1,v1");
8. Export all data in table testTbl to s3, using the invisible character "\x07" as the column or line separator.

```sql
EXPORT TABLE testTbl TO "hdfs://hdfs_host:port/a/b/c"
EXPORT TABLE testTbl TO "s3://hdfs_host:port/a/b/c"
PROPERTIES (
  "column_separator"="\\x07",
  "line_delimiter" = "\\x07"
@@ -59,8 +59,10 @@ public final class FeMetaVersion {
    // TablePropertyInfo add db id
    public static final int VERSION_119 = 119;

    public static final int VERSION_120 = 120;

    // note: when increment meta version, should assign the latest version to VERSION_CURRENT
    public static final int VERSION_CURRENT = VERSION_119;
    public static final int VERSION_CURRENT = VERSION_120;

    // all logs meta version should >= the minimum version, so that we could remove many if clause, for example
    // if (FE_METAVERSION < VERSION_94) ...
@ -35,6 +35,7 @@ import org.apache.doris.common.util.URI;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -75,6 +76,17 @@ public class ExportStmt extends StatementBase {
|
||||
|
||||
private TableRef tableRef;
|
||||
|
||||
private String format;
|
||||
|
||||
private String label;
|
||||
|
||||
private String maxFileSize;
|
||||
private SessionVariable sessionVariables;
|
||||
|
||||
private String qualifiedUser;
|
||||
|
||||
private UserIdentity userIdentity;
|
||||
|
||||
public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
|
||||
Map<String, String> properties, BrokerDesc brokerDesc) {
|
||||
this.tableRef = tableRef;
|
||||
@ -87,6 +99,7 @@ public class ExportStmt extends StatementBase {
|
||||
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
|
||||
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
|
||||
this.columns = DEFAULT_COLUMNS;
|
||||
this.sessionVariables = ConnectContext.get().getSessionVariable();
|
||||
}
|
||||
|
||||
public String getColumns() {
|
||||
@ -113,10 +126,6 @@ public class ExportStmt extends StatementBase {
|
||||
return brokerDesc;
|
||||
}
|
||||
|
||||
public Map<String, String> getProperties() {
|
||||
return properties;
|
||||
}
|
||||
|
||||
public String getColumnSeparator() {
|
||||
return this.columnSeparator;
|
||||
}
|
||||
@ -125,6 +134,30 @@ public class ExportStmt extends StatementBase {
|
||||
return this.lineDelimiter;
|
||||
}
|
||||
|
||||
public TableRef getTableRef() {
|
||||
return this.tableRef;
|
||||
}
|
||||
|
||||
public String getFormat() {
|
||||
return format;
|
||||
}
|
||||
|
||||
public String getLabel() {
|
||||
return label;
|
||||
}
|
||||
|
||||
public SessionVariable getSessionVariables() {
|
||||
return sessionVariables;
|
||||
}
|
||||
|
||||
public String getQualifiedUser() {
|
||||
return qualifiedUser;
|
||||
}
|
||||
|
||||
public UserIdentity getUserIdentity() {
|
||||
return this.userIdentity;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needAuditEncryption() {
|
||||
if (brokerDesc != null) {
|
||||
@ -162,6 +195,8 @@ public class ExportStmt extends StatementBase {
|
||||
ConnectContext.get().getRemoteIP(),
|
||||
tblName.getDb() + ": " + tblName.getTbl());
|
||||
}
|
||||
qualifiedUser = ConnectContext.get().getQualifiedUser();
|
||||
userIdentity = ConnectContext.get().getCurrentUserIdentity();
|
||||
|
||||
// check table && partitions whether exist
|
||||
checkTable(analyzer.getEnv());
|
||||
@ -171,8 +206,6 @@ public class ExportStmt extends StatementBase {
|
||||
brokerDesc = new BrokerDesc("local", StorageBackend.StorageType.LOCAL, null);
|
||||
}
|
||||
|
||||
// where expr will be checked in export job
|
||||
|
||||
// check path is valid
|
||||
path = checkPath(path, brokerDesc.getStorageType());
|
||||
if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
|
||||
@ -261,7 +294,6 @@ public class ExportStmt extends StatementBase {
|
||||
throw new AnalysisException(
|
||||
"Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
|
||||
}
|
||||
path = path.substring(OutFileClause.LOCAL_FILE_PREFIX.length() - 1);
|
||||
}
|
||||
return path;
|
||||
}
|
||||
@ -271,31 +303,26 @@ public class ExportStmt extends StatementBase {
|
||||
properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR));
|
||||
this.lineDelimiter = Separator.convertSeparator(PropertyAnalyzer.analyzeLineDelimiter(
|
||||
properties, ExportStmt.DEFAULT_LINE_DELIMITER));
|
||||
this.columns = properties.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
|
||||
// exec_mem_limit
|
||||
if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
|
||||
try {
|
||||
Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException("Invalid exec_mem_limit value: " + e.getMessage());
|
||||
}
|
||||
} else {
|
||||
// use session variables
|
||||
properties.put(LoadStmt.EXEC_MEM_LIMIT,
|
||||
String.valueOf(ConnectContext.get().getSessionVariable().getMaxExecMemByte()));
|
||||
}
|
||||
this.columns = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_COLUMNS, DEFAULT_COLUMNS);
|
||||
|
||||
// timeout
|
||||
if (properties.containsKey(LoadStmt.TIMEOUT_PROPERTY)) {
|
||||
try {
|
||||
Long.parseLong(properties.get(LoadStmt.TIMEOUT_PROPERTY));
|
||||
Integer.parseInt(properties.get(LoadStmt.TIMEOUT_PROPERTY));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException("Invalid timeout value: " + e.getMessage());
|
||||
}
|
||||
} else {
|
||||
// use session variables
|
||||
properties.put(LoadStmt.TIMEOUT_PROPERTY, String.valueOf(Config.export_task_default_timeout_second));
|
||||
}
|
||||
|
||||
// format
|
||||
if (properties.containsKey(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE)) {
|
||||
this.format = properties.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE).toLowerCase();
|
||||
} else {
|
||||
this.format = "csv";
|
||||
}
|
||||
|
||||
// tablet num per task
|
||||
if (properties.containsKey(TABLET_NUMBER_PER_TASK_PROP)) {
|
||||
try {
|
||||
@ -308,13 +335,17 @@ public class ExportStmt extends StatementBase {
|
||||
properties.put(TABLET_NUMBER_PER_TASK_PROP, String.valueOf(Config.export_tablet_num_per_task));
|
||||
}
|
||||
|
||||
// max_file_size
|
||||
this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
|
||||
|
||||
if (properties.containsKey(LABEL)) {
|
||||
FeNameFormat.checkLabel(properties.get(LABEL));
|
||||
} else {
|
||||
// generate a random label
|
||||
String label = "export_" + UUID.randomUUID().toString();
|
||||
String label = "export_" + UUID.randomUUID();
|
||||
properties.put(LABEL, label);
|
||||
}
|
||||
label = properties.get(LABEL);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -361,4 +392,8 @@ public class ExportStmt extends StatementBase {
|
||||
public String toString() {
|
||||
return toSql();
|
||||
}
|
||||
|
||||
public String getMaxFileSize() {
|
||||
return maxFileSize;
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,12 +68,16 @@ public class OutFileClause {
|
||||
public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
|
||||
public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
|
||||
public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
|
||||
public static final String FILE_NUMBER = "FileNumber";
|
||||
public static final String TOTAL_ROWS = "TotalRows";
|
||||
public static final String FILE_SIZE = "FileSize";
|
||||
public static final String URL = "URL";
|
||||
|
||||
static {
|
||||
RESULT_COL_NAMES.add("FileNumber");
|
||||
RESULT_COL_NAMES.add("TotalRows");
|
||||
RESULT_COL_NAMES.add("FileSize");
|
||||
RESULT_COL_NAMES.add("URL");
|
||||
RESULT_COL_NAMES.add(FILE_NUMBER);
|
||||
RESULT_COL_NAMES.add(TOTAL_ROWS);
|
||||
RESULT_COL_NAMES.add(FILE_SIZE);
|
||||
RESULT_COL_NAMES.add(URL);
|
||||
|
||||
RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.INT));
|
||||
RESULT_COL_TYPES.add(ScalarType.createType(PrimitiveType.BIGINT));
|
||||
@ -122,9 +126,9 @@ public class OutFileClause {
|
||||
private static final String HADOOP_PROP_PREFIX = "hadoop.";
|
||||
private static final String BROKER_PROP_PREFIX = "broker.";
|
||||
private static final String PROP_BROKER_NAME = "broker.name";
|
||||
private static final String PROP_COLUMN_SEPARATOR = "column_separator";
|
||||
private static final String PROP_LINE_DELIMITER = "line_delimiter";
|
||||
private static final String PROP_MAX_FILE_SIZE = "max_file_size";
|
||||
public static final String PROP_COLUMN_SEPARATOR = "column_separator";
|
||||
public static final String PROP_LINE_DELIMITER = "line_delimiter";
|
||||
public static final String PROP_MAX_FILE_SIZE = "max_file_size";
|
||||
private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
|
||||
private static final String PARQUET_PROP_PREFIX = "parquet.";
|
||||
private static final String ORC_PROP_PREFIX = "orc.";
|
||||
|
||||
@ -143,7 +143,7 @@ public class SelectStmt extends QueryStmt {
|
||||
this.colLabels = Lists.newArrayList();
|
||||
}
|
||||
|
||||
SelectStmt(
|
||||
public SelectStmt(
|
||||
SelectList selectList,
|
||||
FromClause fromClause,
|
||||
Expr wherePredicate,
|
||||
|
||||
@ -149,7 +149,6 @@ import org.apache.doris.journal.JournalCursor;
|
||||
import org.apache.doris.journal.JournalEntity;
|
||||
import org.apache.doris.journal.bdbje.Timestamp;
|
||||
import org.apache.doris.load.DeleteHandler;
|
||||
import org.apache.doris.load.ExportChecker;
|
||||
import org.apache.doris.load.ExportJob;
|
||||
import org.apache.doris.load.ExportMgr;
|
||||
import org.apache.doris.load.Load;
|
||||
@ -1402,9 +1401,8 @@ public class Env {
|
||||
loadJobScheduler.start();
|
||||
loadEtlChecker.start();
|
||||
loadLoadingChecker.start();
|
||||
// Export checker
|
||||
ExportChecker.init(Config.export_checker_interval_second * 1000L);
|
||||
ExportChecker.startAll();
|
||||
// export task
|
||||
exportMgr.start();
|
||||
// Tablet checker and scheduler
|
||||
tabletChecker.start();
|
||||
tabletScheduler.start();
|
||||
|
||||
@ -32,7 +32,7 @@ public class ExportProcNode implements ProcNodeInterface {
|
||||
.add("JobId").add("Label").add("State").add("Progress")
|
||||
.add("TaskInfo").add("Path")
|
||||
.add("CreateTime").add("StartTime").add("FinishTime")
|
||||
.add("Timeout").add("ErrorMsg")
|
||||
.add("Timeout").add("ErrorMsg").add("OutfileInfo")
|
||||
.build();
|
||||
|
||||
// label and state column index of result
|
||||
|
||||
@ -312,8 +312,7 @@ public class JournalEntity implements Writable {
|
||||
isRead = true;
|
||||
break;
|
||||
case OperationType.OP_EXPORT_UPDATE_STATE:
|
||||
data = new ExportJob.StateTransfer();
|
||||
((ExportJob.StateTransfer) data).readFields(in);
|
||||
data = ExportJob.StateTransfer.read(in);
|
||||
isRead = true;
|
||||
break;
|
||||
case OperationType.OP_FINISH_DELETE: {
|
||||
|
||||
@ -1,139 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.load;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.load.ExportJob.JobState;
|
||||
import org.apache.doris.task.ExportExportingTask;
|
||||
import org.apache.doris.task.ExportPendingTask;
|
||||
import org.apache.doris.task.MasterTask;
|
||||
import org.apache.doris.task.MasterTaskExecutor;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public final class ExportChecker extends MasterDaemon {
|
||||
private static final Logger LOG = LogManager.getLogger(ExportChecker.class);
|
||||
|
||||
// checkers for running job state
|
||||
private static Map<JobState, ExportChecker> checkers = Maps.newHashMap();
|
||||
// executors for pending tasks
|
||||
private static Map<JobState, MasterTaskExecutor> executors = Maps.newHashMap();
|
||||
private JobState jobState;
|
||||
|
||||
private ExportChecker(JobState jobState, long intervalMs) {
|
||||
super("export checker " + jobState.name().toLowerCase(), intervalMs);
|
||||
this.jobState = jobState;
|
||||
}
|
||||
|
||||
public static void init(long intervalMs) {
|
||||
checkers.put(JobState.PENDING, new ExportChecker(JobState.PENDING, intervalMs));
|
||||
checkers.put(JobState.EXPORTING, new ExportChecker(JobState.EXPORTING, intervalMs));
|
||||
|
||||
int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit;
|
||||
MasterTaskExecutor pendingTaskExecutor = new MasterTaskExecutor("export-pending-job", poolSize, true);
|
||||
executors.put(JobState.PENDING, pendingTaskExecutor);
|
||||
|
||||
MasterTaskExecutor exportingTaskExecutor = new MasterTaskExecutor("export-exporting-job", poolSize, true);
|
||||
executors.put(JobState.EXPORTING, exportingTaskExecutor);
|
||||
}
|
||||
|
||||
public static void startAll() {
|
||||
for (ExportChecker exportChecker : checkers.values()) {
|
||||
exportChecker.start();
|
||||
}
|
||||
for (MasterTaskExecutor masterTaskExecutor : executors.values()) {
|
||||
masterTaskExecutor.start();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
LOG.debug("start check export jobs. job state: {}", jobState.name());
|
||||
switch (jobState) {
|
||||
case PENDING:
|
||||
runPendingJobs();
|
||||
break;
|
||||
case EXPORTING:
|
||||
runExportingJobs();
|
||||
break;
|
||||
default:
|
||||
LOG.warn("wrong export job state: {}", jobState.name());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private void runPendingJobs() {
|
||||
ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
|
||||
List<ExportJob> pendingJobs = exportMgr.getExportJobs(JobState.PENDING);
|
||||
|
||||
// check to limit running etl job num
|
||||
int runningJobNumLimit = Config.export_running_job_num_limit;
|
||||
if (runningJobNumLimit > 0 && !pendingJobs.isEmpty()) {
|
||||
// pending executor running + exporting state
|
||||
int runningJobNum = executors.get(JobState.PENDING).getTaskNum()
|
||||
+ executors.get(JobState.EXPORTING).getTaskNum();
|
||||
if (runningJobNum >= runningJobNumLimit) {
|
||||
LOG.info("running export job num {} exceeds system limit {}", runningJobNum, runningJobNumLimit);
|
||||
return;
|
||||
}
|
||||
|
||||
int remain = runningJobNumLimit - runningJobNum;
|
||||
if (pendingJobs.size() > remain) {
|
||||
pendingJobs = pendingJobs.subList(0, remain);
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("pending export job num: {}", pendingJobs.size());
|
||||
|
||||
for (ExportJob job : pendingJobs) {
|
||||
try {
|
||||
MasterTask task = new ExportPendingTask(job);
|
||||
if (executors.get(JobState.PENDING).submit(task)) {
|
||||
LOG.info("run pending export job. job: {}", job);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("run pending export job error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void runExportingJobs() {
|
||||
List<ExportJob> jobs = Env.getCurrentEnv().getExportMgr().getExportJobs(JobState.EXPORTING);
|
||||
LOG.debug("exporting export job num: {}", jobs.size());
|
||||
for (ExportJob job : jobs) {
|
||||
try {
|
||||
MasterTask task = new ExportExportingTask(job);
|
||||
if (executors.get(JobState.EXPORTING).submit(task)) {
|
||||
LOG.info("run exporting export job. job: {}", job);
|
||||
} else {
|
||||
LOG.info("fail to submit exporting job to executor. job: {}", job);
|
||||
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("run export exporting job error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -20,6 +20,8 @@ package org.apache.doris.load;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
@ -33,7 +35,9 @@ public class ExportFailMsg implements Writable {
|
||||
UNKNOWN
|
||||
}
|
||||
|
||||
@SerializedName("cancelType")
|
||||
private CancelType cancelType;
|
||||
@SerializedName("msg")
|
||||
private String msg;
|
||||
|
||||
public ExportFailMsg() {
|
||||
|
||||
File diff suppressed because it is too large
@ -32,10 +32,15 @@ import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.PatternMatcher;
|
||||
import org.apache.doris.common.PatternMatcherWrapper;
|
||||
import org.apache.doris.common.util.ListComparator;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.common.util.OrderByPair;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.ExportJob.JobState;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.task.ExportExportingTask;
|
||||
import org.apache.doris.task.MasterTask;
|
||||
import org.apache.doris.task.MasterTaskExecutor;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Strings;
|
||||
@ -57,7 +62,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ExportMgr {
|
||||
public class ExportMgr extends MasterDaemon {
|
||||
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
|
||||
|
||||
// lock for export job
|
||||
@ -67,7 +72,11 @@ public class ExportMgr {
|
||||
private Map<Long, ExportJob> idToJob = Maps.newHashMap(); // exportJobId to exportJob
|
||||
private Map<String, Long> labelToJobId = Maps.newHashMap();
|
||||
|
||||
private MasterTaskExecutor exportingExecutor;
|
||||
|
||||
public ExportMgr() {
|
||||
int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit;
|
||||
exportingExecutor = new MasterTaskExecutor("export-exporting-job", poolSize, true);
|
||||
}
|
||||
|
||||
public void readLock() {
|
||||
@ -86,6 +95,53 @@ public class ExportMgr {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
super.start();
|
||||
exportingExecutor.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
List<ExportJob> pendingJobs = getExportJobs(JobState.PENDING);
|
||||
List<ExportJob> newInQueueJobs = Lists.newArrayList();
|
||||
for (ExportJob job : pendingJobs) {
|
||||
if (handlePendingJobs(job)) {
|
||||
newInQueueJobs.add(job);
|
||||
}
|
||||
}
|
||||
LOG.debug("new IN_QUEUE export job num: {}", newInQueueJobs.size());
|
||||
for (ExportJob job : newInQueueJobs) {
|
||||
try {
|
||||
MasterTask task = new ExportExportingTask(job);
|
||||
if (exportingExecutor.submit(task)) {
|
||||
LOG.info("success to submit IN_QUEUE export job. job: {}", job);
|
||||
} else {
|
||||
LOG.info("fail to submit IN_QUEUE job to executor. job: {}", job);
|
||||
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("run export exporting job {}.", job, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean handlePendingJobs(ExportJob job) {
|
||||
if (job.isReplayed()) {
|
||||
// If the job is created from replay thread, all plan info will be lost.
|
||||
// so the job has to be cancelled.
|
||||
String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (job.updateState(JobState.IN_QUEUE)) {
|
||||
LOG.info("Exchange pending status to in_queue status success. job: {}", job);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public List<ExportJob> getJobs() {
|
||||
return Lists.newArrayList(idToJob.values());
|
||||
}
|
||||
@ -169,6 +225,17 @@ public class ExportMgr {
|
||||
return job;
|
||||
}
|
||||
|
||||
public ExportJob getJob(long jobId) {
|
||||
ExportJob job = null;
|
||||
readLock();
|
||||
try {
|
||||
job = idToJob.get(jobId);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
return job;
|
||||
}
|
||||
|
||||
public List<ExportJob> getExportJobs(ExportJob.JobState state) {
|
||||
List<ExportJob> result = Lists.newArrayList();
|
||||
readLock();
|
||||
@ -185,6 +252,7 @@ public class ExportMgr {
|
||||
return result;
|
||||
}
|
||||
|
||||
// used for `show export` statement
|
||||
// NOTE: jobid and states may both specified, or only one of them, or neither
|
||||
public List<List<String>> getExportJobInfosByIdOrState(
|
||||
long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJob.JobState> states,
|
||||
@ -341,15 +409,15 @@ public class ExportMgr {
|
||||
}
|
||||
infoMap.put("partitions", partitions);
|
||||
infoMap.put("broker", job.getBrokerDesc().getName());
|
||||
infoMap.put("column separator", job.getColumnSeparator());
|
||||
infoMap.put("line delimiter", job.getLineDelimiter());
|
||||
infoMap.put("exec mem limit", job.getExecMemLimit());
|
||||
infoMap.put("column_separator", job.getColumnSeparator());
|
||||
infoMap.put("format", job.getFormat());
|
||||
infoMap.put("line_delimiter", job.getLineDelimiter());
|
||||
infoMap.put("columns", job.getColumns());
|
||||
infoMap.put("coord num", job.getCoordList().size());
|
||||
infoMap.put("tablet num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
|
||||
infoMap.put("tablet_num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
|
||||
infoMap.put("max_file_size", job.getMaxFileSize());
|
||||
jobInfo.add(new Gson().toJson(infoMap));
|
||||
// path
|
||||
jobInfo.add(job.getShowExportPath());
|
||||
jobInfo.add(job.getExportPath());
|
||||
|
||||
jobInfo.add(TimeUtils.longToTimeString(job.getCreateTimeMs()));
|
||||
jobInfo.add(TimeUtils.longToTimeString(job.getStartTimeMs()));
|
||||
@ -364,6 +432,13 @@ public class ExportMgr {
|
||||
jobInfo.add(FeConstants.null_string);
|
||||
}
|
||||
|
||||
// outfileInfo
|
||||
if (job.getState() == JobState.FINISHED) {
|
||||
jobInfo.add(job.getOutfileInfo());
|
||||
} else {
|
||||
jobInfo.add(FeConstants.null_string);
|
||||
}
|
||||
|
||||
return jobInfo;
|
||||
}
|
||||
|
||||
@ -397,11 +472,15 @@ public class ExportMgr {
|
||||
}
|
||||
}
|
||||
|
||||
public void replayUpdateJobState(long jobId, ExportJob.JobState newState) {
|
||||
public void replayUpdateJobState(ExportJob.StateTransfer stateTransfer) {
|
||||
readLock();
|
||||
try {
|
||||
ExportJob job = idToJob.get(jobId);
|
||||
job.updateState(newState, true);
|
||||
ExportJob job = idToJob.get(stateTransfer.getJobId());
|
||||
job.updateState(stateTransfer.getState(), true);
|
||||
job.setStartTimeMs(stateTransfer.getStartTimeMs());
|
||||
job.setFinishTimeMs(stateTransfer.getFinishTimeMs());
|
||||
job.setFailMsg(stateTransfer.getFailMsg());
|
||||
job.setOutfileInfo(stateTransfer.getOutFileInfo());
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
@ -354,7 +354,7 @@ public class EditLog {
|
||||
case OperationType.OP_EXPORT_UPDATE_STATE:
|
||||
ExportJob.StateTransfer op = (ExportJob.StateTransfer) journal.getData();
|
||||
ExportMgr exportMgr = env.getExportMgr();
|
||||
exportMgr.replayUpdateJobState(op.getJobId(), op.getState());
|
||||
exportMgr.replayUpdateJobState(op);
|
||||
break;
|
||||
case OperationType.OP_FINISH_DELETE: {
|
||||
DeleteInfo info = (DeleteInfo) journal.getData();
|
||||
|
||||
@ -156,6 +156,8 @@ public class ConnectContext {
|
||||
|
||||
private StatsErrorEstimator statsErrorEstimator;
|
||||
|
||||
private Map<String, String> resultAttachedInfo;
|
||||
|
||||
public void setUserQueryTimeout(int queryTimeout) {
|
||||
if (queryTimeout > 0) {
|
||||
sessionVariable.setQueryTimeoutS(queryTimeout);
|
||||
@ -366,6 +368,10 @@ public class ConnectContext {
|
||||
return sessionVariable;
|
||||
}
|
||||
|
||||
public void setSessionVariable(SessionVariable sessionVariable) {
|
||||
this.sessionVariable = sessionVariable;
|
||||
}
|
||||
|
||||
public ConnectScheduler getConnectScheduler() {
|
||||
return connectScheduler;
|
||||
}
|
||||
@ -651,6 +657,14 @@ public class ConnectContext {
|
||||
}
|
||||
}
|
||||
|
||||
public void setResultAttachedInfo(Map<String, String> resultAttachedInfo) {
|
||||
this.resultAttachedInfo = resultAttachedInfo;
|
||||
}
|
||||
|
||||
public Map<String, String> getResultAttachedInfo() {
|
||||
return resultAttachedInfo;
|
||||
}
|
||||
|
||||
public class ThreadInfo {
|
||||
public boolean isFull;
|
||||
|
||||
|
||||
@ -1365,6 +1365,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
}
|
||||
plannerProfile.freshWriteResultConsumeTime();
|
||||
context.updateReturnRows(batch.getBatch().getRows().size());
|
||||
context.setResultAttachedInfo(batch.getBatch().getAttachedInfos());
|
||||
}
|
||||
if (batch.isEos()) {
|
||||
break;
|
||||
|
||||
@ -17,13 +17,10 @@
|
||||
|
||||
package org.apache.doris.task;
|
||||
|
||||
import org.apache.doris.analysis.StorageBackend;
|
||||
import org.apache.doris.analysis.OutFileClause;
|
||||
import org.apache.doris.analysis.QueryStmt;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.FsBroker;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.ClientPool;
|
||||
import org.apache.doris.common.Status;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.Version;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.ProfileManager;
|
||||
@ -31,40 +28,32 @@ import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.load.ExportFailMsg;
|
||||
import org.apache.doris.load.ExportJob;
|
||||
import org.apache.doris.qe.Coordinator;
|
||||
import org.apache.doris.qe.QeProcessorImpl;
|
||||
import org.apache.doris.service.FrontendOptions;
|
||||
import org.apache.doris.thrift.TBrokerOperationStatus;
|
||||
import org.apache.doris.thrift.TBrokerOperationStatusCode;
|
||||
import org.apache.doris.thrift.TBrokerRenamePathRequest;
|
||||
import org.apache.doris.thrift.TBrokerVersion;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TPaloBrokerService;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.load.ExportJob.JobState;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
public class ExportExportingTask extends MasterTask {
|
||||
private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class);
|
||||
private static final int RETRY_NUM = 2;
|
||||
|
||||
protected final ExportJob job;
|
||||
|
||||
private boolean isCancelled = false;
|
||||
private Status failStatus = Status.OK;
|
||||
private ExportFailMsg.CancelType cancelType = ExportFailMsg.CancelType.UNKNOWN;
|
||||
|
||||
private RuntimeProfile profile = new RuntimeProfile("Export");
|
||||
private List<RuntimeProfile> fragmentProfiles = Lists.newArrayList();
|
||||
|
||||
private StmtExecutor stmtExecutor;
|
||||
|
||||
public ExportExportingTask(ExportJob job) {
|
||||
this.job = job;
|
||||
this.signature = job.getId();
|
||||
@ -72,6 +61,10 @@ public class ExportExportingTask extends MasterTask {
|
||||
|
||||
@Override
|
||||
protected void exec() {
|
||||
if (job.getState() == JobState.IN_QUEUE) {
|
||||
handleInQueueState();
|
||||
}
|
||||
|
||||
if (job.getState() != ExportJob.JobState.EXPORTING) {
|
||||
return;
|
||||
}
|
||||
@ -85,93 +78,60 @@ public class ExportExportingTask extends MasterTask {
|
||||
job.setDoExportingThread(Thread.currentThread());
|
||||
}
|
||||
|
||||
if (job.isReplayed()) {
|
||||
// If the job is created from replay thread, all plan info will be lost.
|
||||
// so the job has to be cancelled.
|
||||
String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
// if one instance finished, we send request to BE to exec next instance
|
||||
List<Coordinator> coords = job.getCoordList();
|
||||
int coordSize = coords.size();
|
||||
for (int i = 0; i < coordSize; i++) {
|
||||
if (isCancelled) {
|
||||
break;
|
||||
}
|
||||
Coordinator coord = coords.get(i);
|
||||
for (int j = 0; j < RETRY_NUM; ++j) {
|
||||
execOneCoord(coord);
|
||||
if (coord.getExecStatus().ok()) {
|
||||
List<QueryStmt> selectStmtList = job.getSelectStmtList();
|
||||
boolean isFailed = false;
|
||||
ExportFailMsg errorMsg = null;
|
||||
int completeTaskNum = 0;
|
||||
List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
|
||||
// begin exporting
|
||||
for (int i = 0; i < selectStmtList.size(); ++i) {
|
||||
try (AutoCloseConnectContext r = buildConnectContext()) {
|
||||
this.stmtExecutor = new StmtExecutor(r.connectContext, selectStmtList.get(i));
|
||||
this.stmtExecutor.execute();
|
||||
if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
|
||||
errorMsg = new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
|
||||
r.connectContext.getState().getErrorMessage());
|
||||
isFailed = true;
|
||||
break;
|
||||
}
|
||||
if (j < RETRY_NUM - 1) {
|
||||
TUniqueId queryId = coord.getQueryId();
|
||||
coord.clearExportStatus();
|
||||
|
||||
// generate one new queryId here, to avoid being rejected by BE,
|
||||
// because the request is considered as a repeat request.
|
||||
// we make the high part of query id unchanged to facilitate tracing problem by log.
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId newQueryId = new TUniqueId(queryId.hi, uuid.getLeastSignificantBits());
|
||||
coord.setQueryId(newQueryId);
|
||||
LOG.warn("export exporting job fail. err: {}. query_id: {}, job: {}. retry. {}, new query id: {}",
|
||||
coord.getExecStatus().getErrorMsg(), DebugUtil.printId(queryId), job.getId(), j,
|
||||
DebugUtil.printId(newQueryId));
|
||||
}
|
||||
ExportJob.OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
|
||||
outfileInfoList.add(outfileInfo);
|
||||
++completeTaskNum;
|
||||
} catch (Exception e) {
|
||||
errorMsg = new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
|
||||
isFailed = true;
|
||||
break;
|
||||
} finally {
|
||||
this.stmtExecutor.addProfileToSpan();
|
||||
}
|
||||
|
||||
if (!coord.getExecStatus().ok()) {
|
||||
onFailed(coord);
|
||||
} else {
|
||||
int progress = (int) (i + 1) * 100 / coordSize;
|
||||
if (progress >= 100) {
|
||||
progress = 99;
|
||||
}
|
||||
job.setProgress(progress);
|
||||
LOG.info("finish coordinator with query id {}, export job: {}. progress: {}",
|
||||
DebugUtil.printId(coord.getQueryId()), job.getId(), progress);
|
||||
}
|
||||
|
||||
RuntimeProfile queryProfile = coord.getQueryProfile();
|
||||
if (queryProfile != null) {
|
||||
queryProfile.getCounterTotalTime().setValue(TimeUtils.getEstimatedTime(job.getStartTimeMs()));
|
||||
}
|
||||
coord.endProfile();
|
||||
fragmentProfiles.add(coord.getQueryProfile());
|
||||
}
|
||||
|
||||
if (isCancelled) {
|
||||
job.cancel(cancelType, null /* error msg is already set */);
|
||||
int progress = completeTaskNum * 100 / selectStmtList.size();
|
||||
if (progress >= 100) {
|
||||
progress = 99;
|
||||
}
|
||||
job.setProgress(progress);
|
||||
LOG.info("Exporting task progress is {}%, export job: {}", progress, job.getId());
|
||||
|
||||
if (isFailed) {
|
||||
registerProfile();
|
||||
job.cancel(errorMsg.getCancelType(), errorMsg.getMsg());
|
||||
LOG.warn("Exporting task failed because Exception: {}", errorMsg.getMsg());
|
||||
return;
|
||||
}
|
||||
|
||||
if (job.getBrokerDesc().getStorageType() == StorageBackend.StorageType.BROKER) {
|
||||
// move tmp file to final destination
|
||||
Status mvStatus = moveTmpFiles();
|
||||
if (!mvStatus.ok()) {
|
||||
String failMsg = "move tmp file to final destination fail.";
|
||||
failMsg += mvStatus.getErrorMsg();
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
|
||||
LOG.warn("move tmp file to final destination fail. job:{}", job);
|
||||
registerProfile();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (job.updateState(ExportJob.JobState.FINISHED)) {
|
||||
LOG.warn("export job success. job: {}", job);
|
||||
registerProfile();
|
||||
registerProfile();
|
||||
if (job.finish(outfileInfoList)) {
|
||||
LOG.info("export job success. job: {}", job);
|
||||
// TODO(ftw): when we implement exporting tablet one by one, we should release snapshot here
|
||||
// release snapshot
|
||||
Status releaseSnapshotStatus = job.releaseSnapshotPaths();
|
||||
if (!releaseSnapshotStatus.ok()) {
|
||||
// even if release snapshot failed, do not cancel this job.
|
||||
// snapshot will be removed by GC thread on BE, finally.
|
||||
LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(),
|
||||
releaseSnapshotStatus.getErrorMsg());
|
||||
}
|
||||
// Status releaseSnapshotStatus = job.releaseSnapshotPaths();
|
||||
// if (!releaseSnapshotStatus.ok()) {
|
||||
// // even if release snapshot failed, do not cancel this job.
|
||||
// // snapshot will be removed by GC thread on BE, finally.
|
||||
// LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(),
|
||||
// releaseSnapshotStatus.getErrorMsg());
|
||||
// }
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
@ -179,74 +139,28 @@ public class ExportExportingTask extends MasterTask {
|
||||
}
|
||||
}
|
||||
|
||||
private Status execOneCoord(Coordinator coord) {
|
||||
TUniqueId queryId = coord.getQueryId();
|
||||
boolean needUnregister = false;
|
||||
try {
|
||||
QeProcessorImpl.INSTANCE.registerQuery(queryId, coord);
|
||||
needUnregister = true;
|
||||
actualExecCoord(queryId, coord);
|
||||
} catch (UserException e) {
|
||||
LOG.warn("export exporting internal error, job: {}", job.getId(), e);
|
||||
return new Status(TStatusCode.INTERNAL_ERROR, e.getMessage());
|
||||
} finally {
|
||||
if (needUnregister) {
|
||||
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
|
||||
}
|
||||
}
|
||||
return Status.OK;
|
||||
private AutoCloseConnectContext buildConnectContext() {
|
||||
ConnectContext connectContext = new ConnectContext();
|
||||
connectContext.setSessionVariable(job.getSessionVariables());
|
||||
connectContext.setEnv(Env.getCurrentEnv());
|
||||
connectContext.setDatabase(job.getTableName().getDb());
|
||||
connectContext.setQualifiedUser(job.getQualifiedUser());
|
||||
connectContext.setCurrentUserIdentity(job.getUserIdentity());
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
connectContext.setQueryId(queryId);
|
||||
connectContext.setStartTime();
|
||||
connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);
|
||||
return new AutoCloseConnectContext(connectContext);
|
||||
}
|
||||
|
||||
private void actualExecCoord(TUniqueId queryId, Coordinator coord) {
|
||||
int leftTimeSecond = getLeftTimeSecond();
|
||||
if (leftTimeSecond <= 0) {
|
||||
onTimeout();
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
coord.setTimeout(leftTimeSecond);
|
||||
coord.exec();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("export Coordinator execute failed. job: {}", job.getId(), e);
|
||||
}
|
||||
|
||||
if (coord.join(leftTimeSecond)) {
|
||||
Status status = coord.getExecStatus();
|
||||
if (status.ok()) {
|
||||
onSubTaskFinished(coord.getExportFiles());
|
||||
}
|
||||
} else {
|
||||
coord.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
private int getLeftTimeSecond() {
|
||||
return (int) (job.getTimeoutSecond() - (System.currentTimeMillis() - job.getCreateTimeMs()) / 1000);
|
||||
}
|
||||
|
||||
private synchronized void onSubTaskFinished(List<String> exportFiles) {
|
||||
job.addExportedFiles(exportFiles);
|
||||
}
|
||||
|
||||
private synchronized void onFailed(Coordinator coordinator) {
|
||||
isCancelled = true;
|
||||
this.failStatus = coordinator.getExecStatus();
|
||||
cancelType = ExportFailMsg.CancelType.RUN_FAIL;
|
||||
String failMsg = "export exporting job fail. query id: " + DebugUtil.printId(coordinator.getQueryId())
|
||||
+ ", ";
|
||||
failMsg += failStatus.getErrorMsg();
|
||||
job.setFailMsg(new ExportFailMsg(cancelType, failMsg));
|
||||
LOG.warn("export exporting job fail. err: {}. job: {}", failMsg, job);
|
||||
}
|
||||
|
||||
public synchronized void onTimeout() {
|
||||
isCancelled = true;
|
||||
this.failStatus = new Status(TStatusCode.TIMEOUT, "timeout");
|
||||
cancelType = ExportFailMsg.CancelType.TIMEOUT;
|
||||
String failMsg = "export exporting job timeout.";
|
||||
job.setFailMsg(new ExportFailMsg(cancelType, failMsg));
|
||||
LOG.warn("export exporting job timeout. job: {}", job);
|
||||
private ExportJob.OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
|
||||
ExportJob.OutfileInfo outfileInfo = new ExportJob.OutfileInfo();
|
||||
outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
|
||||
outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
|
||||
outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes");
|
||||
outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
|
||||
return outfileInfo;
|
||||
}
|
||||
|
||||
private void initProfile() {
|
||||
@ -264,7 +178,7 @@ public class ExportExportingTask extends MasterTask {
|
||||
summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Export");
|
||||
summaryProfile.addInfoString(ProfileManager.QUERY_STATE, job.getState().toString());
|
||||
summaryProfile.addInfoString(ProfileManager.DORIS_VERSION, Version.DORIS_BUILD_VERSION);
|
||||
summaryProfile.addInfoString(ProfileManager.USER, job.getUser());
|
||||
summaryProfile.addInfoString(ProfileManager.USER, job.getQualifiedUser());
|
||||
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, String.valueOf(job.getDbId()));
|
||||
summaryProfile.addInfoString(ProfileManager.SQL_STATEMENT, job.getSql());
|
||||
profile.addChild(summaryProfile);
|
||||
@ -281,67 +195,23 @@ public class ExportExportingTask extends MasterTask {
|
||||
ProfileManager.getInstance().pushProfile(profile);
|
||||
}
|
||||
|
||||
private Status moveTmpFiles() {
|
||||
FsBroker broker = null;
|
||||
try {
|
||||
String localIP = FrontendOptions.getLocalHostAddress();
|
||||
broker = Env.getCurrentEnv().getBrokerMgr().getBroker(job.getBrokerDesc().getName(), localIP);
|
||||
} catch (AnalysisException e) {
|
||||
String failMsg = "get broker failed. export job: " + job.getId() + ". msg: " + e.getMessage();
|
||||
LOG.warn(failMsg);
|
||||
return new Status(TStatusCode.CANCELLED, failMsg);
|
||||
}
|
||||
TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
|
||||
TPaloBrokerService.Client client = null;
|
||||
try {
|
||||
client = ClientPool.brokerPool.borrowObject(address);
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
client = ClientPool.brokerPool.borrowObject(address);
|
||||
} catch (Exception e1) {
|
||||
String failMsg = "create connection to broker(" + address + ") failed";
|
||||
LOG.warn(failMsg);
|
||||
return new Status(TStatusCode.CANCELLED, failMsg);
|
||||
}
|
||||
}
|
||||
boolean failed = false;
|
||||
Set<String> exportedFiles = job.getExportedFiles();
|
||||
List<String> newFiles = Lists.newArrayList();
|
||||
String exportPath = job.getExportPath();
|
||||
for (String exportedFile : exportedFiles) {
|
||||
// move exportPath/__doris_tmp/file to exportPath/file
|
||||
String file = exportedFile.substring(exportedFile.lastIndexOf("/") + 1);
|
||||
String destPath = exportPath + "/" + file;
|
||||
LOG.debug("rename {} to {}, export job: {}", exportedFile, destPath, job.getId());
|
||||
String failMsg = "";
|
||||
try {
|
||||
TBrokerRenamePathRequest request = new TBrokerRenamePathRequest(
|
||||
TBrokerVersion.VERSION_ONE, exportedFile, destPath, job.getBrokerDesc().getProperties());
|
||||
TBrokerOperationStatus tBrokerOperationStatus = null;
|
||||
tBrokerOperationStatus = client.renamePath(request);
|
||||
if (tBrokerOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
|
||||
failed = true;
|
||||
failMsg = "Broker renamePath failed. srcPath=" + exportedFile + ", destPath=" + destPath
|
||||
+ ", broker=" + address + ", msg=" + tBrokerOperationStatus.getMessage();
|
||||
return new Status(TStatusCode.CANCELLED, failMsg);
|
||||
} else {
|
||||
newFiles.add(destPath);
|
||||
}
|
||||
} catch (TException e) {
|
||||
failed = true;
|
||||
failMsg = "Broker renamePath failed. srcPath=" + exportedFile + ", destPath=" + destPath
|
||||
+ ", broker=" + address + ", msg=" + e.getMessage();
|
||||
return new Status(TStatusCode.CANCELLED, failMsg);
|
||||
} finally {
|
||||
if (failed) {
|
||||
ClientPool.brokerPool.invalidateObject(address, client);
|
||||
}
|
||||
}
|
||||
private void handleInQueueState() {
|
||||
long dbId = job.getDbId();
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist");
|
||||
return;
|
||||
}
|
||||
|
||||
exportedFiles.clear();
|
||||
job.addExportedFiles(newFiles);
|
||||
ClientPool.brokerPool.returnObject(address, client);
|
||||
return Status.OK;
|
||||
// TODO(ftw): when we implement exporting tablet one by one, we should makeSnapshots here
|
||||
// Status snapshotStatus = job.makeSnapshots();
|
||||
// if (!snapshotStatus.ok()) {
|
||||
// job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg());
|
||||
// return;
|
||||
// }
|
||||
|
||||
if (job.updateState(ExportJob.JobState.EXPORTING)) {
|
||||
LOG.info("Exchange pending status to exporting status success. job: {}", job);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,133 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.task;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.Status;
|
||||
import org.apache.doris.load.ExportFailMsg;
|
||||
import org.apache.doris.load.ExportJob;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.thrift.TAgentResult;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TPaloScanRange;
|
||||
import org.apache.doris.thrift.TScanRange;
|
||||
import org.apache.doris.thrift.TScanRangeLocation;
|
||||
import org.apache.doris.thrift.TScanRangeLocations;
|
||||
import org.apache.doris.thrift.TSnapshotRequest;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TypesConstants;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class ExportPendingTask extends MasterTask {
|
||||
private static final Logger LOG = LogManager.getLogger(ExportPendingTask.class);
|
||||
|
||||
protected final ExportJob job;
|
||||
protected Database db;
|
||||
|
||||
public ExportPendingTask(ExportJob job) {
|
||||
super();
|
||||
this.job = job;
|
||||
this.signature = job.getId();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void exec() {
|
||||
if (job.getState() != ExportJob.JobState.PENDING) {
|
||||
return;
|
||||
}
|
||||
|
||||
long dbId = job.getDbId();
|
||||
db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist");
|
||||
return;
|
||||
}
|
||||
|
||||
if (job.isReplayed()) {
|
||||
// If the job is created from replay thread, all plan info will be lost.
|
||||
// so the job has to be cancelled.
|
||||
String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
|
||||
return;
|
||||
}
|
||||
|
||||
// make snapshots
|
||||
Status snapshotStatus = makeSnapshots();
|
||||
if (!snapshotStatus.ok()) {
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg());
|
||||
return;
|
||||
}
|
||||
|
||||
if (job.updateState(ExportJob.JobState.EXPORTING)) {
|
||||
LOG.info("submit pending export job success. job: {}", job);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private Status makeSnapshots() {
|
||||
List<TScanRangeLocations> tabletLocations = job.getTabletLocations();
|
||||
if (tabletLocations == null) {
|
||||
return Status.OK;
|
||||
}
|
||||
for (TScanRangeLocations tablet : tabletLocations) {
|
||||
TScanRange scanRange = tablet.getScanRange();
|
||||
if (!scanRange.isSetPaloScanRange()) {
|
||||
continue;
|
||||
}
|
||||
TPaloScanRange paloScanRange = scanRange.getPaloScanRange();
|
||||
List<TScanRangeLocation> locations = tablet.getLocations();
|
||||
for (TScanRangeLocation location : locations) {
|
||||
TNetworkAddress address = location.getServer();
|
||||
String host = address.getHostname();
|
||||
int port = address.getPort();
|
||||
Backend backend = Env.getCurrentSystemInfo().getBackendWithBePort(host, port);
|
||||
if (backend == null) {
|
||||
return Status.CANCELLED;
|
||||
}
|
||||
long backendId = backend.getId();
|
||||
if (!Env.getCurrentSystemInfo().checkBackendQueryAvailable(backendId)) {
|
||||
return Status.CANCELLED;
|
||||
}
|
||||
TSnapshotRequest snapshotRequest = new TSnapshotRequest();
|
||||
snapshotRequest.setTabletId(paloScanRange.getTabletId());
|
||||
snapshotRequest.setSchemaHash(Integer.parseInt(paloScanRange.getSchemaHash()));
|
||||
snapshotRequest.setVersion(Long.parseLong(paloScanRange.getVersion()));
|
||||
snapshotRequest.setTimeout(job.getTimeoutSecond());
|
||||
snapshotRequest.setPreferredSnapshotVersion(TypesConstants.TPREFER_SNAPSHOT_REQ_VERSION);
|
||||
|
||||
AgentClient client = new AgentClient(host, port);
|
||||
TAgentResult result = client.makeSnapshot(snapshotRequest);
|
||||
if (result == null || result.getStatus().getStatusCode() != TStatusCode.OK) {
|
||||
String err = "snapshot for tablet " + paloScanRange.getTabletId() + " failed on backend "
|
||||
+ address.toString() + ". reason: "
|
||||
+ (result == null ? "unknown" : result.getStatus().error_msgs);
|
||||
LOG.warn("{}, export job: {}", err, job.getId());
|
||||
return new Status(TStatusCode.CANCELLED, err);
|
||||
}
|
||||
job.addSnapshotPath(Pair.of(address, result.getSnapshotPath()));
|
||||
}
|
||||
}
|
||||
return Status.OK;
|
||||
}
|
||||
}
|
||||
@ -156,7 +156,7 @@ public class CancelExportStmtTest extends TestWithFeService {
|
||||
ExportJob job2 = new ExportJob();
|
||||
job2.updateState(ExportJob.JobState.CANCELLED, true);
|
||||
ExportJob job3 = new ExportJob();
|
||||
job3.updateState(ExportJob.JobState.EXPORTING, true);
|
||||
job3.updateState(ExportJob.JobState.EXPORTING, false);
|
||||
ExportJob job4 = new ExportJob();
|
||||
exportJobList1.add(job1);
|
||||
exportJobList1.add(job2);
|
||||
|
||||
@ -18,7 +18,6 @@
|
||||
package org.apache.doris.load.loadv2;
|
||||
|
||||
import org.apache.doris.analysis.BrokerDesc;
|
||||
import org.apache.doris.analysis.LoadStmt;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.common.jmockit.Deencapsulation;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
@ -27,7 +26,6 @@ import org.apache.doris.load.ExportMgr;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerManager;
|
||||
import org.apache.doris.mysql.privilege.MockedAuth;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import mockit.Mocked;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -35,7 +33,6 @@ import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class ExportMgrTest {
|
||||
private final ExportMgr exportMgr = new ExportMgr();
|
||||
@ -89,12 +86,7 @@ public class ExportMgrTest {
|
||||
BrokerDesc bd = new BrokerDesc("broker", new HashMap<>());
|
||||
Deencapsulation.setField(job1, "brokerDesc", bd);
|
||||
|
||||
Map<String, String> properties = Maps.newHashMap();
|
||||
properties.put(LoadStmt.EXEC_MEM_LIMIT, "-1");
|
||||
properties.put(LoadStmt.TIMEOUT_PROPERTY, "-1");
|
||||
Deencapsulation.setField(job1, "properties", properties);
|
||||
|
||||
|
||||
Deencapsulation.setField(job1, "timeoutSecond", -1);
|
||||
return job1;
|
||||
}
|
||||
|
||||
|
||||
@ -34,7 +34,6 @@ import org.apache.doris.task.ExportExportingTask;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import mockit.Expectations;
|
||||
import org.junit.Assert;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
@ -71,6 +70,7 @@ public class SessionVariablesTest extends TestWithFeService {
|
||||
|
||||
@Test
|
||||
public void testExperimentalSessionVariables() throws Exception {
|
||||
connectContext.setThreadLocalInfo();
|
||||
// 1. set without experimental
|
||||
SessionVariable sessionVar = connectContext.getSessionVariable();
|
||||
boolean enableNereids = sessionVar.isEnableNereidsPlanner();
|
||||
@ -179,10 +179,6 @@ public class SessionVariablesTest extends TestWithFeService {
|
||||
job.getState();
|
||||
minTimes = 0;
|
||||
result = ExportJob.JobState.EXPORTING;
|
||||
|
||||
job.getCoordList();
|
||||
minTimes = 0;
|
||||
result = Lists.newArrayList();
|
||||
}
|
||||
};
|
||||
|
||||
@ -207,6 +203,7 @@ public class SessionVariablesTest extends TestWithFeService {
|
||||
@Test
|
||||
public void testDisableProfile() {
|
||||
try {
|
||||
connectContext.setThreadLocalInfo();
|
||||
SetStmt setStmt = (SetStmt) parseAndAnalyzeStmt("set enable_profile=false", connectContext);
|
||||
SetExecutor setExecutor = new SetExecutor(connectContext, setStmt);
|
||||
setExecutor.execute();
|
||||
@ -221,10 +218,6 @@ public class SessionVariablesTest extends TestWithFeService {
|
||||
job.getState();
|
||||
minTimes = 0;
|
||||
result = ExportJob.JobState.EXPORTING;
|
||||
|
||||
job.getCoordList();
|
||||
minTimes = 0;
|
||||
result = Lists.newArrayList();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -75,4 +75,6 @@ struct TResultBatch {

    // packet seq used to check if there has packet lost
    3: required i64 packet_seq

    4: optional map<string,string> attached_infos
}