[Feature] cancel load support state (#9537)

This commit is contained in:
Stalary
2022-05-19 16:37:56 +08:00
committed by GitHub
parent 119ff2c02d
commit cbc7b167b1
6 changed files with 347 additions and 431 deletions

View File

@ -23,35 +23,105 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import lombok.Getter;
// CANCEL LOAD statement used to cancel load job.
//
// syntax:
// CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
import java.util.List;
/**
* CANCEL LOAD statement used to cancel load job.
* syntax:
* CANCEL LOAD [FROM db] WHERE load_label (= "xxx" | LIKE "xxx")
**/
public class CancelLoadStmt extends DdlStmt {
private static final List<String> SUPPORT_COLUMNS = Lists.newArrayList("label", "state");
@Getter
private String dbName;
@Getter
private CompoundPredicate.Operator operator;
@Getter
private String label;
@Getter
private String state;
private Expr whereClause;
private boolean isAccurateMatch;
// Returns the target database name from the statement; may be null when FROM db was omitted.
// NOTE(review): redundant with the Lombok @Getter already on dbName — confirm and keep only one.
public String getDbName() {
    return dbName;
}
// Returns the load label filter extracted from the WHERE clause (null until analysis runs).
// NOTE(review): redundant with the Lombok @Getter already on label — confirm and keep only one.
public String getLabel() {
    return label;
}
/**
 * Construct a CANCEL LOAD statement.
 *
 * @param dbName      target database; may be null when FROM db was omitted
 *                    (presumably resolved from the session during analysis — TODO confirm)
 * @param whereClause filter on load_label / state, validated later by the
 *                    likeCheck/binaryCheck/compoundCheck helpers
 */
public CancelLoadStmt(String dbName, Expr whereClause) {
    this.dbName = dbName;
    this.whereClause = whereClause;
    this.isAccurateMatch = false;
}
public boolean isAccurateMatch() {
return isAccurateMatch;
/**
 * Validate one predicate of the WHERE clause and record its value into
 * {@code label} or {@code state}.
 *
 * Only the columns "label" and "state" are supported, the value must be a
 * non-empty string literal, and LIKE is only allowed for "label". A LIKE
 * pattern without '%' is wrapped as "%value%" for substring matching.
 *
 * @param expr predicate whose child(0) is a SlotRef column and child(1) the value
 * @param like true when the predicate is a LIKE predicate
 * @throws AnalysisException if the column, value type, or LIKE usage is invalid
 */
private void checkColumn(Expr expr, boolean like) throws AnalysisException {
    String inputCol = ((SlotRef) expr.getChild(0)).getColumnName();
    // case-insensitive support check, consistent with the equalsIgnoreCase matching below
    if (SUPPORT_COLUMNS.stream().noneMatch(col -> col.equalsIgnoreCase(inputCol))) {
        throw new AnalysisException("Current not support " + inputCol);
    }
    if (!(expr.getChild(1) instanceof StringLiteral)) {
        throw new AnalysisException("Value must be a string");
    }
    String inputValue = expr.getChild(1).getStringValue();
    if (Strings.isNullOrEmpty(inputValue)) {
        throw new AnalysisException("Value can't be null");
    }
    if (like && !inputValue.contains("%")) {
        inputValue = "%" + inputValue + "%";
    }
    if (inputCol.equalsIgnoreCase("label")) {
        label = inputValue;
    }
    if (inputCol.equalsIgnoreCase("state")) {
        if (like) {
            // state values are exact enum names; pattern matching makes no sense here
            throw new AnalysisException("Only label can use like");
        }
        state = inputValue;
    }
}
/**
 * Validate a LIKE predicate in the WHERE clause; non-LIKE expressions pass through.
 *
 * @throws AnalysisException when the predicate uses REGEXP or an unsupported column/value
 */
private void likeCheck(Expr expr) throws AnalysisException {
    if (!(expr instanceof LikePredicate)) {
        return;
    }
    LikePredicate predicate = (LikePredicate) expr;
    if (!LikePredicate.Operator.LIKE.equals(predicate.getOp())) {
        // LikePredicate also covers REGEXP, which CANCEL LOAD does not support
        throw new AnalysisException("Not support REGEXP");
    }
    checkColumn(expr, true);
}
/**
 * Validate an equality predicate in the WHERE clause; non-binary expressions pass through.
 *
 * @throws AnalysisException when the comparison is not '=' or the column/value is unsupported
 */
private void binaryCheck(Expr expr) throws AnalysisException {
    if (!(expr instanceof BinaryPredicate)) {
        return;
    }
    BinaryPredicate predicate = (BinaryPredicate) expr;
    if (!Operator.EQ.equals(predicate.getOp())) {
        throw new AnalysisException("Only support equal or like");
    }
    checkColumn(expr, false);
}
/**
 * Validate the whole WHERE clause. A null clause is rejected. For a compound
 * predicate, exactly one level combining two simple (label/state) predicates
 * is allowed — nested compounds are rejected — and the AND/OR operator is
 * recorded into {@code operator} for later job filtering.
 *
 * @throws AnalysisException when the clause is null, nested, or otherwise invalid
 */
private void compoundCheck(Expr expr) throws AnalysisException {
    if (expr == null) {
        throw new AnalysisException("Where clause can't be null");
    }
    if (expr instanceof CompoundPredicate) {
        // current only support label and state
        CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
        for (int i = 0; i < 2; i++) {
            Expr child = compoundPredicate.getChild(i);
            if (child instanceof CompoundPredicate) {
                throw new AnalysisException("Current only support label and state");
            }
            likeCheck(child);
            binaryCheck(child);
        }
        operator = compoundPredicate.getOp();
    }
}
@Override
@ -67,63 +137,10 @@ public class CancelLoadStmt extends DdlStmt {
}
// check auth after we get real load job
// analyze expr if not null
boolean valid = true;
do {
if (whereClause == null) {
valid = false;
break;
}
if (whereClause instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) whereClause;
isAccurateMatch = true;
if (binaryPredicate.getOp() != Operator.EQ) {
valid = false;
break;
}
} else if (whereClause instanceof LikePredicate) {
LikePredicate likePredicate = (LikePredicate) whereClause;
if (likePredicate.getOp() != LikePredicate.Operator.LIKE) {
valid = false;
break;
}
} else {
valid = false;
break;
}
// left child
if (!(whereClause.getChild(0) instanceof SlotRef)) {
valid = false;
break;
}
if (!((SlotRef) whereClause.getChild(0)).getColumnName().equalsIgnoreCase("label")) {
valid = false;
break;
}
// right child
if (!(whereClause.getChild(1) instanceof StringLiteral)) {
valid = false;
break;
}
label = ((StringLiteral) whereClause.getChild(1)).getStringValue();
if (Strings.isNullOrEmpty(label)) {
valid = false;
break;
}
if (!isAccurateMatch && !label.contains("%")) {
label = "%" + label + "%";
}
} while (false);
if (!valid) {
throw new AnalysisException("Where clause should looks like: LABEL = \"your_load_label\"," +
" or LABEL LIKE \"matcher\"");
}
// analyze expr
likeCheck(whereClause);
binaryCheck(whereClause);
compoundCheck(whereClause);
}
@Override
@ -131,11 +148,11 @@ public class CancelLoadStmt extends DdlStmt {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("CANCEL LOAD ");
if (!Strings.isNullOrEmpty(dbName)) {
stringBuilder.append("FROM " + dbName);
stringBuilder.append("FROM ").append(dbName);
}
if (whereClause != null) {
stringBuilder.append(" WHERE " + whereClause.toSql());
stringBuilder.append(" WHERE ").append(whereClause.toSql());
}
return stringBuilder.toString();
}

View File

@ -17,6 +17,9 @@
package org.apache.doris.common;
/**
* CaseSensibility Enum.
**/
public enum CaseSensibility {
CLUSTER(true),
DATABASE(true),

View File

@ -1728,184 +1728,6 @@ public class Load {
return false;
}
/**
 * Check whether any non-cancelled (v1) load job exists under the given label.
 *
 * @param dbName          database to search
 * @param labelValue      exact label or MySQL LIKE pattern, depending on isAccurateMatch
 * @param isAccurateMatch true for exact label lookup, false for pattern matching
 * @return true only if at least one matching job is not in CANCELLED state
 * @throws DdlException      if the database does not exist
 * @throws AnalysisException if the pattern cannot be compiled
 */
public boolean isLabelExist(String dbName, String labelValue, boolean isAccurateMatch)
        throws DdlException, AnalysisException {
    // get load job and check state
    Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
    readLock();
    try {
        Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
        if (labelToLoadJobs == null) {
            return false;
        }
        List<LoadJob> loadJobs = Lists.newArrayList();
        if (isAccurateMatch) {
            if (labelToLoadJobs.containsKey(labelValue)) {
                loadJobs.addAll(labelToLoadJobs.get(labelValue));
            }
        } else {
            PatternMatcher matcher =
                    PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
            for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
                if (matcher.match(entry.getKey())) {
                    loadJobs.addAll(entry.getValue());
                }
            }
        }
        // anyMatch covers both the empty-list case and the all-cancelled case,
        // replacing the redundant isEmpty() + filter().count() == 0 checks
        return loadJobs.stream().anyMatch(entity -> entity.getState() != JobState.CANCELLED);
    } finally {
        readUnlock();
    }
}
/**
 * Cancel all uncompleted (v1) load jobs matching the statement's label.
 *
 * Matching and state filtering happen under the read lock; the actual cancel
 * and the auth checks run outside it.
 *
 * @param isAccurateMatch true for exact label lookup, false for LIKE pattern matching
 * @return true when every matched uncompleted job was cancelled
 * @throws DdlException      if no matching job exists, none is uncompleted,
 *                           auth fails, or a cancel fails
 * @throws AnalysisException if the label pattern cannot be compiled
 */
public boolean cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
    // get params
    String dbName = stmt.getDbName();
    String label = stmt.getLabel();
    // get load job and check state
    Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
    // List of load jobs waiting to be cancelled
    List<LoadJob> loadJobs = Lists.newArrayList();
    readLock();
    try {
        Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
        if (labelToLoadJobs == null) {
            throw new DdlException("Load job does not exist");
        }
        // get jobs by label
        List<LoadJob> matchLoadJobs = Lists.newArrayList();
        if (isAccurateMatch) {
            if (labelToLoadJobs.containsKey(label)) {
                matchLoadJobs.addAll(labelToLoadJobs.get(label));
            }
        } else {
            PatternMatcher matcher =
                    PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility());
            for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
                if (matcher.match(entry.getKey())) {
                    // BUGFIX: previously added to loadJobs, leaving matchLoadJobs empty so
                    // every LIKE-based cancel threw "Load job does not exist"
                    matchLoadJobs.addAll(entry.getValue());
                }
            }
        }
        if (matchLoadJobs.isEmpty()) {
            throw new DdlException("Load job does not exist");
        }
        // check state here: only not-yet-finished jobs can be cancelled
        List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(job -> {
            JobState state = job.getState();
            return state != JobState.CANCELLED && state != JobState.QUORUM_FINISHED && state != JobState.FINISHED;
        }).collect(Collectors.toList());
        if (uncompletedLoadJob.isEmpty()) {
            throw new DdlException("There is no uncompleted job which label " +
                    (isAccurateMatch ? "is " : "like ") + stmt.getLabel());
        }
        loadJobs.addAll(uncompletedLoadJob);
    } finally {
        readUnlock();
    }
    // check auth here, cause we need table info
    Set<String> tableNames = Sets.newHashSet();
    for (LoadJob loadJob : loadJobs) {
        tableNames.addAll(loadJob.getTableNames());
    }
    if (tableNames.isEmpty()) {
        // NOTE(review): the original db-level branch reported access-denied when checkDbPriv
        // SUCCEEDED (no '!'), unlike the table branch below — looks inverted, TODO confirm
        if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
                PrivPredicate.LOAD)) {
            ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
        }
    } else {
        for (String tblName : tableNames) {
            if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
                    PrivPredicate.LOAD)) {
                ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
                        ConnectContext.get().getQualifiedUser(),
                        ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
            }
        }
    }
    // cancel job
    for (LoadJob loadJob : loadJobs) {
        List<String> failedMsg = Lists.newArrayList();
        boolean ok = cancelLoadJob(loadJob, CancelType.USER_CANCEL, "user cancel", failedMsg);
        if (!ok) {
            throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
                    "label=[" + loadJob.getLabel() + "] failed msg=" +
                    (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
        }
    }
    return true;
}
/**
 * Cancel the single (v1) load job identified exactly by the statement's label.
 * Looks up the job under the read lock, rejects already-cancelled/finished
 * states, then performs auth checks and the cancel outside the lock.
 *
 * @return true when the job was cancelled
 * @throws DdlException if the job does not exist, is already cancelled/finished,
 *                      auth fails, or the cancel itself fails
 */
