[Feature](export) Support cancel export statement (#15128)

Co-authored-by: wangxiangyu@360shuke.com <wangxiangyu@360shuke.com>
This commit is contained in:
wxy
2023-01-04 14:08:25 +08:00
committed by GitHub
parent 73d4070708
commit e0c56bcd20
8 changed files with 457 additions and 20 deletions

View File

@ -3993,6 +3993,10 @@ cancel_param ::=
{:
RESULT = new CancelLoadStmt(db, parser.where);
:}
| KW_EXPORT opt_db:db opt_wild_where
{:
RESULT = new CancelExportStmt(db, parser.where);
:}
| KW_ALTER KW_TABLE opt_alter_type:type KW_FROM table_name:table cancel_rollup_job_id_list:list
{:
RESULT = new CancelAlterTableStmt(type, table, list);

View File

@ -0,0 +1,176 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJob;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import lombok.Getter;
import java.util.Set;
/**
* CANCEL EXPORT statement used to cancel export job.
* syntax:
* CANCEL EXPORT [FROM db] WHERE [LABEL = "export_label" | LABEL like "label_pattern" | STATE = "PENDING/EXPORTING"]
**/
public class CancelExportStmt extends DdlStmt {
private static final Set<String> SUPPORT_COLUMNS = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
@Getter
private String dbName;
@Getter
private CompoundPredicate.Operator operator;
@Getter
private String label;
@Getter
private String state;
private Expr whereClause;
public CancelExportStmt(String dbName, Expr whereClause) {
this.dbName = dbName;
this.whereClause = whereClause;
this.SUPPORT_COLUMNS.add("label");
this.SUPPORT_COLUMNS.add("state");
}
private void checkColumn(Expr expr, boolean like) throws AnalysisException {
String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
if (!SUPPORT_COLUMNS.contains(inputCol)) {
throw new AnalysisException("Current only support label and state, invalid column: " + inputCol);
}
if (!(expr.getChild(1) instanceof StringLiteral)) {
throw new AnalysisException("Value must be a string");
}
String inputValue = expr.getChild(1).getStringValue();
if (Strings.isNullOrEmpty(inputValue)) {
throw new AnalysisException("Value can't be null");
}
if (inputCol.equalsIgnoreCase("label")) {
label = inputValue;
}
if (inputCol.equalsIgnoreCase("state")) {
if (like) {
throw new AnalysisException("Only label can use like");
}
state = inputValue;
try {
ExportJob.JobState jobState = ExportJob.JobState.valueOf(state);
if (jobState != ExportJob.JobState.PENDING && jobState != ExportJob.JobState.EXPORTING) {
throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
}
} catch (IllegalArgumentException e) {
throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
}
}
}
private void likeCheck(Expr expr) throws AnalysisException {
if (expr instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) expr;
boolean like = LikePredicate.Operator.LIKE.equals(likePredicate.getOp());
if (!like) {
throw new AnalysisException("Not support REGEXP");
}
checkColumn(expr, true);
}
}
private void binaryCheck(Expr expr) throws AnalysisException {
if (expr instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) expr;
if (!Operator.EQ.equals(binaryPredicate.getOp())) {
throw new AnalysisException("Only support equal or like");
}
checkColumn(expr, false);
}
}
private void compoundCheck(Expr expr) throws AnalysisException {
if (expr == null) {
throw new AnalysisException("Where clause can't be null");
}
if (expr instanceof CompoundPredicate) {
// current only support label and state
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
throw new AnalysisException("Current not support NOT operator");
}
for (int i = 0; i < 2; i++) {
Expr child = compoundPredicate.getChild(i);
if (child instanceof CompoundPredicate) {
throw new AnalysisException("Current not support nested clause");
}
likeCheck(child);
binaryCheck(child);
}
operator = compoundPredicate.getOp();
}
}
@Override
public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
super.analyze(analyzer);
if (Strings.isNullOrEmpty(dbName)) {
dbName = analyzer.getDefaultDb();
if (Strings.isNullOrEmpty(dbName)) {
throw new AnalysisException("No database selected");
}
} else {
dbName = ClusterNamespace.getFullName(getClusterName(), dbName);
}
likeCheck(whereClause);
binaryCheck(whereClause);
compoundCheck(whereClause);
}
@Override
public String toSql() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("CANCEL EXPORT ");
if (!Strings.isNullOrEmpty(dbName)) {
stringBuilder.append("FROM ").append(dbName);
}
if (whereClause != null) {
stringBuilder.append(" WHERE ").append(whereClause.toSql());
}
return stringBuilder.toString();
}
@Override
public String toString() {
return toSql();
}
}

