Modify insert operation's behavior (#1444)

Before the default insert operation was changed to streaming load, if the select result
of an insert stmt was empty, a label was still returned to the user, and the user
could use this label to check the insert load job's status.

After changing the insert operation, if the select result is empty, an exception
is thrown to the user client directly, without any label.

This new usage pattern is not friendly to existing users, because it forces them
to change the way they use the insert operation.

So I add a new FE config 'using_old_load_usage_pattern', which defaults to false.
If set to true, a label is returned to the user even if the select result is empty.
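
A minimal sketch of the resulting decision, with illustrative names (this is not Doris code; the real gate is in the StmtExecutor change below):

    // Illustrative only: how the flag decides between throwing the error and
    // returning a label that can later be checked with SHOW LOAD.
    static String handleInsertFailure(boolean usingOldLoadUsagePattern,
                                      String label, Exception error) throws Exception {
        if (!usingOldLoadUsagePattern) {
            // new default: the error goes straight to the client, no label is returned
            throw error;
        }
        // old pattern: the failure is recorded as a CANCELLED load job under this label,
        // and the label is still returned to the client
        return "{'label':'" + label + "'}";
    }

Since the config is declared mutable and masterOnly (see the Config hunk below), it can also be adjusted on the running master FE.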
Mingyu Chen
2019-07-09 10:17:09 +08:00
committed by ZHAO Chun
parent dc64521607
commit bde362c3cd
8 changed files with 122 additions and 64 deletions

View File

@ -824,5 +824,13 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_colocate_relocate = false;
@ConfField(mutable = true, masterOnly = true)
public static boolean disable_colocate_balance = false;
/*
* If set to true, an insert stmt with a processing error will still return a label to the user,
* and the user can use this label to check the load job's status.
* The default value is false, which means that if an insert operation encounters an error,
* an exception is thrown to the user client directly, without a load label.
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean using_old_load_usage_pattern = false;
}

View File

@ -23,6 +23,10 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import com.google.common.base.Strings;
import java.io.DataInput;
import java.io.DataOutput;
@ -46,14 +50,20 @@ public class InsertLoadJob extends LoadJob {
this.jobType = EtlJobType.INSERT;
}
public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp) {
public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg) {
super(dbId, label);
this.tableId = tableId;
this.createTimestamp = createTimestamp;
this.loadStartTimestamp = createTimestamp;
this.finishTimestamp = System.currentTimeMillis();
this.state = JobState.FINISHED;
this.progress = 100;
if (Strings.isNullOrEmpty(failMsg)) {
this.state = JobState.FINISHED;
this.progress = 100;
} else {
this.state = JobState.CANCELLED;
this.failMsg = new FailMsg(CancelType.LOAD_RUN_FAIL, failMsg);
this.progress = 0;
}
this.jobType = EtlJobType.INSERT;
this.timeoutSecond = Config.insert_load_default_timeout_second;
}
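
For illustration, a hedged sketch of the two constructor outcomes introduced above (IDs, labels, and the message are made-up values):

    // assuming the new constructor shown in the diff above
    long now = System.currentTimeMillis();
    // empty failMsg -> state FINISHED, progress 100
    InsertLoadJob finished = new InsertLoadJob("label_ok", 10001L, 20001L, now, "");
    // non-empty failMsg -> state CANCELLED with FailMsg(LOAD_RUN_FAIL, msg), progress 0
    InsertLoadJob cancelled = new InsertLoadJob("label_err", 10001L, 20001L, now,
            "all partitions have no load data");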

View File

@ -251,7 +251,7 @@ public class LoadManager implements Writable{
}
public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType,
long createTimestamp) throws MetaNotFoundException {
long createTimestamp, String failMsg) throws MetaNotFoundException {
// get db id
Database db = Catalog.getCurrentCatalog().getDb(dbName);
@ -262,7 +262,7 @@ public class LoadManager implements Writable{
LoadJob loadJob;
switch (jobType) {
case INSERT:
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp);
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg);
break;
default:
return;

View File

@ -589,74 +589,111 @@ public class StmtExecutor {
return;
}
// assign request_id
long createTime = System.currentTimeMillis();
UUID uuid = UUID.randomUUID();
context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
Throwable throwable = null;
try {
// assign request_id
context.setQueryId(new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()));
coord = new Coordinator(context, analyzer, planner);
coord.setQueryType(TQueryType.LOAD);
coord = new Coordinator(context, analyzer, planner);
coord.setQueryType(TQueryType.LOAD);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
QeProcessorImpl.INSTANCE.registerQuery(context.queryId(), coord);
coord.exec();
coord.exec();
coord.join(context.getSessionVariable().getQueryTimeoutS());
if (!coord.isDone()) {
coord.cancel();
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
if (!coord.getExecStatus().ok()) {
String errMsg = coord.getExecStatus().getErrorMsg();
LOG.warn("insert failed: {}", errMsg);
// hide host info
int hostIndex = errMsg.indexOf("host");
if (hostIndex != -1) {
errMsg = errMsg.substring(0, hostIndex);
coord.join(context.getSessionVariable().getQueryTimeoutS());
if (!coord.isDone()) {
coord.cancel();
ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT);
}
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
LOG.info("delta files is {}", coord.getDeltaUrls());
if (!coord.getExecStatus().ok()) {
String errMsg = coord.getExecStatus().getErrorMsg();
LOG.warn("insert failed: {}", errMsg);
if (context.getSessionVariable().getEnableInsertStrict()) {
Map<String, String> counters = coord.getLoadCounters();
String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
if (strValue != null && Long.valueOf(strValue) > 0) {
throw new UserException("Insert has filtered data in strict mode, tracking_url="
+ coord.getTrackingUrl());
// hide host info
int hostIndex = errMsg.indexOf("host");
if (hostIndex != -1) {
errMsg = errMsg.substring(0, hostIndex);
}
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
}
LOG.info("delta files is {}", coord.getDeltaUrls());
if (context.getSessionVariable().getEnableInsertStrict()) {
Map<String, String> counters = coord.getLoadCounters();
String strValue = counters.get(LoadEtlTask.DPP_ABNORMAL_ALL);
if (strValue != null && Long.valueOf(strValue) > 0) {
throw new UserException("Insert has filtered data in strict mode, tracking_url="
+ coord.getTrackingUrl());
}
}
if (insertStmt.getTargetTable().getType() != TableType.OLAP) {
// no need to add load job.
// MySQL table is already being inserted.
context.getState().setOk();
return;
}
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
insertStmt.getDbObj(), insertStmt.getTransactionId(),
TabletCommitInfo.fromThrift(coord.getCommitInfos()),
5000);
} catch (Throwable t) {
// if any throwable is thrown during the insert operation, we should first abort this txn
LOG.warn("handle insert stmt fail: {}", DebugUtil.printId(uuid), t);
try {
Catalog.getCurrentGlobalTransactionMgr().abortTransaction(
insertStmt.getTransactionId(),
t.getMessage() == null ? "unknown reason" : t.getMessage());
} catch (Exception abortTxnException) {
// just print a log if aborting the txn fails. This failure does not need to be passed to the user;
// the user only cares about why the txn failed, not whether the abort succeeded.
LOG.warn("errors when abort txn", abortTxnException);
}
if (!Config.using_old_load_usage_pattern) {
// if not using the old usage pattern, the exception will be thrown to the user directly
// without a label
throw t;
}
/*
* If the config 'using_old_load_usage_pattern' is true,
* Doris will return a label to the user, and the user can use this label to check the load job's status,
* which is exactly like the old insert stmt usage pattern.
*/
throwable = t;
}
if (insertStmt.getTargetTable().getType() != TableType.OLAP) {
// no need to add load job.
// mysql table is already being inserted.
context.getState().setOk();
return;
}
Catalog.getCurrentGlobalTransactionMgr().commitAndPublishTransaction(
insertStmt.getDbObj(), insertStmt.getTransactionId(),
TabletCommitInfo.fromThrift(coord.getCommitInfos()),
5000);
context.getState().setOk();
// record the non-streaming insert info for show load
if (!insertStmt.isStreaming()) {
// record insert info for show load stmt
if (!insertStmt.isStreaming() || Config.using_old_load_usage_pattern) {
try {
context.getCatalog().getLoadManager().recordFinishedLoadJob(
uuid.toString(),
insertStmt.getDb(),
insertStmt.getTargetTable().getId(),
EtlJobType.INSERT,
System.currentTimeMillis()
createTime,
throwable == null ? "" : throwable.getMessage()
);
} catch (MetaNotFoundException e) {
LOG.warn("Record info of insert load with error " + e.getMessage(), e);
context.getState().setOk("Insert has been finished while info has not been recorded with " + e.getMessage());
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
context.getState().setError("Failed to record info of insert load job, but insert job is "
+ (throwable == null ? "success" : "failed"));
return;
}
// set to OK, which means the insert load job was successfully submitted,
// and the user can check the job's status by its label.
context.getState().setOk("{'label':'" + uuid.toString() + "'}");
} else {
// just return OK without a label, which means this job has finished successfully
context.getState().setOk();
}
}
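
A condensed, purely illustrative summary of the response logic at the end of this hunk (not the actual StmtExecutor code):

    // A load job is recorded and a label returned when the insert is non-streaming
    // or when Config.using_old_load_usage_pattern is enabled; otherwise plain OK.
    static String buildInsertResult(boolean streaming, boolean usingOldPattern, String label) {
        if (!streaming || usingOldPattern) {
            // the recorded job is FINISHED on success or CANCELLED on failure,
            // and the label lets the user check it later
            return "{'label':'" + label + "'}";
        }
        // streaming insert under the new default behavior: OK without a label
        return "OK";
    }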

View File

@ -242,12 +242,8 @@ public class GlobalTransactionMgr {
}
LOG.debug("try to commit transaction: {}", transactionId);
if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
throw new TransactionCommitFailedException("all partitions have no load data");
}
// 1. check status
// the caller method already own db lock, we not obtain db lock here
// the caller method already owns the db lock, so we do not obtain the db lock here
Database db = catalog.getDb(dbId);
if (null == db) {
throw new MetaNotFoundException("could not find db [" + dbId + "]");
@ -257,6 +253,7 @@ public class GlobalTransactionMgr {
|| transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
throw new TransactionCommitFailedException(transactionState.getReason());
}
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
return;
}
@ -264,6 +261,10 @@ public class GlobalTransactionMgr {
return;
}
if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()) {
throw new TransactionCommitFailedException(TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG);
}
// update transaction state extra if exists
if (txnCommitAttachment != null) {
transactionState.setTxnCommitAttachment(txnCommitAttachment);

View File

@ -22,6 +22,8 @@ public class TransactionCommitFailedException extends TransactionException {
private static final long serialVersionUID = -2528170792631761535L;
public static final String NO_DATA_TO_LOAD_MSG = "all partitions have no load data";
public TransactionCommitFailedException(String msg) {
super(msg);
}

View File

@ -38,7 +38,7 @@ public class InsertLoadJobTest {
public void testGetTableNames(@Mocked Catalog catalog,
@Injectable Database database,
@Injectable Table table) throws MetaNotFoundException {
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000);
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, "");
String tableName = "table1";
new Expectations() {
{

View File

@ -17,11 +17,6 @@
package org.apache.doris.load.loadv2;
import mockit.Deencapsulation;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
@ -48,6 +43,11 @@ import java.io.FileOutputStream;
import java.util.List;
import java.util.Map;
import mockit.Deencapsulation;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
public class LoadManagerTest {
private LoadManager loadManager;
private static final String methodName = "getIdToLoadJobs";
@ -55,7 +55,7 @@ public class LoadManagerTest {
@Before
public void setUp() throws Exception {
loadManager = new LoadManager(new LoadJobScheduler());
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis());
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "");
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
}