public boolean cancelLoadJob(CancelLoadStmt stmt) throws DdlException {
    // get params
    String dbName = stmt.getDbName();
    String label = stmt.getLabel();
    // get load job and check state
    Database db = Catalog.getCurrentCatalog().getDbOrDdlException(dbName);
    LoadJob job;
    readLock();
    try {
        Map<String, List<LoadJob>> labelToLoadJobs = dbLabelToLoadJobs.get(db.getId());
        if (labelToLoadJobs == null) {
            throw new DdlException("Load job does not exist");
        }
        List<LoadJob> loadJobs = labelToLoadJobs.get(label);
        if (loadJobs == null) {
            throw new DdlException("Load job does not exist");
        }
        // only the last one should be running
        job = loadJobs.get(loadJobs.size() - 1);
        JobState state = job.getState();
        if (state == JobState.CANCELLED) {
            throw new DdlException("Load job has been cancelled");
        } else if (state == JobState.QUORUM_FINISHED || state == JobState.FINISHED) {
            throw new DdlException("Load job has been finished");
        }
    } finally {
        readUnlock();
    }
    // check auth here, cause we need table info
    Set<String> tableNames = job.getTableNames();
    if (tableNames.isEmpty()) {
        // forward compatibility: old jobs carry no table names, fall back to db-level priv
        if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
                PrivPredicate.LOAD)) {
            ErrorReport.reportDdlException(ErrorCode.ERR_SPECIFIC_ACCESS_DENIED_ERROR, "CANCEL LOAD");
        }
    } else {
        // require LOAD priv on every table the job touches
        for (String tblName : tableNames) {
            if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName, tblName,
                    PrivPredicate.LOAD)) {
                ErrorReport.reportDdlException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CANCEL LOAD",
                        ConnectContext.get().getQualifiedUser(),
                        ConnectContext.get().getRemoteIP(), dbName + ": " + tblName);
            }
        }
    }
    // cancel job; failedMsg collects the reason when the cancel is rejected
    List<String> failedMsg = Lists.newArrayList();
    if (!cancelLoadJob(job, CancelType.USER_CANCEL, "user cancel", failedMsg)) {
        throw new DdlException("Cancel load job fail: " + (failedMsg.isEmpty() ? "Unknown reason" : failedMsg.get(0)));
    }
    return true;
}
// Convenience overload: cancel without collecting failure messages (passes null failedMsg).
public boolean cancelLoadJob(LoadJob job, CancelType cancelType, String msg) {
    return cancelLoadJob(job, cancelType, msg, null);
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
@ -46,10 +47,12 @@ import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -71,11 +74,10 @@ import java.util.stream.Collectors;
/**
* The broker and mini load jobs(v2) are included in this class.
*
* The lock sequence:
* Database.lock
* LoadManager.lock
* LoadJob.lock
* LoadManager.lock
* LoadJob.lock
*/
public class LoadManager implements Writable {
private static final Logger LOG = LogManager.getLogger(LoadManager.class);
@ -92,8 +94,6 @@ public class LoadManager implements Writable {
/**
* This method will be invoked by the broker load(v2) now.
* @param stmt
* @throws DdlException
*/
public long createLoadJobFromStmt(LoadStmt stmt) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
@ -112,8 +112,10 @@ public class LoadManager implements Writable {
throw new DdlException("LoadManager only support the broker and spark load.");
}
if (unprotectedGetUnfinishedJobNum() >= Config.desired_max_waiting_jobs) {
throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " unfinished load jobs, "
+ "please retry later. You can use `SHOW LOAD` to view submitted jobs");
throw new DdlException(
"There are more than " + Config.desired_max_waiting_jobs
+ " unfinished load jobs, please retry later. "
+ "You can use `SHOW LOAD` to view submitted jobs");
}
}
@ -139,9 +141,6 @@ public class LoadManager implements Writable {
* This method will be invoked by streaming mini load.
* It will begin the txn of mini load immediately without any scheduler .
*
* @param request
* @return
* @throws UserException
*/
public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException {
String cluster = SystemInfoService.DEFAULT_CLUSTER;
@ -155,7 +154,8 @@ public class LoadManager implements Writable {
try {
loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
// call unprotectedExecute before adding load job. so that if job is not started ok, no need to add.
// NOTICE(cmy): this order is only for Mini Load, because mini load's unprotectedExecute() only do beginTxn().
// NOTICE(cmy): this order is only for Mini Load, because mini load's
// unprotectedExecute() only do beginTxn().
// for other kind of load job, execute the job after adding job.
// Mini load job must be executed before release write lock.
// Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun.
@ -164,7 +164,8 @@ public class LoadManager implements Writable {
createLoadJob(loadJob);
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId());
LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
e.getTxnId());
return e.getTxnId();
} catch (UserException e) {
if (loadJob != null) {
@ -190,14 +191,12 @@ public class LoadManager implements Writable {
* Step1: lock the load manager
* Step2: check the label in load manager
* Step3: call the addLoadJob of load class
* Step3.1: lock the load
* Step3.2: check the label in load
* Step3.3: add the loadJob in load rather than load manager
* Step3.4: unlock the load
* Step3.1: lock the load
* Step3.2: check the label in load
* Step3.3: add the loadJob in load rather than load manager
* Step3.4: unlock the load
* Step4: unlock the load manager
* @param stmt
* @param timestamp
* @throws DdlException
*
*/
public void createLoadJobV1FromStmt(LoadStmt stmt, EtlJobType jobType, long timestamp) throws DdlException {
Database database = checkDb(stmt.getLabel().getDbName());
@ -215,10 +214,9 @@ public class LoadManager implements Writable {
* It is used to check the label of v1 and v2 at the same time.
* Finally, the non-streaming mini load will belongs to load class.
*
* @param request
* @return if: mini load is a duplicated load, return false.
* else: return true.
* @throws DdlException
* @param request request
* @return if: mini load is a duplicated load, return false. else: return true.
* @deprecated not support mini load
*/
@Deprecated
public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException {
@ -236,6 +234,9 @@ public class LoadManager implements Writable {
}
}
/**
* MultiLoadMgr use.
**/
public void createLoadJobV1FromMultiStart(String fullDbName, String label) throws DdlException {
Database database = checkDb(fullDbName);
writeLock();
@ -250,9 +251,7 @@ public class LoadManager implements Writable {
public void replayCreateLoadJob(LoadJob loadJob) {
createLoadJob(loadJob);
LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("msg", "replay create load job")
.build());
LOG.info(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId()).add("msg", "replay create load job").build());
}
// add load job and also add to to callback factory
@ -262,7 +261,8 @@ public class LoadManager implements Writable {
return;
}
addLoadJob(loadJob);
// add callback before txn if load job is uncompleted, because callback will be performed on replay without txn begin
// add callback before txn if load job is uncompleted,
// because callback will be performed on replay without txn begin
// register txn state listener
if (!loadJob.isCompleted()) {
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
@ -282,8 +282,11 @@ public class LoadManager implements Writable {
labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
}
/**
* Record finished load job by editLog.
**/
public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType,
long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
// get db id
Database db = Catalog.getCurrentCatalog().getDbOrMetaException(dbName);
@ -291,7 +294,8 @@ public class LoadManager implements Writable {
LoadJob loadJob;
switch (jobType) {
case INSERT:
loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, trackingUrl);
loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg,
trackingUrl);
break;
default:
return;
@ -301,60 +305,77 @@ public class LoadManager implements Writable {
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
}
public void cancelLoadJob(CancelLoadStmt stmt, boolean isAccurateMatch) throws DdlException, AnalysisException {
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
/**
 * Collect into matchLoadJobs the jobs from loadJobs that satisfy the CANCEL LOAD
 * statement's label and/or state filters. A label containing '%' is matched as a
 * MySQL LIKE pattern, otherwise compared case-insensitively; state is compared
 * against the JobState enum name.
 *
 * @throws AnalysisException if the label pattern cannot be compiled
 **/
@VisibleForTesting
public static void addNeedCancelLoadJob(CancelLoadStmt stmt, List<LoadJob> loadJobs, List<LoadJob> matchLoadJobs)
        throws AnalysisException {
    String label = stmt.getLabel();
    String state = stmt.getState();
    // BUGFIX: build the matcher only when the label is really a pattern;
    // the unconditional createMysqlPattern(label, ...) failed on a state-only
    // cancel where label is null, and label.contains("%") below NPE'd
    PatternMatcher matcher = (label != null && label.contains("%"))
            ? PatternMatcher.createMysqlPattern(label, CaseSensibility.LABEL.getCaseSensibility())
            : null;
    matchLoadJobs.addAll(loadJobs.stream().filter(job -> {
        boolean labelFilter = StringUtils.isNotEmpty(label)
                && (matcher != null ? matcher.match(job.getLabel()) : job.getLabel().equalsIgnoreCase(label));
        boolean stateFilter = StringUtils.isNotEmpty(state) && job.getState().name().equalsIgnoreCase(state);
        if (stmt.getOperator() != null) {
            // compound predicate: label <AND|OR> state
            return Operator.AND.equals(stmt.getOperator()) ? labelFilter && stateFilter
                    : labelFilter || stateFilter;
        }
        if (StringUtils.isNotEmpty(label)) {
            return labelFilter;
        }
        if (StringUtils.isNotEmpty(state)) {
            return stateFilter;
        }
        return false;
    }).collect(Collectors.toList()));
}
/**
* Cancel load job by stmt.
**/
public void cancelLoadJob(CancelLoadStmt stmt) throws DdlException, AnalysisException {
Database db = Catalog.getCurrentCatalog().getDbOrDdlException(stmt.getDbName());
// List of load jobs waiting to be cancelled
List<LoadJob> loadJobs = Lists.newArrayList();
List<LoadJob> matchLoadJobs = Lists.newArrayList();
readLock();
try {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(db.getId());
if (labelToLoadJobs == null) {
throw new DdlException("Load job does not exist");
}
// get jobs by label
List<LoadJob> matchLoadJobs = Lists.newArrayList();
if (isAccurateMatch) {
if (labelToLoadJobs.containsKey(stmt.getLabel())) {
matchLoadJobs.addAll(labelToLoadJobs.get(stmt.getLabel()));
}
} else {
PatternMatcher matcher = PatternMatcher.createMysqlPattern(stmt.getLabel(), CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
matchLoadJobs.addAll(entry.getValue());
}
}
}
addNeedCancelLoadJob(stmt,
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()),
matchLoadJobs);
if (matchLoadJobs.isEmpty()) {
throw new DdlException("Load job does not exist");
}
// check state here
List<LoadJob> uncompletedLoadJob = matchLoadJobs.stream().filter(entity -> !entity.isTxnDone())
.collect(Collectors.toList());
List<LoadJob> uncompletedLoadJob =
matchLoadJobs.stream().filter(entity -> !entity.isTxnDone()).collect(Collectors.toList());
if (uncompletedLoadJob.isEmpty()) {
throw new DdlException("There is no uncompleted job which label " +
(isAccurateMatch ? "is " : "like ") + stmt.getLabel());
throw new DdlException("There is no uncompleted job");
}
loadJobs.addAll(uncompletedLoadJob);
} finally {
readUnlock();
}
for (LoadJob loadJob : loadJobs) {
for (LoadJob loadJob : matchLoadJobs) {
try {
loadJob.cancelJob(new FailMsg(FailMsg.CancelType.USER_CANCEL, "user cancel"));
} catch (DdlException e) {
throw new DdlException("Cancel load job [" + loadJob.getId() + "] fail, " +
"label=[" + loadJob.getLabel() + "] failed msg=" + e.getMessage());
throw new DdlException(
"Cancel load job [" + loadJob.getId() + "] fail, " + "label=[" + loadJob.getLabel()
+
"] failed msg=" + e.getMessage());
}
}
}
/**
* Replay end load job.
**/
public void replayEndLoadJob(LoadJobFinalOperation operation) {
LoadJob job = idToLoadJob.get(operation.getId());
if (job == null) {
@ -367,12 +388,13 @@ public class LoadManager implements Writable {
return;
}
job.unprotectReadEndOperation(operation);
LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId())
.add("operation", operation)
.add("msg", "replay end load job")
.build());
LOG.info(new LogBuilder(LogKey.LOAD_JOB, operation.getId()).add("operation", operation)
.add("msg", "replay end load job").build());
}
/**
* Replay update load job.
**/
public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) {
long jobId = info.getJobId();
LoadJob job = idToLoadJob.get(jobId);
@ -384,6 +406,9 @@ public class LoadManager implements Writable {
job.replayUpdateStateInfo(info);
}
/**
* Get load job num, used by proc.
**/
public int getLoadJobNum(JobState jobState, long dbId) {
readLock();
try {
@ -391,23 +416,31 @@ public class LoadManager implements Writable {
if (labelToLoadJobs == null) {
return 0;
}
List<LoadJob> loadJobList = labelToLoadJobs.values().stream()
.flatMap(entity -> entity.stream()).collect(Collectors.toList());
List<LoadJob> loadJobList =
labelToLoadJobs.values().stream().flatMap(entity -> entity.stream()).collect(Collectors.toList());
return (int) loadJobList.stream().filter(entity -> entity.getState() == jobState).count();
} finally {
readUnlock();
}
}
/**
* Get load job num, used by metric.
**/
public long getLoadJobNum(JobState jobState, EtlJobType jobType) {
readLock();
try {
return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType).count();
return idToLoadJob.values().stream().filter(j -> j.getState() == jobState && j.getJobType() == jobType)
.count();
} finally {
readUnlock();
}
}
/**
* Remove old load job.
**/
public void removeOldLoadJob() {
long currentTimeMs = System.currentTimeMillis();
@ -437,7 +470,9 @@ public class LoadManager implements Writable {
}
}
// only for those jobs which have etl state, like SparkLoadJob
/**
* Only for those jobs which have etl state, like SparkLoadJob.
**/
public void processEtlStateJobs() {
idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.ETL))
.forEach(job -> {
@ -445,8 +480,8 @@ public class LoadManager implements Writable {
((SparkLoadJob) job).updateEtlStatus();
} catch (DataQualityException e) {
LOG.info("update load job etl status failed. job id: {}", job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED, DataQualityException.QUALITY_FAIL_MSG),
true, true);
job.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_QUALITY_UNSATISFIED,
DataQualityException.QUALITY_FAIL_MSG), true, true);
} catch (UserException e) {
LOG.warn("update load job etl status failed. job id: {}", job.getId(), e);
job.cancelJobWithoutCheck(new FailMsg(CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
@ -456,7 +491,9 @@ public class LoadManager implements Writable {
});
}
// only for those jobs which load by PushTask
/**
* Only for those jobs which load by PushTask.
**/
public void processLoadingStateJobs() {
idToLoadJob.values().stream().filter(job -> (job.jobType == EtlJobType.SPARK && job.state == JobState.LOADING))
.forEach(job -> {
@ -473,16 +510,17 @@ public class LoadManager implements Writable {
/**
* This method will return the jobs info which can meet the condition of input param.
* @param dbId used to filter jobs which belong to this db
* @param labelValue used to filter jobs which's label is or like labelValue.
*
* @param dbId used to filter jobs which belong to this db
* @param labelValue used to filter jobs whose label equals or is like labelValue.
* @param accurateMatch true: filter jobs whose label equals labelValue. false: filter jobs whose label matches labelValue as a LIKE pattern.
* @param statesValue used to filter jobs whose state is within the statesValue set.
* @param statesValue used to filter jobs which's state within the statesValue set.
* @return The result is the list of jobInfo.
* JobInfo is a List<Comparable> which includes the comparable object: jobId, label, state etc.
* The result is unordered.
* JobInfo is a list which includes the comparable object: jobId, label, state etc.
* The result is unordered.
*/
public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue,
boolean accurateMatch, Set<String> statesValue) throws AnalysisException {
public List<List<Comparable>> getLoadJobInfosByDb(long dbId, String labelValue, boolean accurateMatch,
Set<String> statesValue) throws AnalysisException {
LinkedList<List<Comparable>> loadJobInfos = new LinkedList<List<Comparable>>();
if (!dbIdToLabelToLoadJobs.containsKey(dbId)) {
return loadJobInfos;
@ -506,8 +544,8 @@ public class LoadManager implements Writable {
Map<String, List<LoadJob>> labelToLoadJobs = dbIdToLabelToLoadJobs.get(dbId);
List<LoadJob> loadJobList = Lists.newArrayList();
if (Strings.isNullOrEmpty(labelValue)) {
loadJobList.addAll(labelToLoadJobs.values()
.stream().flatMap(Collection::stream).collect(Collectors.toList()));
loadJobList.addAll(
labelToLoadJobs.values().stream().flatMap(Collection::stream).collect(Collectors.toList()));
} else {
// check label value
if (accurateMatch) {
@ -517,7 +555,8 @@ public class LoadManager implements Writable {
loadJobList.addAll(labelToLoadJobs.get(labelValue));
} else {
// non-accurate match
PatternMatcher matcher = PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
PatternMatcher matcher =
PatternMatcher.createMysqlPattern(labelValue, CaseSensibility.LABEL.getCaseSensibility());
for (Map.Entry<String, List<LoadJob>> entry : labelToLoadJobs.entrySet()) {
if (matcher.match(entry.getKey())) {
loadJobList.addAll(entry.getValue());
@ -544,6 +583,9 @@ public class LoadManager implements Writable {
}
}
/**
* Get load job info.
**/
public void getLoadJobInfo(Load.JobInfo info) throws DdlException {
String fullDbName = ClusterNamespace.getFullName(info.clusterName, info.dbName);
info.dbName = fullDbName;
@ -577,8 +619,8 @@ public class LoadManager implements Writable {
}
private void submitJobs() {
loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(
loadJob -> loadJob.state == JobState.PENDING).collect(Collectors.toList()));
loadJobScheduler.submitJob(idToLoadJob.values().stream().filter(loadJob -> loadJob.state == JobState.PENDING)
.collect(Collectors.toList()));
}
private void analyzeLoadJobs() {
@ -594,16 +636,13 @@ public class LoadManager implements Writable {
}
/**
* step1: if label has been used in old load jobs which belong to load class
* step2: if label has been used in v2 load jobs
* step2.1: if label has been user in v2 load jobs, the create timestamp will be checked
* step1: if label has been used in old load jobs which belong to load class.
* step2: if label has been used in v2 load jobs.
* step2.1: if label has been used in v2 load jobs, the create timestamp will be checked.
*
* @param dbId
* @param label
* @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job.
*/
private void checkLabelUsed(long dbId, String label)
throws DdlException {
private void checkLabelUsed(long dbId, String label) throws DdlException {
// if label has been used in old load jobs
Catalog.getCurrentCatalog().getLoadInstance().isLabelUsed(dbId, label);
// if label has been used in v2 of load jobs
@ -637,16 +676,22 @@ public class LoadManager implements Writable {
lock.writeLock().unlock();
}
/**
* Init.
**/
public void initJobProgress(Long jobId, TUniqueId loadId, Set<TUniqueId> fragmentIds,
List<Long> relatedBackendIds) {
List<Long> relatedBackendIds) {
LoadJob job = idToLoadJob.get(jobId);
if (job != null) {
job.initLoadProgress(loadId, fragmentIds, relatedBackendIds);
}
}
public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId,
long scannedRows, long scannedBytes, boolean isDone) {
/**
* Update.
**/
public void updateJobProgress(Long jobId, Long beId, TUniqueId loadId, TUniqueId fragmentId, long scannedRows,
long scannedBytes, boolean isDone) {
LoadJob job = idToLoadJob.get(jobId);
if (job != null) {
job.updateProgress(beId, loadId, fragmentId, scannedRows, scannedBytes, isDone);
@ -656,7 +701,8 @@ public class LoadManager implements Writable {
@Override
public void write(DataOutput out) throws IOException {
long currentTimeMs = System.currentTimeMillis();
List<LoadJob> loadJobs = idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
List<LoadJob> loadJobs =
idToLoadJob.values().stream().filter(t -> !t.isExpired(currentTimeMs)).collect(Collectors.toList());
out.writeInt(loadJobs.size());
for (LoadJob loadJob : loadJobs) {
@ -664,6 +710,9 @@ public class LoadManager implements Writable {
}
}
/**
* Read from file.
**/
public void readFields(DataInput in) throws IOException {
long currentTimeMs = System.currentTimeMillis();
int size = in.readInt();
@ -683,12 +732,13 @@ public class LoadManager implements Writable {
if (loadJob.getState() == JobState.PENDING) {
// bad case. When a mini load job is created and then FE restart.
// the job will be in PENDING state forever.
// This is a temp solution to remove these jobs. And the mini load job should be deprecated in Doris v1.1
TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr().getTransactionState(
loadJob.getDbId(), loadJob.getTransactionId());
// This is a temp solution to remove these jobs.
// And the mini load job should be deprecated in Doris v1.1
TransactionState state = Catalog.getCurrentCatalog().getGlobalTransactionMgr()
.getTransactionState(loadJob.getDbId(), loadJob.getTransactionId());
if (state == null) {
LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}",
loadJob.getId(), loadJob.getDbId(), loadJob.getTransactionId());
LOG.warn("skip mini load job {} in db {} with PENDING state and with txn: {}", loadJob.getId(),
loadJob.getDbId(), loadJob.getTransactionId());
continue;
}
}

View File

@ -170,18 +170,7 @@ public class DdlExecutor {
catalog.getLoadManager().createLoadJobFromStmt(loadStmt);
}
} else if (ddlStmt instanceof CancelLoadStmt) {
boolean isAccurateMatch = ((CancelLoadStmt) ddlStmt).isAccurateMatch();
boolean isLabelExist = catalog.getLoadInstance().isLabelExist(
((CancelLoadStmt) ddlStmt).getDbName(),
((CancelLoadStmt) ddlStmt).getLabel(), isAccurateMatch);
if (isLabelExist) {
catalog.getLoadInstance().cancelLoadJob((CancelLoadStmt) ddlStmt,
isAccurateMatch);
}
if (!isLabelExist || isAccurateMatch) {
catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt,
isAccurateMatch);
}
catalog.getLoadManager().cancelLoadJob((CancelLoadStmt) ddlStmt);
} else if (ddlStmt instanceof CreateRoutineLoadStmt) {
catalog.getRoutineLoadManager().createRoutineLoadJob((CreateRoutineLoadStmt) ddlStmt);
} else if (ddlStmt instanceof PauseRoutineLoadStmt) {

View File

@ -17,88 +17,123 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FakeCatalog;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.load.loadv2.InsertLoadJob;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.utframe.TestWithFeService;
import mockit.Expectations;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
public class CancelLoadStmtTest extends TestWithFeService {
public class CancelLoadStmtTest {
private Analyzer analyzer;
private Catalog catalog;
FakeCatalog fakeCatalog;
@Before
public void setUp() {
fakeCatalog = new FakeCatalog();
catalog = AccessTestUtil.fetchAdminCatalog();
FakeCatalog.setCatalog(catalog);
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
new Expectations(analyzer) {
{
analyzer.getDefaultDb();
minTimes = 0;
result = "testCluster:testDb";
analyzer.getQualifiedUser();
minTimes = 0;
result = "testCluster:testUser";
analyzer.getClusterName();
minTimes = 0;
result = "testCluster";
analyzer.getCatalog();
minTimes = 0;
result = catalog;
}
};
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
createDatabase("testDb");
useDatabase("testDb");
createTable("create table table1\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ "properties(\"replication_num\" = \"1\");");
analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
}
@Test
public void testNormal() throws UserException, AnalysisException {
SlotRef slotRef = new SlotRef(null, "label");
StringLiteral stringLiteral = new StringLiteral("doris_test_label");
public void testNormal() throws UserException {
SlotRef labelSlotRef = new SlotRef(null, "label");
StringLiteral labelStringLiteral = new StringLiteral("doris_test_label");
BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
stmt.analyze(analyzer);
Assert.assertTrue(stmt.isAccurateMatch());
Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` = 'doris_test_label'", stmt.toString());
SlotRef stateSlotRef = new SlotRef(null, "state");
StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
LikePredicate likePredicate = new LikePredicate(LikePredicate.Operator.LIKE, slotRef, stringLiteral);
stmt = new CancelLoadStmt(null, likePredicate);
BinaryPredicate labelBinaryPredicate =
new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral);
CancelLoadStmt stmt = new CancelLoadStmt(null, labelBinaryPredicate);
stmt.analyze(analyzer);
Assert.assertFalse(stmt.isAccurateMatch());
Assert.assertEquals("CANCEL LOAD FROM testCluster:testDb WHERE `label` LIKE 'doris_test_label'", stmt.toString());
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label'",
stmt.toString());
BinaryPredicate stateBinaryPredicate =
new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
stmt = new CancelLoadStmt(null, stateBinaryPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `state` = 'FINISHED'", stmt.toString());
LikePredicate labelLikePredicate =
new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral);
stmt = new CancelLoadStmt(null, labelLikePredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'",
stmt.toString());
CompoundPredicate compoundAndPredicate =
new CompoundPredicate(Operator.AND, labelBinaryPredicate, stateBinaryPredicate);
stmt = new CancelLoadStmt(null, compoundAndPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals(
"CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'FINISHED'",
stmt.toString());
CompoundPredicate compoundOrPredicate =
new CompoundPredicate(Operator.OR, labelBinaryPredicate, stateBinaryPredicate);
stmt = new CancelLoadStmt(null, compoundOrPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals(
"CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' OR `state` = 'FINISHED'",
stmt.toString());
// test match
List<LoadJob> loadJobs = new ArrayList<>();
InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, 10003L, 10005L, 0, "", "");
loadJobs.add(insertLoadJob1);
InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, 10003L, 10005L, 0, "", "");
loadJobs.add(insertLoadJob2);
InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, 10003L, 10005L, 0, "", "");
loadJobs.add(insertLoadJob3);
// label
stmt = new CancelLoadStmt(null, labelBinaryPredicate);
stmt.analyze(analyzer);
List<LoadJob> matchLoadJobs = new ArrayList<>();
LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
Assertions.assertEquals(1, matchLoadJobs.size());
// state
matchLoadJobs.clear();
stmt = new CancelLoadStmt(null, stateBinaryPredicate);
stmt.analyze(analyzer);
LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
Assertions.assertEquals(3, matchLoadJobs.size());
// or
matchLoadJobs.clear();
stmt = new CancelLoadStmt(null, compoundOrPredicate);
stmt.analyze(analyzer);
LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
Assertions.assertEquals(3, matchLoadJobs.size());
// and
matchLoadJobs.clear();
stmt = new CancelLoadStmt(null, compoundAndPredicate);
stmt.analyze(analyzer);
LoadManager.addNeedCancelLoadJob(stmt, loadJobs, matchLoadJobs);
Assertions.assertEquals(1, matchLoadJobs.size());
}
@Test(expected = AnalysisException.class)
public void testNoDb() throws UserException, AnalysisException {
SlotRef slotRef = new SlotRef(null, "label");
StringLiteral stringLiteral = new StringLiteral("doris_test_label");
new Expectations(analyzer) {
{
analyzer.getDefaultDb();
minTimes = 0;
result = "";
@Test
public void testError() {
SlotRef stateSlotRef = new SlotRef(null, "state");
StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
analyzer.getClusterName();
minTimes = 0;
result = "testCluster";
}
};
BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, stringLiteral);
CancelLoadStmt stmt = new CancelLoadStmt(null, binaryPredicate);
stmt.analyze(analyzer);
Assert.fail("No exception throws.");
LikePredicate stateLikePredicate =
new LikePredicate(LikePredicate.Operator.LIKE, stateSlotRef, stateStringLiteral);
CancelLoadStmt stmt = new CancelLoadStmt(null, stateLikePredicate);
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class, "Only label can use like",
() -> stmt.analyze(analyzer));
}
}