Commit: 64b219f04d
Parent: d1a7f1d2c6
Author: zh0122
Date: 2020-11-20 09:48:27 +08:00
Committed by: GitHub
8 changed files with 15 additions and 15 deletions


@@ -172,7 +172,7 @@ The following is a detailed explanation of some parameters of the data descripti
+ where predicate
-The where statement in ```data_desc``` is responsible for filtering the data that has been transformed. The unselected rows which is filtered by where predicate will not be calculated in ```max_filter_ratio``` . If there are more then one where predicate of the same table , the multi where predicate will be merged from different ```data_desc``` and the policy is AND.
+The where statement in ```data_desc``` is responsible for filtering the data that has been transformed. The unselected rows which is filtered by where predicate will not be calculated in ```max_filter_ratio``` . If there are more than one where predicate of the same table , the multi where predicate will be merged from different ```data_desc``` and the policy is AND.
+ merge\_type
The type of data merging supports three types: APPEND, DELETE, and MERGE. APPEND is the default value, which means that all this batch of data needs to be appended to the existing data. DELETE means to delete all rows with the same key as this batch of data. MERGE semantics Need to be used in conjunction with the delete condition, which means that the data that meets the delete condition is processed according to DELETE semantics and the rest is processed according to APPEND semantics
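To make the three merge semantics concrete, here is a minimal Java sketch, assuming rows are keyed by a single string and the MERGE delete condition is an arbitrary predicate; `MergeTypeSketch`, `applyBatch`, and the row layout are illustrative placeholders, not Doris internals:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class MergeTypeSketch {
    enum MergeType { APPEND, DELETE, MERGE }

    // Applies one batch of keyed rows to a table under the given merge type.
    static void applyBatch(Map<String, String> table, Map<String, String> batch,
                           MergeType type, Predicate<String> deleteCondition) {
        for (Map.Entry<String, String> row : batch.entrySet()) {
            switch (type) {
                case APPEND: // default: the whole batch is appended to existing data
                    table.put(row.getKey(), row.getValue());
                    break;
                case DELETE: // every existing row whose key appears in the batch is removed
                    table.remove(row.getKey());
                    break;
                case MERGE:  // rows matching the delete condition follow DELETE semantics,
                             // the rest follow APPEND semantics
                    if (deleteCondition.test(row.getValue())) {
                        table.remove(row.getKey());
                    } else {
                        table.put(row.getKey(), row.getValue());
                    }
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("k1", "old");
        Map<String, String> batch = new HashMap<>();
        batch.put("k1", "deleted=true");
        batch.put("k2", "deleted=false");
        applyBatch(table, batch, MergeType.MERGE, v -> v.contains("deleted=true"));
        System.out.println(table); // prints {k2=deleted=false}
    }
}
```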


@@ -579,7 +579,7 @@ public class Config extends ConfigBase {
/**
* Default number of waiting jobs for routine load and version 2 of load
* This is a desired number.
-* In some situation, such as switch the master, the current number is maybe more then desired_max_waiting_jobs
+* In some situation, such as switch the master, the current number is maybe more than desired_max_waiting_jobs
*/
@ConfField(mutable = true, masterOnly = true)
public static int desired_max_waiting_jobs = 100;
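For illustration, a minimal sketch of such a "desired" (soft) cap, assuming new submissions are rejected at the cap while replayed jobs (e.g. re-added after a master switch) may push the queue past it; `WaitingJobQueueSketch` and its methods are hypothetical names, not the actual LoadJobScheduler API:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class WaitingJobQueueSketch {
    static final int DESIRED_MAX_WAITING_JOBS = 100; // mirrors the config default above
    private final LinkedBlockingQueue<String> waitingJobs = new LinkedBlockingQueue<>();

    boolean isQueueFull() {
        return waitingJobs.size() >= DESIRED_MAX_WAITING_JOBS;
    }

    // New submissions are rejected once the desired cap is reached.
    void submit(String job) {
        if (isQueueFull()) {
            throw new IllegalStateException("There are more than " + DESIRED_MAX_WAITING_JOBS
                    + " load jobs in waiting queue, please retry later.");
        }
        waitingJobs.offer(job);
    }

    // Replayed jobs bypass the check, so the queue can transiently
    // hold more than the desired number.
    void replay(String job) {
        waitingJobs.offer(job);
    }
}
```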


@@ -107,7 +107,7 @@ public class LoadManager implements Writable{
throw new DdlException("LoadManager only support the broker and spark load.");
}
if (loadJobScheduler.isQueueFull()) {
throw new DdlException("There are more then " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, "
throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, "
+ "please retry later.");
}
loadJob = BulkLoadJob.fromLoadStmt(stmt);


@@ -163,7 +163,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected LoadDataSourceType dataSourceType;
// max number of error data in max batch rows * 10
// maxErrorNum / (maxBatchRows * 10) = max error rate of routine load job
-// if current error rate is more then max error rate, the job will be paused
+// if current error rate is more than max error rate, the job will be paused
protected long maxErrorNum = DEFAULT_MAX_ERROR_NUM; // optional
protected long execMemLimit = DEFAULT_EXEC_MEM_LIMIT;
// include strict mode
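The two comments above define the job's effective error budget: maxErrorNum / (maxBatchRows * 10) is the maximum tolerated error rate per window of maxBatchRows * 10 consumed rows. A minimal sketch of that bookkeeping, assuming the counters reset at each window boundary and with the pause hook reduced to a placeholder for the real updateState call:

```java
public class ErrorRateSketch {
    long maxBatchRows = 200_000; // illustrative default
    long maxErrorNum = 0;        // DEFAULT_MAX_ERROR_NUM: no errors tolerated
    long currentTotalRows = 0;
    long currentErrorRows = 0;

    // Called with each task attachment's row counts.
    void updateProgress(long totalRows, long errorRows) {
        currentTotalRows += totalRows;
        currentErrorRows += errorRows;
        if (currentErrorRows > maxErrorNum) {
            // current error rows is more than max error num: pause the job
            pauseJob("current error rows is more than max error num");
            resetWindow();
        } else if (currentTotalRows > maxBatchRows * 10) {
            // window is full without exceeding the error budget:
            // reset current total rows and current error rows
            resetWindow();
        }
    }

    void resetWindow() {
        currentTotalRows = 0;
        currentErrorRows = 0;
    }

    void pauseJob(String reason) {
        System.out.println("PAUSED: " + reason); // placeholder for updateState(JobState.PAUSED, ...)
    }
}
```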
@@ -670,7 +670,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
}
-// if rate of error data is more then max_filter_ratio, pause job
+// if rate of error data is more than max_filter_ratio, pause job
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
updateNumOfData(attachment.getTotalRows(), attachment.getFilteredRows(), attachment.getUnselectedRows(),
attachment.getReceivedBytes(), attachment.getTaskExecutionTimeMs(),
@@ -700,13 +700,13 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
.add("current_total_rows", currentTotalRows)
.add("current_error_rows", currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "current error rows is more then max error num, begin to pause job")
.add("msg", "current error rows is more than max error num, begin to pause job")
.build());
// if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
if (!isReplay) {
// remove all of task in jobs and change job state to paused
updateState(JobState.PAUSED,
-new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR, "current error rows of job is more then max error num"),
+new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR, "current error rows of job is more than max error num"),
isReplay);
}
}
@@ -717,7 +717,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
.add("current_error_rows", currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "reset current total rows and current error rows "
+ "when current total rows is more then base")
+ "when current total rows is more than base")
.build());
}
// reset currentTotalNum and currentErrorNum
@@ -728,12 +728,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
.add("current_total_rows", currentTotalRows)
.add("current_error_rows", currentErrorRows)
.add("max_error_num", maxErrorNum)
.add("msg", "current error rows is more then max error rows, begin to pause job")
.add("msg", "current error rows is more than max error rows, begin to pause job")
.build());
if (!isReplay) {
// remove all of task in jobs and change job state to paused
updateState(JobState.PAUSED,
-new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR, "current error rows is more then max error num"),
+new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR, "current error rows is more than max error num"),
isReplay);
}
// reset currentTotalNum and currentErrorNum
@@ -949,7 +949,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
}
-// the task is aborted when the correct number of rows is more then 0
+// the task is aborted when the correct number of rows is more than 0
// be will abort txn when all of kafka data is wrong or total consume data is 0
// txn will be aborted but progress will be update
// progress will be update otherwise the progress will be hung
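A minimal sketch of the abort rule spelled out in these comments, assuming an aborted transaction (all consumed Kafka data wrong, or nothing consumed) must still advance the consume progress so the job does not hang re-reading the same batch; the fields and method signature are illustrative placeholders:

```java
public class TxnAbortSketch {
    long progressOffset = 0; // last consumed position, e.g. a Kafka offset
    long visibleRows = 0;    // rows actually committed and visible

    void onTaskFinished(long consumedRows, long errorRows, long lastOffset) {
        boolean nothingConsumed = consumedRows == 0;
        boolean allDataWrong = consumedRows > 0 && errorRows == consumedRows;
        if (nothingConsumed || allDataWrong) {
            // the backend aborts the txn: no rows become visible ...
            // ... but progress is still updated, otherwise the job would
            // hang re-consuming the same bad batch forever
            progressOffset = lastOffset;
        } else {
            // normal path: the txn commits, rows become visible,
            // and progress advances as usual
            visibleRows += consumedRows - errorRows;
            progressOffset = lastOffset;
        }
    }
}
```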


@@ -158,7 +158,7 @@ public class RoutineLoadManager implements Writable {
}
if (getRoutineLoadJobByState(Sets.newHashSet(RoutineLoadJob.JobState.NEED_SCHEDULE,
RoutineLoadJob.JobState.RUNNING, RoutineLoadJob.JobState.PAUSED)).size() > Config.max_routine_load_job_num) {
throw new DdlException("There are more then " + Config.max_routine_load_job_num
throw new DdlException("There are more than " + Config.max_routine_load_job_num
+ " routine load jobs are running. exceed limit.");
}


@@ -77,7 +77,7 @@ public class RoutineLoadScheduler extends MasterDaemon {
UserException userException = null;
try {
routineLoadJob.prepare();
-// judge nums of tasks more then max concurrent tasks of cluster
+// judge nums of tasks more than max concurrent tasks of cluster
int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();
if (desiredConcurrentTaskNum <= 0) {
// the job will be rescheduled later.
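A hedged sketch of how the concurrent task number could be derived; the real calculateCurrentConcurrentTaskNum may weigh different inputs, but assuming concurrency is bounded by the partition count, the alive backends, the job's desired value, and a cluster-wide cap:

```java
public class ConcurrencySketch {
    // Returns how many tasks the job may run in parallel; <= 0 means
    // nothing can be scheduled now and the job is rescheduled later.
    static int calculateConcurrentTaskNum(int partitionNum, int aliveBeNum,
                                          int desiredConcurrentNum, int clusterTaskCap) {
        int num = Math.min(partitionNum, desiredConcurrentNum);
        num = Math.min(num, aliveBeNum);
        return Math.min(num, clusterTaskCap);
    }

    public static void main(String[] args) {
        // e.g. 3 Kafka partitions, 2 alive BEs, desired 5, cluster cap 10 -> 2 tasks
        System.out.println(calculateConcurrentTaskNum(3, 2, 5, 10));
    }
}
```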


@@ -48,7 +48,7 @@ import java.util.concurrent.LinkedBlockingQueue;
/**
* Routine load task scheduler is a function which allocate task to be.
-* Step1: update backend slot if interval more then BACKEND_SLOT_UPDATE_INTERVAL_MS
+* Step1: update backend slot if interval more than BACKEND_SLOT_UPDATE_INTERVAL_MS
* Step2: submit beIdToBatchTask when queue is empty
* Step3: take a task from queue and schedule this task
*
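The three steps read as a simple daemon loop. A minimal sketch, assuming a blocking task queue and a batch list flushed whenever the queue drains; names such as needScheduleTasksQueue and submitBatchTasks are modeled on the javadoc, not verified against the class:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class TaskSchedulerSketch {
    static final long BACKEND_SLOT_UPDATE_INTERVAL_MS = 10_000; // assumed interval
    final LinkedBlockingQueue<Runnable> needScheduleTasksQueue = new LinkedBlockingQueue<>();
    final List<Runnable> batchTasks = new ArrayList<>();
    long lastBackendSlotUpdateTime = -1;

    void runOneCycle() throws InterruptedException {
        // Step 1: update backend slots if the interval has elapsed
        long now = System.currentTimeMillis();
        if (now - lastBackendSlotUpdateTime > BACKEND_SLOT_UPDATE_INTERVAL_MS) {
            updateBackendSlots();
            lastBackendSlotUpdateTime = now;
        }
        // Step 2: submit the accumulated batch when the queue is empty
        if (needScheduleTasksQueue.isEmpty()) {
            submitBatchTasks();
        }
        // Step 3: take a task from the queue and schedule it
        Runnable task = needScheduleTasksQueue.take();
        task.run();
    }

    void updateBackendSlots() { /* refresh per-BE slot counts; placeholder */ }
    void submitBatchTasks() { batchTasks.clear(); /* send batched tasks to BEs; placeholder */ }
}
```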


@@ -26,7 +26,7 @@ import org.apache.doris.thrift.TPlanNodeType;
/**
* Assert num rows node is used to determine whether the number of rows is less then desired num of rows.
* The rows are the result of subqueryString.
-* If the number of rows is more then the desired num of rows, the query will be cancelled.
+* If the number of rows is more than the desired num of rows, the query will be cancelled.
* The cancelled reason will be reported by Backend and displayed back to the user.
*/
public class AssertNumRowsNode extends PlanNode {
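A minimal sketch of the assertion this javadoc describes, assuming the node counts rows streaming through it and fails once the count exceeds the desired number; the exception here stands in for the cancellation reason the Backend reports to the user:

```java
public class AssertNumRowsSketch {
    final long desiredNumOfRows;
    long seenRows = 0;

    AssertNumRowsSketch(long desiredNumOfRows) {
        this.desiredNumOfRows = desiredNumOfRows;
    }

    // Called once per row flowing through the node.
    void onRow() {
        seenRows++;
        if (seenRows > desiredNumOfRows) {
            // in the real plan node the Backend reports this reason
            // and the query is cancelled
            throw new IllegalStateException("Expected no more than "
                    + desiredNumOfRows + " rows, but got more");
        }
    }
}
```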