[Refactor](Export) delete useless code of Export (#24953)
This commit is contained in:
@ -71,10 +71,7 @@ public class ExportStmt extends StatementBase {
|
||||
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
|
||||
.add(LABEL)
|
||||
.add(PARALLELISM)
|
||||
.add(LoadStmt.EXEC_MEM_LIMIT)
|
||||
.add(LoadStmt.TIMEOUT_PROPERTY)
|
||||
.add(LoadStmt.KEY_IN_PARAM_COLUMNS)
|
||||
.add(LoadStmt.TIMEOUT_PROPERTY)
|
||||
.add(OutFileClause.PROP_MAX_FILE_SIZE)
|
||||
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
|
||||
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
|
||||
|
||||
@ -1512,8 +1512,6 @@ public class Env {
|
||||
loadJobScheduler.start();
|
||||
loadEtlChecker.start();
|
||||
loadLoadingChecker.start();
|
||||
// export task
|
||||
exportMgr.start();
|
||||
// Tablet checker and scheduler
|
||||
tabletChecker.start();
|
||||
tabletScheduler.start();
|
||||
|
||||
@ -71,9 +71,7 @@ import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.OriginStatement;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.scheduler.exception.JobException;
|
||||
import org.apache.doris.task.ExportExportingTask;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -93,7 +91,6 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -176,16 +173,8 @@ public class ExportJob implements Writable {
|
||||
|
||||
private int parallelism;
|
||||
|
||||
public Map<String, Long> getPartitionToVersion() {
|
||||
return partitionToVersion;
|
||||
}
|
||||
|
||||
private Map<String, Long> partitionToVersion = Maps.newHashMap();
|
||||
|
||||
// The selectStmt is sql 'select ... into outfile ...'
|
||||
// TODO(ftw): delete
|
||||
private List<SelectStmt> selectStmtList = Lists.newArrayList();
|
||||
|
||||
/**
|
||||
* Each parallel has an associated Outfile list
|
||||
* which are organized into a two-dimensional list.
|
||||
@ -194,11 +183,6 @@ public class ExportJob implements Writable {
|
||||
*/
|
||||
private List<List<StatementBase>> selectStmtListPerParallel = Lists.newArrayList();
|
||||
|
||||
private List<List<String>> outfileSqlPerParallel = Lists.newArrayList();
|
||||
|
||||
|
||||
private List<StmtExecutor> stmtExecutorList;
|
||||
|
||||
private List<String> exportColumns = Lists.newArrayList();
|
||||
|
||||
private TableIf exportTable;
|
||||
@ -208,10 +192,6 @@ public class ExportJob implements Writable {
|
||||
|
||||
private SessionVariable sessionVariables;
|
||||
|
||||
private Thread doExportingThread;
|
||||
|
||||
private ExportExportingTask task;
|
||||
|
||||
// backend_address => snapshot path
|
||||
private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
|
||||
|
||||
@ -408,58 +388,6 @@ public class ExportJob implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
private void generateQueryStmtOld() throws UserException {
|
||||
SelectList list = new SelectList();
|
||||
if (exportColumns.isEmpty()) {
|
||||
list.addItem(SelectListItem.createStarItem(this.tableName));
|
||||
} else {
|
||||
for (Column column : exportTable.getBaseSchema()) {
|
||||
String colName = column.getName().toLowerCase();
|
||||
if (exportColumns.contains(colName)) {
|
||||
SlotRef slotRef = new SlotRef(this.tableName, colName);
|
||||
SelectListItem selectListItem = new SelectListItem(slotRef, null);
|
||||
list.addItem(selectListItem);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<List<Long>> tabletsListPerQuery = splitTablets();
|
||||
|
||||
List<List<TableRef>> tableRefListPerQuery = Lists.newArrayList();
|
||||
for (List<Long> tabletsList : tabletsListPerQuery) {
|
||||
TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null,
|
||||
(ArrayList) tabletsList, this.tableRef.getTableSample(), this.tableRef.getCommonHints());
|
||||
List<TableRef> tableRefList = Lists.newArrayList();
|
||||
tableRefList.add(tblRef);
|
||||
tableRefListPerQuery.add(tableRefList);
|
||||
}
|
||||
LOG.info("Export task is split into {} outfile statements.", tableRefListPerQuery.size());
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (int i = 0; i < tableRefListPerQuery.size(); i++) {
|
||||
LOG.debug("Outfile clause {} is responsible for tables: {}", i,
|
||||
tableRefListPerQuery.get(i).get(0).getSampleTabletIds());
|
||||
}
|
||||
}
|
||||
|
||||
for (List<TableRef> tableRefList : tableRefListPerQuery) {
|
||||
FromClause fromClause = new FromClause(tableRefList);
|
||||
// generate outfile clause
|
||||
OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
|
||||
SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null,
|
||||
null, null, LimitElement.NO_LIMIT);
|
||||
selectStmt.setOutFileClause(outfile);
|
||||
selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
|
||||
selectStmtList.add(selectStmt);
|
||||
}
|
||||
stmtExecutorList = Arrays.asList(new StmtExecutor[selectStmtList.size()]);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
for (int i = 0; i < selectStmtList.size(); i++) {
|
||||
LOG.debug("Outfile clause {} is: {}", i, selectStmtList.get(i).toSql());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate outfile select stmt
|
||||
* @throws UserException
|
||||
@ -639,49 +567,6 @@ public class ExportJob implements Writable {
|
||||
this.state = newState;
|
||||
}
|
||||
|
||||
// TODO(ftw): delete
|
||||
public synchronized Thread getDoExportingThread() {
|
||||
return doExportingThread;
|
||||
}
|
||||
|
||||
// TODO(ftw): delete
|
||||
public synchronized void setDoExportingThread(Thread isExportingThread) {
|
||||
this.doExportingThread = isExportingThread;
|
||||
}
|
||||
|
||||
// TODO(ftw): delete
|
||||
public synchronized void setStmtExecutor(int idx, StmtExecutor executor) {
|
||||
this.stmtExecutorList.set(idx, executor);
|
||||
}
|
||||
|
||||
public synchronized StmtExecutor getStmtExecutor(int idx) {
|
||||
return this.stmtExecutorList.get(idx);
|
||||
}
|
||||
|
||||
// TODO(ftw): delete
|
||||
public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
|
||||
if (msg != null) {
|
||||
failMsg = new ExportFailMsg(type, msg);
|
||||
}
|
||||
|
||||
// maybe user cancel this job
|
||||
if (task != null && state == ExportJobState.EXPORTING && stmtExecutorList != null) {
|
||||
for (int idx = 0; idx < stmtExecutorList.size(); ++idx) {
|
||||
stmtExecutorList.get(idx).cancel();
|
||||
}
|
||||
}
|
||||
|
||||
if (updateState(ExportJobState.CANCELLED, false)) {
|
||||
// release snapshot
|
||||
// Status releaseSnapshotStatus = releaseSnapshotPaths();
|
||||
// if (!releaseSnapshotStatus.ok()) {
|
||||
// // snapshot will be removed by GC thread on BE, finally.
|
||||
// LOG.warn("failed to release snapshot for export job: {}. err: {}", id,
|
||||
// releaseSnapshotStatus.getErrorMsg());
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void updateExportJobState(ExportJobState newState, Long taskId,
|
||||
List<OutfileInfo> outfileInfoList, ExportFailMsg.CancelType type, String msg) throws JobException {
|
||||
switch (newState) {
|
||||
@ -734,15 +619,6 @@ public class ExportJob implements Writable {
|
||||
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED);
|
||||
}
|
||||
|
||||
// TODO(ftw): delete
|
||||
public synchronized boolean finish(List<OutfileInfo> outfileInfoList) {
|
||||
outfileInfo = GsonUtils.GSON.toJson(outfileInfoList);
|
||||
if (updateState(ExportJobState.FINISHED)) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void exportExportJob() {
|
||||
// The first exportTaskExecutor will set state to EXPORTING,
|
||||
// other exportTaskExecutors do not need to set up state.
|
||||
@ -824,54 +700,6 @@ public class ExportJob implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(ftw): delete
|
||||
public synchronized boolean updateState(ExportJobState newState) {
|
||||
return this.updateState(newState, false);
|
||||
}
|
||||
|
||||
// TODO(ftw): delete
|
||||
public synchronized boolean updateState(ExportJobState newState, boolean isReplay) {
|
||||
// We do not persist EXPORTING state in new version of metadata,
|
||||
// but EXPORTING state may still exist in older versions of metadata.
|
||||
// So if isReplay == true and newState == EXPORTING, we just ignore this update.
|
||||
if (isFinalState() || (isReplay && newState == ExportJobState.EXPORTING)) {
|
||||
return false;
|
||||
}
|
||||
state = newState;
|
||||
switch (newState) {
|
||||
case PENDING:
|
||||
case IN_QUEUE:
|
||||
progress = 0;
|
||||
break;
|
||||
case EXPORTING:
|
||||
// if isReplay == true, startTimeMs will be read from LOG
|
||||
if (!isReplay) {
|
||||
startTimeMs = System.currentTimeMillis();
|
||||
}
|
||||
break;
|
||||
case FINISHED:
|
||||
if (!isReplay) {
|
||||
finishTimeMs = System.currentTimeMillis();
|
||||
}
|
||||
progress = 100;
|
||||
break;
|
||||
case CANCELLED:
|
||||
// if isReplay == true, finishTimeMs will be read from LOG
|
||||
if (!isReplay) {
|
||||
finishTimeMs = System.currentTimeMillis();
|
||||
}
|
||||
break;
|
||||
default:
|
||||
Preconditions.checkState(false, "wrong job state: " + newState.name());
|
||||
break;
|
||||
}
|
||||
// we only persist Pending/Cancel/Finish state
|
||||
if (!isReplay && newState != ExportJobState.IN_QUEUE && newState != ExportJobState.EXPORTING) {
|
||||
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, newState);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized boolean isFinalState() {
|
||||
return this.state == ExportJobState.CANCELLED || this.state == ExportJobState.FINISHED;
|
||||
}
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.load;
|
||||
|
||||
import org.apache.doris.analysis.CancelExportStmt;
|
||||
import org.apache.doris.analysis.CompoundPredicate;
|
||||
import org.apache.doris.analysis.ExportStmt;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -32,13 +31,11 @@ import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.PatternMatcher;
|
||||
import org.apache.doris.common.PatternMatcherWrapper;
|
||||
import org.apache.doris.common.util.ListComparator;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.common.util.OrderByPair;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.scheduler.exception.JobException;
|
||||
import org.apache.doris.task.MasterTaskExecutor;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Strings;
|
||||
@ -60,7 +57,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ExportMgr extends MasterDaemon {
|
||||
public class ExportMgr {
|
||||
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
|
||||
|
||||
// lock for export job
|
||||
@ -70,11 +67,7 @@ public class ExportMgr extends MasterDaemon {
|
||||
private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob
|
||||
private Map<String, Long> labelToExportJobId = Maps.newHashMap();
|
||||
|
||||
private MasterTaskExecutor exportingExecutor;
|
||||
|
||||
public ExportMgr() {
|
||||
int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit;
|
||||
exportingExecutor = new MasterTaskExecutor("export-exporting-job", poolSize, true);
|
||||
}
|
||||
|
||||
public void readLock() {
|
||||
@ -93,78 +86,10 @@ public class ExportMgr extends MasterDaemon {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
super.start();
|
||||
exportingExecutor.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void runAfterCatalogReady() {
|
||||
// List<ExportJob> pendingJobs = getExportJobs(ExportJobState.PENDING);
|
||||
// List<ExportJob> newInQueueJobs = Lists.newArrayList();
|
||||
// for (ExportJob job : pendingJobs) {
|
||||
// if (handlePendingJobs(job)) {
|
||||
// newInQueueJobs.add(job);
|
||||
// }
|
||||
// }
|
||||
// LOG.debug("new IN_QUEUE export job num: {}", newInQueueJobs.size());
|
||||
// for (ExportJob job : newInQueueJobs) {
|
||||
// try {
|
||||
// MasterTask task = new ExportExportingTask(job);
|
||||
// job.setTask((ExportExportingTask) task);
|
||||
// if (exportingExecutor.submit(task)) {
|
||||
// LOG.info("success to submit IN_QUEUE export job. job: {}", job);
|
||||
// } else {
|
||||
// LOG.info("fail to submit IN_QUEUE job to executor. job: {}", job);
|
||||
// }
|
||||
// } catch (Exception e) {
|
||||
// LOG.warn("run export exporting job {}.", job, e);
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
private boolean handlePendingJobs(ExportJob job) {
|
||||
// because maybe this job has been cancelled by user.
|
||||
if (job.getState() != ExportJobState.PENDING) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (job.isReplayed()) {
|
||||
// If the job is created from replay thread, all plan info will be lost.
|
||||
// so the job has to be cancelled.
|
||||
String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (job.updateState(ExportJobState.IN_QUEUE)) {
|
||||
LOG.info("Exchange pending status to in_queue status success. job: {}", job);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public List<ExportJob> getJobs() {
|
||||
return Lists.newArrayList(exportIdToJob.values());
|
||||
}
|
||||
|
||||
public void addExportJob(ExportStmt stmt) throws Exception {
|
||||
long jobId = Env.getCurrentEnv().getNextId();
|
||||
ExportJob job = createJob(jobId, stmt);
|
||||
writeLock();
|
||||
try {
|
||||
if (labelToExportJobId.containsKey(job.getLabel())) {
|
||||
throw new LabelAlreadyUsedException(job.getLabel());
|
||||
}
|
||||
unprotectAddJob(job);
|
||||
Env.getCurrentEnv().getEditLog().logExportCreate(job);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
LOG.info("add export job. {}", job);
|
||||
}
|
||||
|
||||
public void addExportJobAndRegisterTask(ExportJob job) throws Exception {
|
||||
long jobId = Env.getCurrentEnv().getNextId();
|
||||
job.setId(jobId);
|
||||
@ -249,12 +174,6 @@ public class ExportMgr extends MasterDaemon {
|
||||
};
|
||||
}
|
||||
|
||||
private ExportJob createJob(long jobId, ExportStmt stmt) {
|
||||
ExportJob exportJob = stmt.getExportJob();
|
||||
exportJob.setId(jobId);
|
||||
return exportJob;
|
||||
}
|
||||
|
||||
public ExportJob getJob(long jobId) {
|
||||
ExportJob job;
|
||||
readLock();
|
||||
@ -266,22 +185,6 @@ public class ExportMgr extends MasterDaemon {
|
||||
return job;
|
||||
}
|
||||
|
||||
public List<ExportJob> getExportJobs(ExportJobState state) {
|
||||
List<ExportJob> result = Lists.newArrayList();
|
||||
readLock();
|
||||
try {
|
||||
for (ExportJob job : exportIdToJob.values()) {
|
||||
if (job.getState() == state) {
|
||||
result.add(job);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// used for `show export` statement
|
||||
// NOTE: jobid and states may both specified, or only one of them, or neither
|
||||
public List<List<String>> getExportJobInfosByIdOrState(
|
||||
|
||||
@ -79,10 +79,7 @@ public class ExportCommand extends Command implements ForwardWithSync {
|
||||
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
|
||||
.add(LABEL)
|
||||
.add(PARALLELISM)
|
||||
.add(LoadStmt.EXEC_MEM_LIMIT)
|
||||
.add(LoadStmt.TIMEOUT_PROPERTY)
|
||||
.add(LoadStmt.KEY_IN_PARAM_COLUMNS)
|
||||
.add(LoadStmt.TIMEOUT_PROPERTY)
|
||||
.add(OutFileClause.PROP_MAX_FILE_SIZE)
|
||||
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
|
||||
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
|
||||
|
||||
@ -2328,7 +2328,6 @@ public class StmtExecutor {
|
||||
|
||||
private void handleExportStmt() throws Exception {
|
||||
ExportStmt exportStmt = (ExportStmt) parsedStmt;
|
||||
// context.getEnv().getExportMgr().addExportJob(exportStmt);
|
||||
context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt.getExportJob());
|
||||
}
|
||||
|
||||
|
||||
@ -1,281 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.task;
|
||||
|
||||
import org.apache.doris.analysis.OutFileClause;
|
||||
import org.apache.doris.analysis.SelectStmt;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ThreadPoolManager;
|
||||
import org.apache.doris.load.ExportFailMsg;
|
||||
import org.apache.doris.load.ExportFailMsg.CancelType;
|
||||
import org.apache.doris.load.ExportJob;
|
||||
import org.apache.doris.load.ExportJobState;
|
||||
import org.apache.doris.load.OutfileInfo;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ExecutorCompletionService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
public class ExportExportingTask extends MasterTask {
|
||||
private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class);
|
||||
|
||||
protected final ExportJob job;
|
||||
|
||||
ThreadPoolExecutor exportExecPool = ThreadPoolManager.newDaemonCacheThreadPool(
|
||||
Config.maximum_parallelism_of_export_job, "exporting-pool-", false);
|
||||
|
||||
public ExportExportingTask(ExportJob job) {
|
||||
this.job = job;
|
||||
this.signature = job.getId();
|
||||
}
|
||||
|
||||
private class ExportResult {
|
||||
private boolean isFailed;
|
||||
|
||||
private ExportFailMsg failMsg;
|
||||
|
||||
private OutfileInfo outfileInfo;
|
||||
|
||||
public ExportResult(boolean isFailed, ExportFailMsg failMsg, OutfileInfo outfileInfo) {
|
||||
this.isFailed = isFailed;
|
||||
this.failMsg = failMsg;
|
||||
this.outfileInfo = outfileInfo;
|
||||
}
|
||||
|
||||
|
||||
public boolean isFailed() {
|
||||
return isFailed;
|
||||
}
|
||||
|
||||
public ExportFailMsg getFailMsg() {
|
||||
return failMsg;
|
||||
}
|
||||
|
||||
public OutfileInfo getOutfileInfo() {
|
||||
return outfileInfo;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void exec() {
|
||||
if (job.getState() == ExportJobState.IN_QUEUE) {
|
||||
handleInQueueState();
|
||||
}
|
||||
|
||||
if (job.getState() != ExportJobState.EXPORTING) {
|
||||
return;
|
||||
}
|
||||
LOG.info("begin execute export job in exporting state. job: {}", job);
|
||||
|
||||
synchronized (job) {
|
||||
if (job.getDoExportingThread() != null) {
|
||||
LOG.warn("export task is already being executed.");
|
||||
return;
|
||||
}
|
||||
job.setDoExportingThread(Thread.currentThread());
|
||||
}
|
||||
|
||||
List<SelectStmt> selectStmtList = job.getSelectStmtList();
|
||||
int completeTaskNum = 0;
|
||||
List<OutfileInfo> outfileInfoList = Lists.newArrayList();
|
||||
|
||||
int parallelNum = selectStmtList.size();
|
||||
CompletionService<ExportResult> completionService = new ExecutorCompletionService<>(exportExecPool);
|
||||
|
||||
// begin exporting
|
||||
for (int i = 0; i < parallelNum; ++i) {
|
||||
final int idx = i;
|
||||
completionService.submit(() -> {
|
||||
// maybe user cancelled this job
|
||||
if (job.getState() != ExportJobState.EXPORTING) {
|
||||
return new ExportResult(true, null, null);
|
||||
}
|
||||
try {
|
||||
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
|
||||
job.getTableName().getDb());
|
||||
OlapTable table = db.getOlapTableOrAnalysisException(job.getTableName().getTbl());
|
||||
table.readLock();
|
||||
try {
|
||||
SelectStmt selectStmt = selectStmtList.get(idx);
|
||||
List<Long> tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
|
||||
for (Long tabletId : tabletIds) {
|
||||
TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
|
||||
tabletId);
|
||||
Partition partition = table.getPartition(tabletMeta.getPartitionId());
|
||||
long nowVersion = partition.getVisibleVersion();
|
||||
long oldVersion = job.getPartitionToVersion().get(partition.getName());
|
||||
if (nowVersion != oldVersion) {
|
||||
LOG.warn("Tablet {} has changed version, old version = {}, now version = {}",
|
||||
tabletId, oldVersion, nowVersion);
|
||||
return new ExportResult(true, new ExportFailMsg(
|
||||
ExportFailMsg.CancelType.RUN_FAIL,
|
||||
"Tablet {" + tabletId + "} has changed"), null);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
table.readUnlock();
|
||||
}
|
||||
} catch (AnalysisException e) {
|
||||
return new ExportResult(true,
|
||||
new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()), null);
|
||||
}
|
||||
try (AutoCloseConnectContext r = buildConnectContext()) {
|
||||
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, selectStmtList.get(idx));
|
||||
job.setStmtExecutor(idx, stmtExecutor);
|
||||
stmtExecutor.execute();
|
||||
if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
|
||||
return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
|
||||
r.connectContext.getState().getErrorMessage()), null);
|
||||
}
|
||||
OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
|
||||
return new ExportResult(false, null, outfileInfo);
|
||||
} catch (Exception e) {
|
||||
return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
|
||||
e.getMessage()),
|
||||
null);
|
||||
} finally {
|
||||
job.getStmtExecutor(idx).addProfileToSpan();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
Boolean isFailed = false;
|
||||
ExportFailMsg failMsg = new ExportFailMsg();
|
||||
try {
|
||||
for (int i = 0; i < parallelNum; ++i) {
|
||||
Future<ExportResult> future = completionService.take();
|
||||
ExportResult result = future.get();
|
||||
if (!result.isFailed) {
|
||||
outfileInfoList.add(result.getOutfileInfo());
|
||||
++completeTaskNum;
|
||||
int progress = completeTaskNum * 100 / selectStmtList.size();
|
||||
if (progress >= 100) {
|
||||
progress = 99;
|
||||
}
|
||||
job.setProgress(progress);
|
||||
LOG.info("Export Job {} finished {} outfile export and it's progress is {}%", job.getId(),
|
||||
completeTaskNum, progress);
|
||||
} else {
|
||||
isFailed = true;
|
||||
failMsg.setCancelType(result.failMsg.getCancelType());
|
||||
failMsg.setMsg(result.failMsg.getMsg());
|
||||
LOG.warn("Exporting task failed because: {}", result.failMsg.getMsg());
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
isFailed = true;
|
||||
failMsg.setCancelType(CancelType.RUN_FAIL);
|
||||
failMsg.setMsg(e.getMessage());
|
||||
} finally {
|
||||
// cancel all executor
|
||||
if (isFailed) {
|
||||
for (int idx = 0; idx < parallelNum; ++idx) {
|
||||
job.getStmtExecutor(idx).cancel();
|
||||
}
|
||||
}
|
||||
exportExecPool.shutdownNow();
|
||||
}
|
||||
|
||||
if (isFailed) {
|
||||
job.cancel(failMsg.getCancelType(), failMsg.getMsg());
|
||||
return;
|
||||
}
|
||||
|
||||
if (job.finish(outfileInfoList)) {
|
||||
LOG.info("export job success. job: {}", job);
|
||||
// TODO(ftw): when we implement exporting tablet one by one, we should release snapshot here
|
||||
// release snapshot
|
||||
// Status releaseSnapshotStatus = job.releaseSnapshotPaths();
|
||||
// if (!releaseSnapshotStatus.ok()) {
|
||||
// // even if release snapshot failed, do not cancel this job.
|
||||
// // snapshot will be removed by GC thread on BE, finally.
|
||||
// LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(),
|
||||
// releaseSnapshotStatus.getErrorMsg());
|
||||
// }
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
job.setDoExportingThread(null);
|
||||
}
|
||||
}
|
||||
|
||||
private AutoCloseConnectContext buildConnectContext() {
|
||||
ConnectContext connectContext = new ConnectContext();
|
||||
connectContext.setSessionVariable(job.getSessionVariables());
|
||||
connectContext.setEnv(Env.getCurrentEnv());
|
||||
connectContext.setDatabase(job.getTableName().getDb());
|
||||
connectContext.setQualifiedUser(job.getQualifiedUser());
|
||||
connectContext.setCurrentUserIdentity(job.getUserIdentity());
|
||||
UUID uuid = UUID.randomUUID();
|
||||
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
|
||||
connectContext.setQueryId(queryId);
|
||||
connectContext.setStartTime();
|
||||
connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);
|
||||
return new AutoCloseConnectContext(connectContext);
|
||||
}
|
||||
|
||||
private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
|
||||
OutfileInfo outfileInfo = new OutfileInfo();
|
||||
outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
|
||||
outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
|
||||
outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes");
|
||||
outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
|
||||
return outfileInfo;
|
||||
}
|
||||
|
||||
private void handleInQueueState() {
|
||||
long dbId = job.getDbId();
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist");
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO(ftw): when we implement exporting tablet one by one, we should makeSnapshots here
|
||||
// Status snapshotStatus = job.makeSnapshots();
|
||||
// if (!snapshotStatus.ok()) {
|
||||
// job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg());
|
||||
// return;
|
||||
// }
|
||||
|
||||
if (job.updateState(ExportJobState.EXPORTING)) {
|
||||
LOG.info("Exchange pending status to exporting status success. job: {}", job);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.wildfly.common.Assert;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
@ -155,19 +156,27 @@ public class CancelExportStmtTest extends TestWithFeService {
|
||||
List<ExportJob> exportJobList2 = Lists.newLinkedList();
|
||||
ExportJob job1 = new ExportJob();
|
||||
ExportJob job2 = new ExportJob();
|
||||
job2.updateState(ExportJobState.CANCELLED, true);
|
||||
ExportJob job3 = new ExportJob();
|
||||
job3.updateState(ExportJobState.EXPORTING, false);
|
||||
ExportJob job4 = new ExportJob();
|
||||
ExportJob job5 = new ExportJob();
|
||||
job5.updateState(ExportJobState.IN_QUEUE, false);
|
||||
|
||||
|
||||
try {
|
||||
Method setExportJobState = job1.getClass().getDeclaredMethod("setExportJobState",
|
||||
ExportJobState.class);
|
||||
setExportJobState.setAccessible(true);
|
||||
setExportJobState.invoke(job2, ExportJobState.CANCELLED);
|
||||
setExportJobState.invoke(job3, ExportJobState.EXPORTING);
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new UserException(e);
|
||||
}
|
||||
|
||||
exportJobList1.add(job1);
|
||||
exportJobList1.add(job2);
|
||||
exportJobList1.add(job3);
|
||||
exportJobList1.add(job4);
|
||||
exportJobList2.add(job1);
|
||||
exportJobList2.add(job2);
|
||||
exportJobList2.add(job5);
|
||||
|
||||
SlotRef stateSlotRef = new SlotRef(null, "state");
|
||||
StringLiteral stateStringLiteral = new StringLiteral("PENDING");
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.qe;
|
||||
|
||||
import org.apache.doris.analysis.ExportStmt;
|
||||
import org.apache.doris.analysis.SetStmt;
|
||||
import org.apache.doris.analysis.ShowVariablesStmt;
|
||||
import org.apache.doris.common.CaseSensibility;
|
||||
@ -28,14 +27,9 @@ import org.apache.doris.common.PatternMatcher;
|
||||
import org.apache.doris.common.PatternMatcherWrapper;
|
||||
import org.apache.doris.common.VariableAnnotation;
|
||||
import org.apache.doris.common.util.ProfileManager;
|
||||
import org.apache.doris.common.util.RuntimeProfile;
|
||||
import org.apache.doris.load.ExportJob;
|
||||
import org.apache.doris.load.ExportJobState;
|
||||
import org.apache.doris.task.ExportExportingTask;
|
||||
import org.apache.doris.thrift.TQueryOptions;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import mockit.Expectations;
|
||||
import org.junit.Assert;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@ -162,72 +156,4 @@ public class SessionVariablesTest extends TestWithFeService {
|
||||
Assertions.assertEquals(123, sessionVariable.getQueryTimeoutS());
|
||||
Assertions.assertEquals(123, sessionVariable.getInsertTimeoutS());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnableProfile() {
|
||||
try {
|
||||
SetStmt setStmt = (SetStmt) parseAndAnalyzeStmt("set enable_profile=true", connectContext);
|
||||
SetExecutor setExecutor = new SetExecutor(connectContext, setStmt);
|
||||
setExecutor.execute();
|
||||
|
||||
ExportStmt exportStmt = (ExportStmt)
|
||||
parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext);
|
||||
ExportJob job = exportStmt.getExportJob();
|
||||
job.setId(1234);
|
||||
|
||||
new Expectations(job) {
|
||||
{
|
||||
job.getState();
|
||||
minTimes = 0;
|
||||
result = ExportJobState.EXPORTING;
|
||||
}
|
||||
};
|
||||
|
||||
ExportExportingTask task = new ExportExportingTask(job);
|
||||
task.run();
|
||||
Assertions.assertTrue(job.isFinalState());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assertions.fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDisableProfile() {
|
||||
try {
|
||||
connectContext.setThreadLocalInfo();
|
||||
SetStmt setStmt = (SetStmt) parseAndAnalyzeStmt("set enable_profile=false", connectContext);
|
||||
SetExecutor setExecutor = new SetExecutor(connectContext, setStmt);
|
||||
setExecutor.execute();
|
||||
|
||||
ExportStmt exportStmt = (ExportStmt)
|
||||
parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext);
|
||||
ExportJob job = exportStmt.getExportJob();
|
||||
job.setId(1234);
|
||||
|
||||
new Expectations(job) {
|
||||
{
|
||||
job.getState();
|
||||
minTimes = 0;
|
||||
result = ExportJobState.EXPORTING;
|
||||
}
|
||||
};
|
||||
|
||||
new Expectations(profileManager) {
|
||||
{
|
||||
profileManager.pushProfile((RuntimeProfile) any);
|
||||
// if enable_profile=false, method pushProfile will not be called
|
||||
times = 0;
|
||||
}
|
||||
};
|
||||
|
||||
ExportExportingTask task = new ExportExportingTask(job);
|
||||
task.run();
|
||||
Assertions.assertTrue(job.isFinalState());
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assertions.fail(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user