View File

@ -123,6 +123,9 @@ public class CancelLoadStmt extends DdlStmt {
if (expr instanceof CompoundPredicate) {
// current only support label and state
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
if (CompoundPredicate.Operator.NOT == compoundPredicate.getOp()) {
throw new AnalysisException("Current not support NOT operator");
}
for (int i = 0; i < 2; i++) {
Expr child = compoundPredicate.getChild(i);
if (child instanceof CompoundPredicate) {

View File

@ -521,7 +521,7 @@ public class ExportJob implements Writable {
return whereExpr;
}
public JobState getState() {
public synchronized JobState getState() {
return state;
}
@ -651,11 +651,12 @@ public class ExportJob implements Writable {
}
public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
releaseSnapshotPaths();
if (msg != null) {
failMsg = new ExportFailMsg(type, msg);
}
updateState(ExportJob.JobState.CANCELLED, false);
if (updateState(ExportJob.JobState.CANCELLED, false)) {
releaseSnapshotPaths();
}
}
public synchronized boolean updateState(ExportJob.JobState newState) {
@ -663,6 +664,9 @@ public class ExportJob implements Writable {
}
public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) {
if (isFinalState()) {
return false;
}
state = newState;
switch (newState) {
case PENDING:
@ -686,6 +690,10 @@ public class ExportJob implements Writable {
return true;
}
public synchronized boolean isFinalState() {
return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED;
}
public Status releaseSnapshotPaths() {
List<Pair<TNetworkAddress, String>> snapshotPaths = getSnapshotPaths();
LOG.debug("snapshotPaths:{}", snapshotPaths);

View File

@ -17,6 +17,8 @@
package org.apache.doris.load;
import org.apache.doris.analysis.CancelExportStmt;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Database;
@ -24,6 +26,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.PatternMatcher;
@ -33,10 +36,12 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -48,6 +53,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
public class ExportMgr {
@ -99,11 +105,62 @@ public class ExportMgr {
LOG.info("add export job. {}", job);
}
public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException {
// List of export jobs waiting to be cancelled
List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt);
if (matchExportJobs.isEmpty()) {
throw new DdlException("Export job(s) do not exist");
}
matchExportJobs = matchExportJobs.stream()
.filter(job -> !job.isFinalState()).collect(Collectors.toList());
if (matchExportJobs.isEmpty()) {
throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
}
for (ExportJob exportJob : matchExportJobs) {
exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
}
}
public void unprotectAddJob(ExportJob job) {
idToJob.put(job.getId(), job);
labelToJobId.putIfAbsent(job.getLabel(), job.getId());
}
private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException {
Predicate<ExportJob> jobFilter = buildCancelJobFilter(stmt);
readLock();
try {
return getJobs().stream().filter(jobFilter).collect(Collectors.toList());
} finally {
readUnlock();
}
}
@VisibleForTesting
public static Predicate<ExportJob> buildCancelJobFilter(CancelExportStmt stmt) throws AnalysisException {
String label = stmt.getLabel();
String state = stmt.getState();
PatternMatcher matcher = PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
return job -> {
boolean labelFilter = true;
boolean stateFilter = true;
if (StringUtils.isNotEmpty(label)) {
labelFilter = label.contains("%") ? matcher.match(job.getLabel()) :
job.getLabel().equalsIgnoreCase(label);
}
if (StringUtils.isNotEmpty(state)) {
stateFilter = job.getState().name().equalsIgnoreCase(state);
}
if (stmt.getOperator() != null && CompoundPredicate.Operator.OR.equals(stmt.getOperator())) {
return labelFilter || stateFilter;
}
return labelFilter && stateFilter;
};
}
private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception {
ExportJob job = new ExportJob(jobId);
job.setJob(stmt);
@ -294,12 +351,12 @@ public class ExportMgr {
}
public void replayUpdateJobState(long jobId, ExportJob.JobState newState) {
writeLock();
readLock();
try {
ExportJob job = idToJob.get(jobId);
job.updateState(newState, true);
} finally {
writeUnlock();
readUnlock();
}
}

View File

@ -47,6 +47,7 @@ import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelBackupStmt;
import org.apache.doris.analysis.CancelExportStmt;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CleanLabelStmt;
import org.apache.doris.analysis.CreateCatalogStmt;
@ -185,6 +186,8 @@ public class DdlExecutor {
} else {
env.getLoadManager().createLoadJobFromStmt(loadStmt);
}
} else if (ddlStmt instanceof CancelExportStmt) {
env.getExportMgr().cancelExportJob((CancelExportStmt) ddlStmt);
} else if (ddlStmt instanceof CancelLoadStmt) {
env.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {

View File

@ -161,18 +161,17 @@ public class ExportExportingTask extends MasterTask {
}
}
// release snapshot
Status releaseSnapshotStatus = job.releaseSnapshotPaths();
if (!releaseSnapshotStatus.ok()) {
// even if release snapshot failed, do nothing cancel this job.
// snapshot will be removed by GC thread on BE, finally.
LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(),
releaseSnapshotStatus.getErrorMsg());
}
if (job.updateState(ExportJob.JobState.FINISHED)) {
LOG.warn("export job success. job: {}", job);
registerProfile();
// release snapshot
Status releaseSnapshotStatus = job.releaseSnapshotPaths();
if (!releaseSnapshotStatus.ok()) {
// even if release snapshot failed, do not cancel this job.
// snapshot will be removed by GC thread on BE, finally.
LOG.warn("failed to release snapshot for export job: {}. err: {}", job.getId(),
releaseSnapshotStatus.getErrorMsg());
}
}
synchronized (this) {
@ -336,12 +335,9 @@ public class ExportExportingTask extends MasterTask {
}
}
if (!failed) {
exportedFiles.clear();
job.addExportedFiles(newFiles);
ClientPool.brokerPool.returnObject(address, client);
}
exportedFiles.clear();
job.addExportedFiles(newFiles);
ClientPool.brokerPool.returnObject(address, client);
return Status.OK;
}
}

View File

@ -0,0 +1,190 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.wildfly.common.Assert;
import java.util.List;
import java.util.function.Predicate;
public class CancelExportStmtTest extends TestWithFeService {
private Analyzer analyzer;
private String dbName = "testDb";
private String tblName = "table1";
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
createDatabase(dbName);
useDatabase(dbName);
createTable("create table " + tblName + "\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ "properties(\"replication_num\" = \"1\");");
analyzer = new Analyzer(connectContext.getEnv(), connectContext);
}
@Test
public void testNormal() throws UserException {
SlotRef labelSlotRef = new SlotRef(null, "label");
StringLiteral labelStringLiteral = new StringLiteral("doris_test_label");
SlotRef stateSlotRef = new SlotRef(null, "state");
BinaryPredicate labelBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef,
labelStringLiteral);
CancelExportStmt stmt = new CancelExportStmt(null, labelBinaryPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label'",
stmt.toString());
SlotRef labelSlotRefUpper = new SlotRef(null, "LABEL");
BinaryPredicate labelBinaryPredicateUpper = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRefUpper,
labelStringLiteral);
CancelExportStmt stmtUpper = new CancelExportStmt(null, labelBinaryPredicateUpper);
stmtUpper.analyze(analyzer);
Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `LABEL` = 'doris_test_label'",
stmtUpper.toString());
StringLiteral stateStringLiteral = new StringLiteral("PENDING");
BinaryPredicate stateBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef,
stateStringLiteral);
stmt = new CancelExportStmt(null, stateBinaryPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `state` = 'PENDING'", stmt.toString());
LikePredicate labelLikePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef,
labelStringLiteral);
stmt = new CancelExportStmt(null, labelLikePredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL EXPORT FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'",
stmt.toString());
CompoundPredicate compoundAndPredicate = new CompoundPredicate(Operator.AND, labelBinaryPredicate,
stateBinaryPredicate);
stmt = new CancelExportStmt(null, compoundAndPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals(
"CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'PENDING'",
stmt.toString());
CompoundPredicate compoundOrPredicate = new CompoundPredicate(Operator.OR, labelBinaryPredicate,
stateBinaryPredicate);
stmt = new CancelExportStmt(null, compoundOrPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals(
"CANCEL EXPORT FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'PENDING'",
stmt.toString());
}
@Test
public void testError1() {
SlotRef stateSlotRef = new SlotRef(null, "state");
StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
LikePredicate stateLikePredicate =
new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral);
CancelExportStmt stmt = new CancelExportStmt(null, stateLikePredicate);
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like",
() -> stmt.analyze(analyzer));
}
@Test
public void testError2() {
SlotRef stateSlotRef = new SlotRef(null, "state");
StringLiteral stateStringLiteral1 = new StringLiteral("EXPORTING");
BinaryPredicate stateEqPredicate1 =
new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral1);
StringLiteral stateStringLiteral2 = new StringLiteral("PENDING");
BinaryPredicate stateEqPredicate2 =
new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral2);
SlotRef labelSlotRef = new SlotRef(null, "label");
StringLiteral labelStringLiteral1 = new StringLiteral("test_label");
BinaryPredicate labelEqPredicate1 =
new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral1);
CompoundPredicate compoundAndPredicate1 = new CompoundPredicate(Operator.AND, stateEqPredicate1,
stateEqPredicate2);
CompoundPredicate compoundAndPredicate2 = new CompoundPredicate(Operator.AND, compoundAndPredicate1,
labelEqPredicate1);
CompoundPredicate compoundAndPredicate3 = new CompoundPredicate(Operator.NOT, stateEqPredicate1, null);
CancelExportStmt stmt1 = new CancelExportStmt(null, compoundAndPredicate2);
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support nested clause",
() -> stmt1.analyze(analyzer));
CancelExportStmt stmt2 = new CancelExportStmt(null, compoundAndPredicate3);
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Current not support NOT operator",
() -> stmt2.analyze(analyzer));
}
@Test
public void testCancelJobFilter() throws UserException {
List<ExportJob> exportJobList1 = Lists.newLinkedList();
List<ExportJob> exportJobList2 = Lists.newLinkedList();
ExportJob job1 = new ExportJob();
ExportJob job2 = new ExportJob();
job2.updateState(ExportJob.JobState.CANCELLED, true);
ExportJob job3 = new ExportJob();
job3.updateState(ExportJob.JobState.EXPORTING, true);
ExportJob job4 = new ExportJob();
exportJobList1.add(job1);
exportJobList1.add(job2);
exportJobList1.add(job3);
exportJobList1.add(job4);
exportJobList2.add(job1);
exportJobList2.add(job2);
SlotRef stateSlotRef = new SlotRef(null, "state");
StringLiteral stateStringLiteral = new StringLiteral("PENDING");
BinaryPredicate stateEqPredicate =
new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
CancelExportStmt stmt = new CancelExportStmt(null, stateEqPredicate);
stmt.analyze(analyzer);
Predicate<ExportJob> filter = ExportMgr.buildCancelJobFilter(stmt);
Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 2);
Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1);
stateStringLiteral = new StringLiteral("EXPORTING");
stateEqPredicate =
new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
stmt = new CancelExportStmt(null, stateEqPredicate);
stmt.analyze(analyzer);
filter = ExportMgr.buildCancelJobFilter(stmt);
Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1);
}
}