From f3b15d859b84c7f73bcfe84b6e2c898e3903064d Mon Sep 17 00:00:00 2001 From: Tiewei Fang <43782773+BePPPower@users.noreply.github.com> Date: Sun, 8 Oct 2023 13:49:09 +0800 Subject: [PATCH] [Refactor](Export) delete useless code of Export (#24953) --- .../java/org/apache/doris/common/Config.java | 12 - .../org/apache/doris/analysis/ExportStmt.java | 3 - .../java/org/apache/doris/catalog/Env.java | 2 - .../java/org/apache/doris/load/ExportJob.java | 172 ----------- .../java/org/apache/doris/load/ExportMgr.java | 99 +----- .../trees/plans/commands/ExportCommand.java | 3 - .../org/apache/doris/qe/StmtExecutor.java | 1 - .../doris/task/ExportExportingTask.java | 281 ------------------ .../doris/analysis/CancelExportStmtTest.java | 19 +- .../apache/doris/qe/SessionVariablesTest.java | 74 ----- 10 files changed, 15 insertions(+), 651 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index a049c651da..d32ea94948 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -665,18 +665,6 @@ public class Config extends ConfigBase { "单个 broker scanner 的最大并发数。", "Maximal concurrency of broker scanners."}) public static int max_broker_concurrency = 10; - @ConfField(mutable = true, masterOnly = true, description = { - "导出作业的最大并发数。", "Limitation of the concurrency of running export jobs."}) - public static int export_running_job_num_limit = 5; - - @ConfField(mutable = true, masterOnly = true, description = { - "导出作业的默认超时时间。", "Default timeout of export jobs."}) - public static int export_task_default_timeout_second = 2 * 3600; // 2h - - @ConfField(mutable = true, masterOnly = true, description = { - "每个导出作业的需要处理的 tablet 数量。", "Number of tablets need to be handled per export job."}) - public static int export_tablet_num_per_task = 5; - // TODO(cmy): Disable by default because current checksum logic has some bugs. @ConfField(mutable = true, masterOnly = true, description = { "一致性检查的开始时间。与 `consistency_check_end_time` 配合使用,决定一致性检查的起止时间。" diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 6d6855180b..35e658d446 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -71,10 +71,7 @@ public class ExportStmt extends StatementBase { private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(LABEL) .add(PARALLELISM) - .add(LoadStmt.EXEC_MEM_LIMIT) - .add(LoadStmt.TIMEOUT_PROPERTY) .add(LoadStmt.KEY_IN_PARAM_COLUMNS) - .add(LoadStmt.TIMEOUT_PROPERTY) .add(OutFileClause.PROP_MAX_FILE_SIZE) .add(OutFileClause.PROP_DELETE_EXISTING_FILES) .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index a0cdc7068f..6704281287 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1512,8 +1512,6 @@ public class Env { loadJobScheduler.start(); loadEtlChecker.start(); loadLoadingChecker.start(); - // export task - exportMgr.start(); // Tablet checker and scheduler tabletChecker.start(); tabletScheduler.start(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index f0daae1ebe..b0df62afb0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -71,9 +71,7 @@ import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; -import org.apache.doris.qe.StmtExecutor; import org.apache.doris.scheduler.exception.JobException; -import org.apache.doris.task.ExportExportingTask; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Preconditions; @@ -93,7 +91,6 @@ import java.io.DataOutput; import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Map; @@ -176,16 +173,8 @@ public class ExportJob implements Writable { private int parallelism; - public Map getPartitionToVersion() { - return partitionToVersion; - } - private Map partitionToVersion = Maps.newHashMap(); - // The selectStmt is sql 'select ... into outfile ...' - // TODO(ftw): delete - private List selectStmtList = Lists.newArrayList(); - /** * Each parallel has an associated Outfile list * which are organized into a two-dimensional list. @@ -194,11 +183,6 @@ public class ExportJob implements Writable { */ private List> selectStmtListPerParallel = Lists.newArrayList(); - private List> outfileSqlPerParallel = Lists.newArrayList(); - - - private List stmtExecutorList; - private List exportColumns = Lists.newArrayList(); private TableIf exportTable; @@ -208,10 +192,6 @@ public class ExportJob implements Writable { private SessionVariable sessionVariables; - private Thread doExportingThread; - - private ExportExportingTask task; - // backend_address => snapshot path private List> snapshotPaths = Lists.newArrayList(); @@ -408,58 +388,6 @@ public class ExportJob implements Writable { } } - private void generateQueryStmtOld() throws UserException { - SelectList list = new SelectList(); - if (exportColumns.isEmpty()) { - list.addItem(SelectListItem.createStarItem(this.tableName)); - } else { - for (Column column : exportTable.getBaseSchema()) { - String colName = column.getName().toLowerCase(); - if (exportColumns.contains(colName)) { - SlotRef slotRef = new SlotRef(this.tableName, colName); - SelectListItem selectListItem = new SelectListItem(slotRef, null); - list.addItem(selectListItem); - } - } - } - - List> tabletsListPerQuery = splitTablets(); - - List> tableRefListPerQuery = Lists.newArrayList(); - for (List tabletsList : tabletsListPerQuery) { - TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, - (ArrayList) tabletsList, this.tableRef.getTableSample(), this.tableRef.getCommonHints()); - List tableRefList = Lists.newArrayList(); - tableRefList.add(tblRef); - tableRefListPerQuery.add(tableRefList); - } - LOG.info("Export task is split into {} outfile statements.", tableRefListPerQuery.size()); - - if (LOG.isDebugEnabled()) { - for (int i = 0; i < tableRefListPerQuery.size(); i++) { - LOG.debug("Outfile clause {} is responsible for tables: {}", i, - tableRefListPerQuery.get(i).get(0).getSampleTabletIds()); - } - } - - for (List tableRefList : tableRefListPerQuery) { - FromClause fromClause = new FromClause(tableRefList); - // generate outfile clause - OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties()); - SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null, - null, null, LimitElement.NO_LIMIT); - selectStmt.setOutFileClause(outfile); - selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0)); - selectStmtList.add(selectStmt); - } - stmtExecutorList = Arrays.asList(new StmtExecutor[selectStmtList.size()]); - if (LOG.isDebugEnabled()) { - for (int i = 0; i < selectStmtList.size(); i++) { - LOG.debug("Outfile clause {} is: {}", i, selectStmtList.get(i).toSql()); - } - } - } - /** * Generate outfile select stmt * @throws UserException @@ -639,49 +567,6 @@ public class ExportJob implements Writable { this.state = newState; } - // TODO(ftw): delete - public synchronized Thread getDoExportingThread() { - return doExportingThread; - } - - // TODO(ftw): delete - public synchronized void setDoExportingThread(Thread isExportingThread) { - this.doExportingThread = isExportingThread; - } - - // TODO(ftw): delete - public synchronized void setStmtExecutor(int idx, StmtExecutor executor) { - this.stmtExecutorList.set(idx, executor); - } - - public synchronized StmtExecutor getStmtExecutor(int idx) { - return this.stmtExecutorList.get(idx); - } - - // TODO(ftw): delete - public synchronized void cancel(ExportFailMsg.CancelType type, String msg) { - if (msg != null) { - failMsg = new ExportFailMsg(type, msg); - } - - // maybe user cancel this job - if (task != null && state == ExportJobState.EXPORTING && stmtExecutorList != null) { - for (int idx = 0; idx < stmtExecutorList.size(); ++idx) { - stmtExecutorList.get(idx).cancel(); - } - } - - if (updateState(ExportJobState.CANCELLED, false)) { - // release snapshot - // Status releaseSnapshotStatus = releaseSnapshotPaths(); - // if (!releaseSnapshotStatus.ok()) { - // // snapshot will be removed by GC thread on BE, finally. - // LOG.warn("failed to release snapshot for export job: {}. err: {}", id, - // releaseSnapshotStatus.getErrorMsg()); - // } - } - } - public synchronized void updateExportJobState(ExportJobState newState, Long taskId, List outfileInfoList, ExportFailMsg.CancelType type, String msg) throws JobException { switch (newState) { @@ -734,15 +619,6 @@ public class ExportJob implements Writable { Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED); } - // TODO(ftw): delete - public synchronized boolean finish(List outfileInfoList) { - outfileInfo = GsonUtils.GSON.toJson(outfileInfoList); - if (updateState(ExportJobState.FINISHED)) { - return true; - } - return false; - } - private void exportExportJob() { // The first exportTaskExecutor will set state to EXPORTING, // other exportTaskExecutors do not need to set up state. @@ -824,54 +700,6 @@ public class ExportJob implements Writable { } } - // TODO(ftw): delete - public synchronized boolean updateState(ExportJobState newState) { - return this.updateState(newState, false); - } - - // TODO(ftw): delete - public synchronized boolean updateState(ExportJobState newState, boolean isReplay) { - // We do not persist EXPORTING state in new version of metadata, - // but EXPORTING state may still exist in older versions of metadata. - // So if isReplay == true and newState == EXPORTING, we just ignore this update. - if (isFinalState() || (isReplay && newState == ExportJobState.EXPORTING)) { - return false; - } - state = newState; - switch (newState) { - case PENDING: - case IN_QUEUE: - progress = 0; - break; - case EXPORTING: - // if isReplay == true, startTimeMs will be read from LOG - if (!isReplay) { - startTimeMs = System.currentTimeMillis(); - } - break; - case FINISHED: - if (!isReplay) { - finishTimeMs = System.currentTimeMillis(); - } - progress = 100; - break; - case CANCELLED: - // if isReplay == true, finishTimeMs will be read from LOG - if (!isReplay) { - finishTimeMs = System.currentTimeMillis(); - } - break; - default: - Preconditions.checkState(false, "wrong job state: " + newState.name()); - break; - } - // we only persist Pending/Cancel/Finish state - if (!isReplay && newState != ExportJobState.IN_QUEUE && newState != ExportJobState.EXPORTING) { - Env.getCurrentEnv().getEditLog().logExportUpdateState(id, newState); - } - return true; - } - public synchronized boolean isFinalState() { return this.state == ExportJobState.CANCELLED || this.state == ExportJobState.FINISHED; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java index b74540b5d3..46011a4d91 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportMgr.java @@ -19,7 +19,6 @@ package org.apache.doris.load; import org.apache.doris.analysis.CancelExportStmt; import org.apache.doris.analysis.CompoundPredicate; -import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -32,13 +31,11 @@ import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.util.ListComparator; -import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.OrderByPair; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.scheduler.exception.JobException; -import org.apache.doris.task.MasterTaskExecutor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -60,7 +57,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; import java.util.stream.Collectors; -public class ExportMgr extends MasterDaemon { +public class ExportMgr { private static final Logger LOG = LogManager.getLogger(ExportJob.class); // lock for export job @@ -70,11 +67,7 @@ public class ExportMgr extends MasterDaemon { private Map exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob private Map labelToExportJobId = Maps.newHashMap(); - private MasterTaskExecutor exportingExecutor; - public ExportMgr() { - int poolSize = Config.export_running_job_num_limit == 0 ? 5 : Config.export_running_job_num_limit; - exportingExecutor = new MasterTaskExecutor("export-exporting-job", poolSize, true); } public void readLock() { @@ -93,78 +86,10 @@ public class ExportMgr extends MasterDaemon { lock.writeLock().unlock(); } - @Override - public synchronized void start() { - super.start(); - exportingExecutor.start(); - } - - @Override - protected void runAfterCatalogReady() { - // List pendingJobs = getExportJobs(ExportJobState.PENDING); - // List newInQueueJobs = Lists.newArrayList(); - // for (ExportJob job : pendingJobs) { - // if (handlePendingJobs(job)) { - // newInQueueJobs.add(job); - // } - // } - // LOG.debug("new IN_QUEUE export job num: {}", newInQueueJobs.size()); - // for (ExportJob job : newInQueueJobs) { - // try { - // MasterTask task = new ExportExportingTask(job); - // job.setTask((ExportExportingTask) task); - // if (exportingExecutor.submit(task)) { - // LOG.info("success to submit IN_QUEUE export job. job: {}", job); - // } else { - // LOG.info("fail to submit IN_QUEUE job to executor. job: {}", job); - // } - // } catch (Exception e) { - // LOG.warn("run export exporting job {}.", job, e); - // } - // } - } - - private boolean handlePendingJobs(ExportJob job) { - // because maybe this job has been cancelled by user. - if (job.getState() != ExportJobState.PENDING) { - return false; - } - - if (job.isReplayed()) { - // If the job is created from replay thread, all plan info will be lost. - // so the job has to be cancelled. - String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled."; - job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg); - return false; - } - - if (job.updateState(ExportJobState.IN_QUEUE)) { - LOG.info("Exchange pending status to in_queue status success. job: {}", job); - return true; - } - return false; - } - public List getJobs() { return Lists.newArrayList(exportIdToJob.values()); } - public void addExportJob(ExportStmt stmt) throws Exception { - long jobId = Env.getCurrentEnv().getNextId(); - ExportJob job = createJob(jobId, stmt); - writeLock(); - try { - if (labelToExportJobId.containsKey(job.getLabel())) { - throw new LabelAlreadyUsedException(job.getLabel()); - } - unprotectAddJob(job); - Env.getCurrentEnv().getEditLog().logExportCreate(job); - } finally { - writeUnlock(); - } - LOG.info("add export job. {}", job); - } - public void addExportJobAndRegisterTask(ExportJob job) throws Exception { long jobId = Env.getCurrentEnv().getNextId(); job.setId(jobId); @@ -249,12 +174,6 @@ public class ExportMgr extends MasterDaemon { }; } - private ExportJob createJob(long jobId, ExportStmt stmt) { - ExportJob exportJob = stmt.getExportJob(); - exportJob.setId(jobId); - return exportJob; - } - public ExportJob getJob(long jobId) { ExportJob job; readLock(); @@ -266,22 +185,6 @@ public class ExportMgr extends MasterDaemon { return job; } - public List getExportJobs(ExportJobState state) { - List result = Lists.newArrayList(); - readLock(); - try { - for (ExportJob job : exportIdToJob.values()) { - if (job.getState() == state) { - result.add(job); - } - } - } finally { - readUnlock(); - } - - return result; - } - // used for `show export` statement // NOTE: jobid and states may both specified, or only one of them, or neither public List> getExportJobInfosByIdOrState( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java index 2be6e84938..42ae03aaec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ExportCommand.java @@ -79,10 +79,7 @@ public class ExportCommand extends Command implements ForwardWithSync { private static final ImmutableSet PROPERTIES_SET = new ImmutableSet.Builder() .add(LABEL) .add(PARALLELISM) - .add(LoadStmt.EXEC_MEM_LIMIT) - .add(LoadStmt.TIMEOUT_PROPERTY) .add(LoadStmt.KEY_IN_PARAM_COLUMNS) - .add(LoadStmt.TIMEOUT_PROPERTY) .add(OutFileClause.PROP_MAX_FILE_SIZE) .add(OutFileClause.PROP_DELETE_EXISTING_FILES) .add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index dd5c0d7cd4..2297d16bc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2328,7 +2328,6 @@ public class StmtExecutor { private void handleExportStmt() throws Exception { ExportStmt exportStmt = (ExportStmt) parsedStmt; - // context.getEnv().getExportMgr().addExportJob(exportStmt); context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt.getExportJob()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java deleted file mode 100644 index 1b17806c71..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/ExportExportingTask.java +++ /dev/null @@ -1,281 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.analysis.OutFileClause; -import org.apache.doris.analysis.SelectStmt; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.TabletMeta; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Config; -import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.load.ExportFailMsg; -import org.apache.doris.load.ExportFailMsg.CancelType; -import org.apache.doris.load.ExportJob; -import org.apache.doris.load.ExportJobState; -import org.apache.doris.load.OutfileInfo; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.QueryState.MysqlStateType; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TUniqueId; - -import com.google.common.collect.Lists; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; - -public class ExportExportingTask extends MasterTask { - private static final Logger LOG = LogManager.getLogger(ExportExportingTask.class); - - protected final ExportJob job; - - ThreadPoolExecutor exportExecPool = ThreadPoolManager.newDaemonCacheThreadPool( - Config.maximum_parallelism_of_export_job, "exporting-pool-", false); - - public ExportExportingTask(ExportJob job) { - this.job = job; - this.signature = job.getId(); - } - - private class ExportResult { - private boolean isFailed; - - private ExportFailMsg failMsg; - - private OutfileInfo outfileInfo; - - public ExportResult(boolean isFailed, ExportFailMsg failMsg, OutfileInfo outfileInfo) { - this.isFailed = isFailed; - this.failMsg = failMsg; - this.outfileInfo = outfileInfo; - } - - - public boolean isFailed() { - return isFailed; - } - - public ExportFailMsg getFailMsg() { - return failMsg; - } - - public OutfileInfo getOutfileInfo() { - return outfileInfo; - } - } - - @Override - protected void exec() { - if (job.getState() == ExportJobState.IN_QUEUE) { - handleInQueueState(); - } - - if (job.getState() != ExportJobState.EXPORTING) { - return; - } - LOG.info("begin execute export job in exporting state. job: {}", job); - - synchronized (job) { - if (job.getDoExportingThread() != null) { - LOG.warn("export task is already being executed."); - return; - } - job.setDoExportingThread(Thread.currentThread()); - } - - List selectStmtList = job.getSelectStmtList(); - int completeTaskNum = 0; - List outfileInfoList = Lists.newArrayList(); - - int parallelNum = selectStmtList.size(); - CompletionService completionService = new ExecutorCompletionService<>(exportExecPool); - - // begin exporting - for (int i = 0; i < parallelNum; ++i) { - final int idx = i; - completionService.submit(() -> { - // maybe user cancelled this job - if (job.getState() != ExportJobState.EXPORTING) { - return new ExportResult(true, null, null); - } - try { - Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException( - job.getTableName().getDb()); - OlapTable table = db.getOlapTableOrAnalysisException(job.getTableName().getTbl()); - table.readLock(); - try { - SelectStmt selectStmt = selectStmtList.get(idx); - List tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds(); - for (Long tabletId : tabletIds) { - TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta( - tabletId); - Partition partition = table.getPartition(tabletMeta.getPartitionId()); - long nowVersion = partition.getVisibleVersion(); - long oldVersion = job.getPartitionToVersion().get(partition.getName()); - if (nowVersion != oldVersion) { - LOG.warn("Tablet {} has changed version, old version = {}, now version = {}", - tabletId, oldVersion, nowVersion); - return new ExportResult(true, new ExportFailMsg( - ExportFailMsg.CancelType.RUN_FAIL, - "Tablet {" + tabletId + "} has changed"), null); - } - } - } finally { - table.readUnlock(); - } - } catch (AnalysisException e) { - return new ExportResult(true, - new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, e.getMessage()), null); - } - try (AutoCloseConnectContext r = buildConnectContext()) { - StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, selectStmtList.get(idx)); - job.setStmtExecutor(idx, stmtExecutor); - stmtExecutor.execute(); - if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) { - return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, - r.connectContext.getState().getErrorMessage()), null); - } - OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo()); - return new ExportResult(false, null, outfileInfo); - } catch (Exception e) { - return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL, - e.getMessage()), - null); - } finally { - job.getStmtExecutor(idx).addProfileToSpan(); - } - }); - } - - Boolean isFailed = false; - ExportFailMsg failMsg = new ExportFailMsg(); - try { - for (int i = 0; i < parallelNum; ++i) { - Future future = completionService.take(); - ExportResult result = future.get(); - if (!result.isFailed) { - outfileInfoList.add(result.getOutfileInfo()); - ++completeTaskNum; - int progress = completeTaskNum * 100 / selectStmtList.size(); - if (progress >= 100) { - progress = 99; - } - job.setProgress(progress); - LOG.info("Export Job {} finished {} outfile export and it's progress is {}%", job.getId(), - completeTaskNum, progress); - } else { - isFailed = true; - failMsg.setCancelType(result.failMsg.getCancelType()); - failMsg.setMsg(result.failMsg.getMsg()); - LOG.warn("Exporting task failed because: {}", result.failMsg.getMsg()); - break; - } - } - } catch (Exception e) { - isFailed = true; - failMsg.setCancelType(CancelType.RUN_FAIL); - failMsg.setMsg(e.getMessage()); - } finally { - // cancel all executor - if (isFailed) { - for (int idx = 0; idx < parallelNum; ++idx) { - job.getStmtExecutor(idx).cancel(); - } - } - exportExecPool.shutdownNow(); - } - - if (isFailed) { - job.cancel(failMsg.getCancelType(), failMsg.getMsg()); - return; - } - - if (job.finish(outfileInfoList)) { - LOG.info("export job success. job: {}", job); - // TODO(ftw): when we implement exporting tablet one by one, we should release snapshot here - // release snapshot - // Status releaseSnapshotStatus = job.releaseSnapshotPaths(); - // if (!releaseSnapshotStatus.ok()) { - // // even if release snapshot failed, do not cancel this job. - // // snapshot will be removed by GC thread on BE, finally. - // LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(), - // releaseSnapshotStatus.getErrorMsg()); - // } - } - - synchronized (this) { - job.setDoExportingThread(null); - } - } - - private AutoCloseConnectContext buildConnectContext() { - ConnectContext connectContext = new ConnectContext(); - connectContext.setSessionVariable(job.getSessionVariables()); - connectContext.setEnv(Env.getCurrentEnv()); - connectContext.setDatabase(job.getTableName().getDb()); - connectContext.setQualifiedUser(job.getQualifiedUser()); - connectContext.setCurrentUserIdentity(job.getUserIdentity()); - UUID uuid = UUID.randomUUID(); - TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - connectContext.setQueryId(queryId); - connectContext.setStartTime(); - connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER); - return new AutoCloseConnectContext(connectContext); - } - - private OutfileInfo getOutFileInfo(Map resultAttachedInfo) { - OutfileInfo outfileInfo = new OutfileInfo(); - outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER)); - outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS)); - outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + "bytes"); - outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL)); - return outfileInfo; - } - - private void handleInQueueState() { - long dbId = job.getDbId(); - Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - job.cancel(ExportFailMsg.CancelType.RUN_FAIL, "database does not exist"); - return; - } - - // TODO(ftw): when we implement exporting tablet one by one, we should makeSnapshots here - // Status snapshotStatus = job.makeSnapshots(); - // if (!snapshotStatus.ok()) { - // job.cancel(ExportFailMsg.CancelType.RUN_FAIL, snapshotStatus.getErrorMsg()); - // return; - // } - - if (job.updateState(ExportJobState.EXPORTING)) { - LOG.info("Exchange pending status to exporting status success. job: {}", job); - } - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java index 1e49f207d0..71cb62120c 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelExportStmtTest.java @@ -32,6 +32,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.wildfly.common.Assert; +import java.lang.reflect.Method; import java.util.List; import java.util.function.Predicate; @@ -155,19 +156,27 @@ public class CancelExportStmtTest extends TestWithFeService { List exportJobList2 = Lists.newLinkedList(); ExportJob job1 = new ExportJob(); ExportJob job2 = new ExportJob(); - job2.updateState(ExportJobState.CANCELLED, true); ExportJob job3 = new ExportJob(); - job3.updateState(ExportJobState.EXPORTING, false); ExportJob job4 = new ExportJob(); - ExportJob job5 = new ExportJob(); - job5.updateState(ExportJobState.IN_QUEUE, false); + + + try { + Method setExportJobState = job1.getClass().getDeclaredMethod("setExportJobState", + ExportJobState.class); + setExportJobState.setAccessible(true); + setExportJobState.invoke(job2, ExportJobState.CANCELLED); + setExportJobState.invoke(job3, ExportJobState.EXPORTING); + + } catch (Exception e) { + throw new UserException(e); + } + exportJobList1.add(job1); exportJobList1.add(job2); exportJobList1.add(job3); exportJobList1.add(job4); exportJobList2.add(job1); exportJobList2.add(job2); - exportJobList2.add(job5); SlotRef stateSlotRef = new SlotRef(null, "state"); StringLiteral stateStringLiteral = new StringLiteral("PENDING"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java index e7209d58a4..273c89d80e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/SessionVariablesTest.java @@ -17,7 +17,6 @@ package org.apache.doris.qe; -import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.SetStmt; import org.apache.doris.analysis.ShowVariablesStmt; import org.apache.doris.common.CaseSensibility; @@ -28,14 +27,9 @@ import org.apache.doris.common.PatternMatcher; import org.apache.doris.common.PatternMatcherWrapper; import org.apache.doris.common.VariableAnnotation; import org.apache.doris.common.util.ProfileManager; -import org.apache.doris.common.util.RuntimeProfile; -import org.apache.doris.load.ExportJob; -import org.apache.doris.load.ExportJobState; -import org.apache.doris.task.ExportExportingTask; import org.apache.doris.thrift.TQueryOptions; import org.apache.doris.utframe.TestWithFeService; -import mockit.Expectations; import org.junit.Assert; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -162,72 +156,4 @@ public class SessionVariablesTest extends TestWithFeService { Assertions.assertEquals(123, sessionVariable.getQueryTimeoutS()); Assertions.assertEquals(123, sessionVariable.getInsertTimeoutS()); } - - @Test - public void testEnableProfile() { - try { - SetStmt setStmt = (SetStmt) parseAndAnalyzeStmt("set enable_profile=true", connectContext); - SetExecutor setExecutor = new SetExecutor(connectContext, setStmt); - setExecutor.execute(); - - ExportStmt exportStmt = (ExportStmt) - parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext); - ExportJob job = exportStmt.getExportJob(); - job.setId(1234); - - new Expectations(job) { - { - job.getState(); - minTimes = 0; - result = ExportJobState.EXPORTING; - } - }; - - ExportExportingTask task = new ExportExportingTask(job); - task.run(); - Assertions.assertTrue(job.isFinalState()); - } catch (Exception e) { - e.printStackTrace(); - Assertions.fail(e.getMessage()); - } - } - - @Test - public void testDisableProfile() { - try { - connectContext.setThreadLocalInfo(); - SetStmt setStmt = (SetStmt) parseAndAnalyzeStmt("set enable_profile=false", connectContext); - SetExecutor setExecutor = new SetExecutor(connectContext, setStmt); - setExecutor.execute(); - - ExportStmt exportStmt = (ExportStmt) - parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext); - ExportJob job = exportStmt.getExportJob(); - job.setId(1234); - - new Expectations(job) { - { - job.getState(); - minTimes = 0; - result = ExportJobState.EXPORTING; - } - }; - - new Expectations(profileManager) { - { - profileManager.pushProfile((RuntimeProfile) any); - // if enable_profile=false, method pushProfile will not be called - times = 0; - } - }; - - ExportExportingTask task = new ExportExportingTask(job); - task.run(); - Assertions.assertTrue(job.isFinalState()); - } catch (Exception e) { - e.printStackTrace(); - Assertions.fail(e.getMessage()); - } - - } }