[RoutineLoad] Auto Resume RoutineLoadJob (#2958)

When all backends restart, the routine load job can be automatically resumed.
This commit is contained in:
worker24h
2020-03-02 13:27:35 +08:00
committed by GitHub
parent df56588bb5
commit ef4bb0c011
20 changed files with 404 additions and 97 deletions

View File

@ -1,4 +1,4 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@ -34,7 +34,7 @@ under the License.
## Principle
```
+---------+
| Client |
+----+----+
@ -87,13 +87,13 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
* columns_mapping
`columns_mapping` is mainly used to specify the mapping between the table schema and the columns in the message, as well as conversions for some columns. If it is not specified, Doris assumes by default that the columns in the message correspond one-to-one, in order, to the columns of the table schema. Although data can normally be loaded without specifying it when the source data happens to correspond one-to-one, we still strongly recommend that users **explicitly specify the column mapping**. This way, when the table schema changes (for example, a nullable column is added) or the source file changes (for example, a column is added), the load job can still continue. Otherwise, after such a change, the load will report an error because the column mapping no longer corresponds one-to-one.
Built-in functions can also be used in `columns_mapping` to convert columns, but pay attention to the actual column types of the function arguments. For example:
Suppose the user needs to load a table containing only one column `k1` of type `int`, and null values in the source file must be converted to 0. This can be done with the `ifnull` function. The correct usage is as follows:
`COLUMNS (xx, k1=ifnull(xx, "3"))`
Note that we use `"3"` instead of `3` here even though the type of `k1` is `int`. For a load job, all columns in the source data are of type `varchar`, so the virtual column `xx` is also of type `varchar`. We therefore need `"3"` for the match; otherwise `ifnull` cannot find a function signature with parameters `(varchar, int)` and an error will occur.
As another example, suppose the user needs to load a table containing only one column `k1` of type `int`, and the corresponding source column must be processed: negative numbers converted to positive ones, and positive numbers multiplied by 100. This can be done with a `case when` expression; the correct form is as follows:
@ -109,43 +109,43 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
* desired\_concurrent\_number
`desired_concurrent_number` specifies the desired concurrency of a routine load job, i.e. the maximum number of tasks of one job that run simultaneously. For Kafka loads, the current actual concurrency is calculated as follows:
```
Min(partition num, desired_concurrent_number, alive_backend_num, Config.max_routine_load_task_concurrrent_num)
```
Here `Config.max_routine_load_task_concurrrent_num` is a system-wide default limit on the maximum concurrency. It is an FE configuration and can be adjusted by modifying it. The default is 5.
`partition num` is the number of partitions of the subscribed Kafka topic, and `alive_backend_num` is the number of currently healthy BE nodes. A worked example is given below.
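For example, if the subscribed topic has 10 partitions, `desired_concurrent_number` is 5, there are 4 alive BE nodes, and `Config.max_routine_load_task_concurrrent_num` keeps its default of 5, then the actual concurrency is Min(10, 5, 4, 5) = 4 tasks.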
* max\_batch\_interval/max\_batch\_rows/max\_batch\_size
These three parameters control the execution time of a single task; the task ends when any one of the thresholds is reached. `max_batch_rows` counts the number of rows read from Kafka, and `max_batch_size` counts the amount of data read from Kafka, in bytes. Currently the consumption rate of a single task is roughly 5-10MB/s.
Suppose one row of data is 500B and the user wants one task per 100MB or per 10 seconds. The expected processing time for 100MB is 10-20 seconds, which corresponds to roughly 200000 rows. A reasonable configuration is then:
```
"max_batch_interval" = "10",
"max_batch_rows" = "200000",
"max_batch_size" = "104857600"
```
The parameters in the example above are also the default values of these settings.
* max\_error\_number
`max_error_number` controls the error rate. When the error rate is too high, the job is paused automatically. Because the whole job is stream-oriented and the data stream is unbounded, we cannot compute an error rate from a fixed error ratio as other load jobs do, so a new way of measuring the proportion of errors in the data stream is provided.
We define a sampling window whose size is `max_batch_rows * 10`. Within one sampling window, if the number of error rows exceeds `max_error_number`, the job is paused. Otherwise, counting of error rows starts over in the next window.
Suppose `max_batch_rows` is 200000, so the window size is 2000000. If `max_error_number` is 20000, the user expects at most 20000 error rows per 2000000 rows, i.e. an error rate of 1%. However, because each batch of tasks does not consume exactly 200000 rows, the actual window size falls in the range [2000000, 2200000], i.e. a statistical deviation of 10%.
Error rows do not include rows filtered out by the where condition, but they do include rows that have no corresponding partition in the Doris table.
* data\_source\_properties
Specific Kafka partitions to consume can be specified in `data_source_properties`. If not specified, all partitions of the subscribed topic are consumed by default.
Note that when partitions are specified explicitly, the load job no longer detects changes of the Kafka partitions dynamically. If they are not specified, the partitions to consume are adjusted dynamically according to changes of the Kafka partitions.
* strict\_mode
@ -157,7 +157,7 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
1. For column type conversion, if strict mode is true, erroneous data is filtered out. Erroneous data here means data whose original value is not null but becomes null after the column type conversion.
2. When a loaded column is generated by a function transformation, strict mode has no effect on it.
3. When the type of a loaded column has a range restriction, strict mode has no effect on data that passes the type conversion but falls outside the range. For example, if the type is decimal(1,0) and the original value is 10, the value passes the type conversion but lies outside the declared range of the column; strict mode has no effect on such data.
#### The relationship between strict mode and the loading of source data
@ -174,7 +174,7 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
|not null | 1 | 1 | true or false | correct data|
Here is an example with column type Decimal(1,0)
>Note: when the column in the table allows null values to be loaded
|source data | source data example | string to int | strict_mode | result|
@ -185,7 +185,7 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
|not null | 1 or 10 | 1 | true or false | correct data|
> Note: although 10 is an out-of-range value, strict mode has no effect on it because its type satisfies the decimal requirement. 10 will eventually be filtered in other ETL processing steps, but it will not be filtered by strict mode.
#### Accessing an SSL-authenticated Kafka cluster
Accessing an SSL-authenticated Kafka cluster requires the user to provide the certificate file (ca.pem) used to authenticate the public key of the Kafka broker. If the Kafka cluster has also enabled client authentication, the client public key (client.pem), the key file (client.key), and the key password must be provided as well. These files must first be uploaded to Doris with the `CREATE FILE` command, **and the catalog name must be `kafka`**. See `HELP CREATE FILE;` for detailed help on the `CREATE FILE` command. An example is given here:
@ -219,7 +219,7 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
```
> Doris accesses the Kafka cluster through Kafka's C++ API `librdkafka`. For the parameters supported by `librdkafka`, see
>
> `https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md`
@ -232,7 +232,7 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
Only currently running tasks can be viewed; finished and not-yet-started tasks cannot be viewed.
### Job control
The user can stop, pause, and resume a job with the three commands `STOP/PAUSE/RESUME`. Help and examples are available through `HELP STOP ROUTINE LOAD;`, `HELP PAUSE ROUTINE LOAD;`, and `HELP RESUME ROUTINE LOAD;`.
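For example, a paused job can typically be brought back with a statement of the form `RESUME ROUTINE LOAD FOR job_name;`, where `job_name` is a placeholder for the actual job name; consult `HELP RESUME ROUTINE LOAD;` for the exact syntax of your Doris version.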
## Other notes
@ -255,10 +255,10 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
When the `kafka_topic` declared by the user when creating the routine load job does not exist in the Kafka cluster:
* If the broker of the user's Kafka cluster has `auto.create.topics.enable = true`, the `kafka_topic` is created automatically first, and the number of automatically created partitions is determined by the broker configuration `num.partitions` of the **user's Kafka cluster**. The routine load job keeps reading data from that topic as usual.
* If the broker of the user's Kafka cluster has `auto.create.topics.enable = false`, the topic is not created automatically, and the routine load job is paused, with state `PAUSED`, before reading any data.
So, if the user wants the Kafka topic to be created automatically by the routine load job when it does not exist, it is enough to set `auto.create.topics.enable = true` on the brokers of the **user's Kafka cluster**.
## Related parameters
@ -284,3 +284,9 @@ Based on the reported results, the JobScheduler in the FE continues to generate subsequent new Tasks, or
BE configuration item. The default is 10, i.e. 10MB/s. This is a general load parameter, not limited to routine load jobs. It limits the speed at which loaded data is written to disk. For high-performance storage devices such as SSDs, this limit can be raised appropriately.
6. max\_tolerable\_backend\_down\_num
FE configuration item, the default is 0. Under certain conditions, Doris can reschedule a PAUSED job, i.e. turn it back to RUNNING. A value of 0 means rescheduling is allowed only when all BE nodes are alive.
7. period\_of\_auto\_resume\_min
FE configuration item, the default is 5 minutes. Within this 5-minute period, Doris attempts rescheduling at most 3 times. If all 3 attempts fail, the current job is locked and no further automatic scheduling is performed, but it can still be recovered by manual intervention. An example of adjusting these configurations at runtime is given below.
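Since both of these configurations are declared mutable in this change, they can presumably be adjusted at runtime on the master FE; for example, a statement of the form `ADMIN SET FRONTEND CONFIG ("max_tolerable_backend_down_num" = "1");` (the value shown is purely illustrative) should take effect without restarting the FE.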

View File

@ -1,4 +1,4 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
@ -34,7 +34,7 @@ This document describes the implementation principles, usage, and best practices
## Principle
```
+---------+
| Client |
+----+----+
@ -115,7 +115,7 @@ The detailed syntax for creating a routine load task can be connected to Doris a
```
`Config.max_routine_load_task_concurrrent_num` is a system-wide default limit on the maximum concurrency. It is an FE configuration and can be adjusted by modifying it. The default is 5.
`partition num` is the number of partitions of the subscribed Kafka topic, and `alive_backend_num` is the number of currently healthy BE nodes. A worked example is given below.
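For example, if the subscribed topic has 10 partitions, `desired_concurrent_number` is 5, there are 4 alive BE nodes, and `Config.max_routine_load_task_concurrrent_num` keeps its default of 5, then the actual concurrency is Min(10, 5, 4, 5) = 4 tasks.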
* max\_batch\_interval/max\_batch\_rows/max\_batch\_size
@ -137,7 +137,7 @@ The detailed syntax for creating a routine load task can be connected to Doris a
`max_error_number` is used to control the error rate. When the error rate is too high, the job is paused automatically. Because the whole job is stream-oriented and the data stream is unbounded, we cannot compute an error rate from a fixed error ratio as other load jobs do, so a new way of measuring the proportion of errors in the data stream is provided.
We define a sampling window whose size is `max_batch_rows * 10`. Within one sampling window, if the number of error rows exceeds `max_error_number`, the job is paused. Otherwise, counting of error rows starts over in the next window.
Suppose `max_batch_rows` is 200000, so the window size is 2000000. If `max_error_number` is 20000, the user expects at most 20000 error rows per 2000000 rows, i.e. an error rate of 1%. However, because each batch of tasks does not consume exactly 200000 rows, the actual window size falls in the range [2000000, 2200000], i.e. a statistical deviation of 10%.
Error rows do not include rows filtered out by the where condition, but they do include rows that have no corresponding partition in the Doris table.
@ -272,7 +272,7 @@ Some system configuration parameters can affect the use of routine loads.
The FE configuration item, which defaults to 5, can be modified at runtime. This parameter limits the number of subtasks that can be executed concurrently by each BE node. It is recommended to maintain the default value. If the setting is too large, it may cause too many concurrent tasks and occupy cluster resources.
3. max\_routine\_load\_job\_num
The FE configuration item, which defaults to 100, can be modified at runtime. This parameter limits the total number of routine load jobs in the NEED_SCHEDULE, RUNNING, and PAUSED states. Once this limit is exceeded, no new job can be submitted.
4. max\_consumer\_num\_per\_group
@ -282,3 +282,9 @@ Some system configuration parameters can affect the use of routine loads.
5. push\_write\_mbytes\_per\_sec
BE configuration item. The default is 10, i.e. 10MB/s. This is a general load parameter, not limited to routine load jobs. It limits the speed at which loaded data is written to disk. For high-performance storage devices such as SSDs, this limit can be raised appropriately.
6. max\_tolerable\_backend\_down\_num
FE configuration item, the default is 0. Under certain conditions, Doris can reschedule a PAUSED job, i.e. turn it back to RUNNING. A value of 0 means rescheduling is allowed only when all BE nodes are alive.
7. period\_of\_auto\_resume\_min
FE configuration item, the default is 5 minutes. Within this 5-minute period, Doris attempts rescheduling at most 3 times. If all 3 attempts fail, the current job is locked and no further automatic scheduling is performed; it can still be resumed by manual intervention. An example of adjusting these configurations at runtime is given below.
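Since both configurations are declared mutable in this commit, they can presumably be changed at runtime on the master FE, e.g. with a statement of the form `ADMIN SET FRONTEND CONFIG ("period_of_auto_resume_min" = "10");` (the value is purely illustrative).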

View File

@ -1005,5 +1005,18 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = true)
public static boolean enable_materialized_view = false;
/**
* the max number of dead backends that can be tolerated when auto-resuming a routine load job;
* 0 (the default) means a PAUSED job is auto-resumed only when all backends are alive
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_tolerable_backend_down_num = 0;
/**
* the window, in minutes, within which auto-resume of a routine load job is attempted at most 3 times before it is locked
*/
@ConfField(mutable = true, masterOnly = true)
public static int period_of_auto_resume_min = 5;
}

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common;
public enum InternalErrorCode {
OK(0),
// for common error
IMPOSSIBLE_ERROR_ERR(1),
INTERNAL_ERR(2),
REPLICA_FEW_ERR(3),
PARTITIONS_ERR(4),
DB_ERR(5),
TABLE_ERR(6),
META_NOT_FOUND_ERR(7),
// for load job error
MANUAL_PAUSE_ERR(100),
MANUAL_STOP_ERR(101),
TOO_MANY_FAILURE_ROWS_ERR(102),
CREATE_TASKS_ERR(103),
TASKS_ABORT_ERR(104);
private long errCode;
private InternalErrorCode(long code) {
this.errCode = code;
}
@Override
public String toString() {
return "errCode = " + errCode;
}
}

View File

@ -25,6 +25,10 @@ public class MetaNotFoundException extends UserException {
super(msg);
}
public MetaNotFoundException(InternalErrorCode errcode, String msg) {
super(errcode, msg);
}
public MetaNotFoundException(String msg, Throwable e) {
super(msg, e);
}

View File

@ -23,20 +23,38 @@ import com.google.common.base.Strings;
* Thrown for internal server errors.
*/
public class UserException extends Exception {
private InternalErrorCode errorCode;
public UserException(String msg, Throwable cause) {
super(Strings.nullToEmpty(msg), cause);
errorCode = InternalErrorCode.INTERNAL_ERR;
}
public UserException(Throwable cause) {
super(cause);
errorCode = InternalErrorCode.INTERNAL_ERR;
}
public UserException(String msg, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(Strings.nullToEmpty(msg), cause, enableSuppression, writableStackTrace);
errorCode = InternalErrorCode.INTERNAL_ERR;
}
public UserException(String msg) {
super(Strings.nullToEmpty(msg));
errorCode = InternalErrorCode.INTERNAL_ERR;
}
public UserException(InternalErrorCode errCode, String msg) {
super(Strings.nullToEmpty(msg));
this.errorCode = errCode;
}
public InternalErrorCode getErrorCode() {
return errorCode;
}
@Override
public String getMessage() {
return errorCode + ", detailMessage = " + super.getMessage();
}
}

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.common.InternalErrorCode;
public class ErrorReason {
private InternalErrorCode code;
private String msg;
public ErrorReason(InternalErrorCode errCode, String msg) {
this.code = errCode;
this.msg = msg;
}
public InternalErrorCode getCode() {
return code;
}
public void setCode(InternalErrorCode code) {
this.code = code;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "ErrorReason{" + "code=" + code + ", msg='" + msg + '\'' + '}';
}
}
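Taken together with the new `InternalErrorCode` and the `UserException(InternalErrorCode, String)` constructor above, this class carries a machine-readable pause/cancel reason. Below is a minimal, hypothetical sketch (the demo class and the literal values are not part of this patch) of the pattern the commit uses: an exception tagged with an error code is converted into an `ErrorReason`, which `ScheduleRule` later inspects to decide whether a paused job may be auto-resumed.

```java
package org.apache.doris.load.routineload;

import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;

public class ErrorReasonDemo {
    public static void main(String[] args) {
        try {
            // e.g. OlapTableSink now throws a coded exception when a tablet has too few replicas
            throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
                    "tablet 10001 has few replicas: 1");
        } catch (UserException e) {
            // callers wrap the coded exception into an ErrorReason and pause the job with it;
            // ScheduleRule.isNeedAutoSchedule() later checks reason.getCode() == REPLICA_FEW_ERR
            ErrorReason reason = new ErrorReason(e.getErrorCode(), e.getMessage());
            System.out.println(reason);
        }
    }
}
```

Note that because `UserException.getMessage()` now prefixes the detail message with the error code, the printed reason in this sketch contains the code twice; that is expected.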

View File

@ -20,11 +20,14 @@ package org.apache.doris.load.routineload;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
@ -83,9 +86,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
super(-1, LoadDataSourceType.KAFKA);
}
public KafkaRoutineLoadJob(Long id, String name, String clusterName, long dbId, long tableId, String brokerList,
public KafkaRoutineLoadJob(Long id, String name, String clusterName,
long dbId, long tableId, long replicationNum,
String brokerList,
String topic) {
super(id, name, clusterName, dbId, tableId, LoadDataSourceType.KAFKA);
super(id, name, clusterName, dbId, tableId, replicationNum, LoadDataSourceType.KAFKA);
this.brokerList = brokerList;
this.topic = topic;
this.progress = new KafkaProgress();
@ -136,6 +141,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
}
@Override
public void divideRoutineLoadJob(int currentConcurrentTaskNum) throws UserException {
List<RoutineLoadTaskInfo> result = new ArrayList<>();
@ -270,7 +276,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
.build(), e);
if (this.state == JobState.NEED_SCHEDULE) {
unprotectUpdateState(JobState.PAUSED,
"Job failed to fetch all current partition with error " + e.getMessage(),
new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
"Job failed to fetch all current partition with error " + e.getMessage()),
false /* not replay */);
}
return false;
@ -299,6 +306,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return true;
}
}
} else if (this.state == JobState.PAUSED) {
return ScheduleRule.isNeedAutoSchedule(this);
} else {
return false;
}
@ -334,10 +343,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
long tableId = -1L;
long replicationNum = 0;
db.readLock();
try {
unprotectedCheckMeta(db, stmt.getTableName(), stmt.getRoutineLoadDesc());
tableId = db.getTable(stmt.getTableName()).getId();
Table table = db.getTable(stmt.getTableName());
tableId = table.getId();
if (table instanceof OlapTable) {
replicationNum = ((OlapTable)table).getReplicationNum();
}
} finally {
db.readUnlock();
}
@ -345,7 +359,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
// init kafka routine load job
long id = Catalog.getInstance().getNextId();
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(id, stmt.getName(),
db.getClusterName(), db.getId(), tableId, stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
db.getClusterName(), db.getId(), tableId, replicationNum,
stmt.getKafkaBrokerList(), stmt.getKafkaTopic());
kafkaRoutineLoadJob.setOptional(stmt);
kafkaRoutineLoadJob.checkCustomProperties();
kafkaRoutineLoadJob.checkCustomPartition();

View File

@ -35,6 +35,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
@ -139,6 +140,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected String clusterName;
protected long dbId;
protected long tableId;
protected long replicationNum;
// this code is used to verify be task request
protected long authCode;
// protected RoutineLoadDesc routineLoadDesc; // optional
@ -173,10 +175,13 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected int currentTaskConcurrentNum;
protected RoutineLoadProgress progress;
protected long firstResumeTimestamp; // the first resume time
protected long autoResumeCount;
protected boolean autoResumeLock = false; // when true, the job can no longer be auto-resumed; only a manual resume unlocks it
// some other msg which need to show to user;
protected String otherMsg = "";
protected String pauseReason = "";
protected String cancelReason = "";
protected ErrorReason pauseReason;
protected ErrorReason cancelReason;
protected long createTimestamp = System.currentTimeMillis();
protected long pauseTimestamp = -1;
@ -228,13 +233,15 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
this.dataSourceType = type;
}
public RoutineLoadJob(Long id, String name, String clusterName, long dbId, long tableId,
public RoutineLoadJob(Long id, String name, String clusterName,
long dbId, long tableId, long replicationNum,
LoadDataSourceType dataSourceType) {
this(id, dataSourceType);
this.name = name;
this.clusterName = clusterName;
this.dbId = dbId;
this.tableId = tableId;
this.replicationNum = replicationNum;
this.authCode = 0;
if (ConnectContext.get() != null) {
@ -538,7 +545,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// if this is a replay thread, the update state should already be replayed by OP_CHANGE_ROUTINE_LOAD_JOB
if (!isReplay) {
// remove all of task in jobs and change job state to paused
updateState(JobState.PAUSED, "current error rows of job is more then max error num", isReplay);
updateState(JobState.PAUSED,
new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR, "current error rows of job is more then max error num"),
isReplay);
}
}
@ -563,7 +572,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
.build());
if (!isReplay) {
// remove all of task in jobs and change job state to paused
updateState(JobState.PAUSED, "current error rows is more then max error num", isReplay);
updateState(JobState.PAUSED,
new ErrorReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR, "current error rows is more then max error num"),
isReplay);
}
// reset currentTotalNum and currentErrorNum
currentErrorRows = 0;
@ -701,9 +712,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
} catch (Throwable e) {
LOG.warn("after committed failed", e);
updateState(JobState.PAUSED, "be " + taskBeId + " commit task failed " + txnState.getLabel()
String errmsg = "be " + taskBeId + " commit task failed " + txnState.getLabel()
+ " with error " + e.getMessage()
+ " while transaction " + txnState.getTransactionId() + " has been committed", false /* not replay */);
+ " while transaction " + txnState.getTransactionId() + " has been committed";
updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.INTERNAL_ERR, errmsg), false /* not replay */);
} finally {
writeUnlock();
LOG.debug("unlock write lock of routine load job after committed: {}", id);
@ -752,7 +764,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
DebugUtil.printId(routineLoadTaskInfo.getId()), id, txnState.getTransactionId(), routineLoadTaskInfo.getTxnStatus().name());
LOG.warn(msg);
try {
updateState(JobState.PAUSED, msg, false /* not replay */);
updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.IMPOSSIBLE_ERROR_ERR, msg), false /* not replay */);
} catch (UserException e) {
// should not happen
LOG.warn("failed to pause the job {}. this should not happen", id, e);
@ -803,8 +815,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
switch (txnStatusChangeReason) {
case OFFSET_OUT_OF_RANGE:
case PAUSE:
updateState(JobState.PAUSED, "be " + taskBeId + " abort task "
+ "with reason: " + txnStatusChangeReasonString, false /* not replay */);
String msg = "be " + taskBeId + " abort task "
+ "with reason: " + txnStatusChangeReasonString;
updateState(JobState.PAUSED,
new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
false /* not replay */);
return;
default:
break;
@ -816,7 +831,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
executeTaskOnTxnStatusChanged(routineLoadTaskInfo, txnState, TransactionStatus.ABORTED);
}
} catch (Exception e) {
updateState(JobState.PAUSED, "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage(),
String msg = "be " + taskBeId + " abort task " + txnState.getLabel() + " failed with error " + e.getMessage();
updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
false /* not replay */);
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("task_id", txnState.getLabel())
@ -904,7 +920,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
// columns will be checked when planing
}
public void updateState(JobState jobState, String reason, boolean isReplay) throws UserException {
public void updateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException {
writeLock();
try {
unprotectUpdateState(jobState, reason, isReplay);
@ -913,7 +929,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
}
protected void unprotectUpdateState(JobState jobState, String reason, boolean isReplay) throws UserException {
protected void unprotectUpdateState(JobState jobState, ErrorReason reason, boolean isReplay) throws UserException {
if (LOG.isDebugEnabled()) {
LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
.add("current_job_state", getState())
@ -960,9 +976,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
state = JobState.RUNNING;
}
private void executePause(String reason) {
private void executePause(ErrorReason reason) {
// remove all of task in jobs and change job state to paused
pauseReason = Strings.nullToEmpty(reason);
pauseReason = reason;
state = JobState.PAUSED;
pauseTimestamp = System.currentTimeMillis();
routineLoadTaskInfoList.clear();
@ -980,8 +996,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
endTimestamp = System.currentTimeMillis();
}
private void executeCancel(String reason) {
cancelReason = Strings.nullToEmpty(reason);
private void executeCancel(ErrorReason reason) {
cancelReason = reason;
state = JobState.CANCELLED;
routineLoadTaskInfoList.clear();
endTimestamp = System.currentTimeMillis();
@ -997,7 +1013,9 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
writeLock();
try {
if (!state.isFinalState()) {
unprotectUpdateState(JobState.CANCELLED, "db " + dbId + "not exist", false /* not replay */);
unprotectUpdateState(JobState.CANCELLED,
new ErrorReason(InternalErrorCode.DB_ERR, "db " + dbId + "not exist"),
false /* not replay */);
}
return;
} finally {
@ -1020,7 +1038,8 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
writeLock();
try {
if (!state.isFinalState()) {
unprotectUpdateState(JobState.CANCELLED, "table not exist", false /* not replay */);
unprotectUpdateState(JobState.CANCELLED,
new ErrorReason(InternalErrorCode.TABLE_ERR, "table not exist"), false /* not replay */);
}
return;
} finally {
@ -1092,10 +1111,10 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
row.add(getProgress().toJsonString());
switch (state) {
case PAUSED:
row.add(pauseReason);
row.add(pauseReason == null ? "" : pauseReason.toString());
break;
case CANCELLED:
row.add(cancelReason);
row.add(cancelReason == null? "" : cancelReason.toString());
break;
default:
row.add("");

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
@ -226,7 +227,8 @@ public class RoutineLoadManager implements Writable {
}
routineLoadJob.updateState(RoutineLoadJob.JobState.PAUSED,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job",
new ErrorReason(InternalErrorCode.MANUAL_PAUSE_ERR,
"User " + ConnectContext.get().getQualifiedUser() + " pauses routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
@ -258,7 +260,9 @@ public class RoutineLoadManager implements Writable {
ConnectContext.get().getRemoteIP(),
tableName);
}
routineLoadJob.autoResumeCount = 0;
routineLoadJob.firstResumeTimestamp = 0;
routineLoadJob.autoResumeLock = false;
routineLoadJob.updateState(RoutineLoadJob.JobState.NEED_SCHEDULE, null, false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())
@ -292,7 +296,7 @@ public class RoutineLoadManager implements Writable {
tableName);
}
routineLoadJob.updateState(RoutineLoadJob.JobState.STOPPED,
"User " + ConnectContext.get().getQualifiedUser() + " stop routine load job",
new ErrorReason(InternalErrorCode.MANUAL_STOP_ERR, "User " + ConnectContext.get().getQualifiedUser() + " stop routine load job"),
false /* not replay */);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())

View File

@ -92,9 +92,11 @@ public class RoutineLoadScheduler extends MasterDaemon {
} catch (MetaNotFoundException e) {
errorJobState = RoutineLoadJob.JobState.CANCELLED;
userException = e;
LOG.warn(userException.getMessage());
} catch (UserException e) {
errorJobState = RoutineLoadJob.JobState.PAUSED;
userException = e;
LOG.warn(userException.getMessage());
}
if (errorJobState != null) {
@ -104,7 +106,8 @@ public class RoutineLoadScheduler extends MasterDaemon {
.add("warn_msg", "failed to scheduler job, change job state to desired_state with error reason " + userException.getMessage())
.build(), userException);
try {
routineLoadJob.updateState(errorJobState, userException.getMessage(), false);
ErrorReason reason = new ErrorReason(userException.getErrorCode(), userException.getMessage());
routineLoadJob.updateState(errorJobState, reason, false);
} catch (UserException e) {
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, routineLoadJob.getId())
.add("current_state", routineLoadJob.getState())

View File

@ -20,6 +20,7 @@ package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
@ -130,10 +131,15 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// allocate failed, push it back to the queue to wait next scheduling
needScheduleTasksQueue.put(routineLoadTaskInfo);
}
} catch (UserException e) {
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).
updateState(JobState.PAUSED,
new ErrorReason(e.getErrorCode(), e.getMessage()), false);
throw e;
} catch (Exception e) {
// exception happens, PAUSE the job
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(JobState.PAUSED,
"failed to allocate task: " + e.getMessage(), false);
new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR, "failed to allocate task: " + e.getMessage()), false);
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("error_msg",
"allocate task encounter exception: " + e.getMessage()).build());
throw e;
@ -152,7 +158,8 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
routineLoadManager.getJob(routineLoadTaskInfo.getJobId()).updateState(JobState.PAUSED,
"failed to allocate task: " + e.getMessage(), false);
new ErrorReason(InternalErrorCode.CREATE_TASKS_ERR,
"failed to allocate task for txn: " + e.getMessage()), false);
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_TASK, routineLoadTaskInfo.getId()).add("error_msg",
"begin task txn encounter exception: " + e.getMessage()).build());
throw e;
@ -167,13 +174,17 @@ public class RoutineLoadTaskScheduler extends MasterDaemon {
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
.updateState(JobState.STOPPED, "meta not found: " + e.getMessage(), false);
.updateState(JobState.STOPPED,
new ErrorReason(InternalErrorCode.META_NOT_FOUND_ERR, "meta not found: " + e.getMessage()),
false);
throw e;
} catch (UserException e) {
// set BE id to -1 to release the BE slot
routineLoadTaskInfo.setBeId(-1);
routineLoadManager.getJob(routineLoadTaskInfo.getJobId())
.updateState(JobState.PAUSED, "failed to create task: " + e.getMessage(), false);
.updateState(JobState.PAUSED,
new ErrorReason(e.getErrorCode(),
"failed to create task: " + e.getMessage()), false);
throw e;
}

View File

@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.routineload;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.system.SystemInfoService;
/**
* ScheduleRule: RoutineLoad PAUSED -> NEED_SCHEDULE
*/
public class ScheduleRule {
private static int deadBeCount(String clusterName) {
SystemInfoService systemInfoService = Catalog.getCurrentSystemInfo();
int total = systemInfoService.getClusterBackendIds(clusterName, false).size();
int alive = systemInfoService.getClusterBackendIds(clusterName, true).size();
return total - alive;
}
/**
* Check whether a PAUSED routine load job should be automatically rescheduled (PAUSED -> NEED_SCHEDULE).
* @param jobRoutine the routine load job to check
* @return true if the job should be auto-resumed
*/
public static boolean isNeedAutoSchedule(RoutineLoadJob jobRoutine) {
if (jobRoutine.state != RoutineLoadJob.JobState.PAUSED) {
return false;
}
if (jobRoutine.autoResumeLock) { // the job is locked: only a manual resume can unlock it
return false;
}
/*
* Handle jobs that were paused because replicas were unavailable, e.g. when all backends went down.
*/
if (jobRoutine.pauseReason != null && jobRoutine.pauseReason.getCode() == InternalErrorCode.REPLICA_FEW_ERR) {
int dead = deadBeCount(jobRoutine.clusterName);
if (dead > Config.max_tolerable_backend_down_num) {
return false;
}
if (jobRoutine.firstResumeTimestamp == 0) {//the first resume
jobRoutine.firstResumeTimestamp = System.currentTimeMillis();
jobRoutine.autoResumeCount = 1;
return true;
} else {
long current = System.currentTimeMillis();
if (current - jobRoutine.firstResumeTimestamp < Config.period_of_auto_resume_min * 60000) {
if (jobRoutine.autoResumeCount >= 3) {
jobRoutine.autoResumeLock = true; // too many auto-resume attempts within one window: lock the job
return false;
}
jobRoutine.autoResumeCount++;
return true;
} else {
/*
* For example:
* the first resume attempt at 10:01
* the second resume attempt at 10:03
* the third resume attempt at 10:20
* The third attempt falls outside the window started at 10:01, so the counter
* is reset and a new auto-resume period begins.
*/
jobRoutine.firstResumeTimestamp = current;
jobRoutine.autoResumeCount = 1;
return true;
}
}
}
return false;
}
}
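For readers skimming the diff, here is a minimal, self-contained sketch of just the attempt-counting logic above (the class and method names are invented for illustration; the real `isNeedAutoSchedule` additionally checks the job state, the pause reason code, and the number of dead backends against `Config.max_tolerable_backend_down_num`):

```java
// Standalone illustration of the auto-resume window: at most 3 attempts per window,
// after which the job stays locked until a manual RESUME.
public class AutoResumeWindowSketch {
    private static final int MAX_ATTEMPTS_PER_WINDOW = 3;
    private final long windowMillis;        // Config.period_of_auto_resume_min * 60000 in the real code
    private long firstResumeTimestamp = 0;  // start of the current window, 0 = no window yet
    private long autoResumeCount = 0;
    private boolean locked = false;         // cleared only by a manual resume

    public AutoResumeWindowSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    public boolean shouldAutoResume(long now) {
        if (locked) {
            return false;
        }
        if (firstResumeTimestamp == 0 || now - firstResumeTimestamp >= windowMillis) {
            // first attempt ever, or the previous window expired: start a new window
            firstResumeTimestamp = now;
            autoResumeCount = 1;
            return true;
        }
        if (autoResumeCount >= MAX_ATTEMPTS_PER_WINDOW) {
            locked = true; // a 4th attempt inside one window: lock until manual intervention
            return false;
        }
        autoResumeCount++;
        return true;
    }
}
```

With a 5-minute window, attempts at 10:01, 10:03, and 10:04 all return true and a fourth attempt at 10:05 locks the state; if the third attempt had instead happened at 10:20, it would have started a fresh window, matching the timeline in the comment above.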

View File

@ -35,6 +35,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.system.Backend;
@ -298,7 +299,8 @@ public class OlapTableSink extends DataSink {
for (Tablet tablet : index.getTablets()) {
Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap();
if (bePathsMap.keySet().size() < quorum) {
throw new UserException("tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size());
throw new UserException(InternalErrorCode.REPLICA_FEW_ERR,
"tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size());
}
locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
allBePathsMap.putAll(bePathsMap);

View File

@ -126,25 +126,25 @@ public class KafkaRoutineLoadJobTest {
// 2 partitions, 1 be
RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, 3L, "127.0.0.1:9020", "topic1");
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1);
Assert.assertEquals(1, routineLoadJob.calculateCurrentConcurrentTaskNum());
// 3 partitions, 4 be
routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, 3L, "127.0.0.1:9020", "topic1");
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2);
Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum());
// 4 partitions, 4 be
routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, 3L, "127.0.0.1:9020", "topic1");
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3);
Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum());
// 7 partitions, 4 be
routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L,
1L, "127.0.0.1:9020", "topic1");
1L, 3L, "127.0.0.1:9020", "topic1");
Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4);
Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum());
}
@ -159,7 +159,7 @@ public class KafkaRoutineLoadJobTest {
RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
1L, "127.0.0.1:9020", "topic1");
1L, 3L, "127.0.0.1:9020", "topic1");
new Expectations(catalog) {
{
@ -204,7 +204,7 @@ public class KafkaRoutineLoadJobTest {
RoutineLoadJob routineLoadJob =
new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L,
1L, "127.0.0.1:9020", "topic1");
1L, 3L, "127.0.0.1:9020", "topic1");
long maxBatchIntervalS = 10;
Deencapsulation.setField(routineLoadJob, "maxBatchIntervalS", maxBatchIntervalS);
new Expectations() {

View File

@ -23,6 +23,7 @@ import org.apache.doris.analysis.SqlParser;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.util.KafkaUtil;
@ -173,13 +174,13 @@ public class RoutineLoadJobTest {
public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress) {
RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob();
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
Deencapsulation.setField(routineLoadJob, "pauseReason",
TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR, TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString());
Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason);
Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress);
List<String> showInfo = routineLoadJob.getShowInfo();
Assert.assertEquals(true, showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity))
.anyMatch(entity -> entity.equals(TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString())));
.anyMatch(entity -> entity.equals(errorReason.toString())));
}
@Test

View File

@ -30,6 +30,7 @@ import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
@ -94,7 +95,7 @@ public class RoutineLoadManagerTest {
typeName, customProperties);
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
serverAddress, topicName);
3L, serverAddress, topicName);
new MockUp<KafkaRoutineLoadJob>() {
@Mock
@ -191,8 +192,7 @@ public class RoutineLoadManagerTest {
String topicName = "topic1";
String serverAddress = "http://127.0.0.1:8080";
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
serverAddress,
topicName);
3L, serverAddress,topicName);
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
@ -200,7 +200,7 @@ public class RoutineLoadManagerTest {
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
1L, 1L, serverAddress, topicName);
1L, 1L, 3L, serverAddress, topicName);
routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
nameToRoutineLoadJob.put(jobName, routineLoadJobList);
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
@ -222,7 +222,7 @@ public class RoutineLoadManagerTest {
String topicName = "topic1";
String serverAddress = "http://127.0.0.1:8080";
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L,
serverAddress, topicName);
3L, serverAddress, topicName);
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
@ -238,7 +238,7 @@ public class RoutineLoadManagerTest {
Map<String, List<RoutineLoadJob>> nameToRoutineLoadJob = Maps.newConcurrentMap();
List<RoutineLoadJob> routineLoadJobList = Lists.newArrayList();
KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster",
1L, 1L, serverAddress, topicName);
1L, 1L, 3L, serverAddress, topicName);
Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED);
routineLoadJobList.add(kafkaRoutineLoadJobWithSameName);
nameToRoutineLoadJob.put(jobName, routineLoadJobList);
@ -579,6 +579,10 @@ public class RoutineLoadManagerTest {
dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob);
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
Map<Long, RoutineLoadJob> idToRoutineLoadJob = Maps.newConcurrentMap();
idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob);
Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob);
new Expectations() {
{
pauseRoutineLoadStmt.getDbFullName();
@ -605,6 +609,22 @@ public class RoutineLoadManagerTest {
routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt);
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
// the first three auto-resume attempts succeed
for (int i = 0; i < 3; i++) {
Deencapsulation.setField(routineLoadJob, "pauseReason",
new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, ""));
routineLoadManager.updateRoutineLoadJob();
Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState());
Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED);
boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock");
Assert.assertEquals(autoResumeLock, false);
}
// the fourth auto-resume attempt within the window locks the job
routineLoadManager.updateRoutineLoadJob();
Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState());
boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock");
Assert.assertEquals(autoResumeLock, true);
}
@Test

View File

@ -75,7 +75,7 @@ public class RoutineLoadSchedulerTest {
Deencapsulation.setField(catalog, "routineLoadTaskScheduler", routineLoadTaskScheduler);
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", clusterName, 1L, 1L,
"xxx", "test");
3L, "xxx", "test");
Deencapsulation.setField(kafkaRoutineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE);
List<RoutineLoadJob> routineLoadJobList = new ArrayList<>();
routineLoadJobList.add(kafkaRoutineLoadJob);
@ -138,7 +138,7 @@ public class RoutineLoadSchedulerTest {
};
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L,
"10.74.167.16:8092", "test");
3L, "10.74.167.16:8092", "test");
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
@ -169,7 +169,7 @@ public class RoutineLoadSchedulerTest {
executorService.submit(routineLoadTaskScheduler);
KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition",
"default_cluster", 1L, 1L, "xxx", "test_1");
"default_cluster", 1L, 1L, 3L, "xxx", "test_1");
List<Integer> customKafkaPartitions = new ArrayList<>();
customKafkaPartitions.add(2);
Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions);

View File

@ -17,11 +17,6 @@
package org.apache.doris.rewrite;
import static org.junit.Assert.fail;
import mockit.Expectations;
import mockit.MockUp;
import mockit.Mocked;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DecimalLiteral;
import org.apache.doris.analysis.FloatLiteral;
@ -30,8 +25,8 @@ import org.apache.doris.analysis.LargeIntLiteral;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.TimeUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -42,6 +37,10 @@ import java.time.ZoneId;
import java.util.Locale;
import java.util.TimeZone;
import mockit.Expectations;
import mockit.Mocked;
import static org.junit.Assert.fail;
public class FEFunctionsTest {
@Mocked
@ -212,37 +211,38 @@ public class FEFunctionsTest {
FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%D"));
fail("Junit test dateParse fail");
} catch (AnalysisException e) {
Assert.assertEquals(e.getMessage(), "%D not supported in date format string");
Assert.assertEquals(e.getMessage(),
"errCode = 2, detailMessage = %D not supported in date format string");
}
try {
FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%U"));
fail("Junit test dateParse fail");
} catch (AnalysisException e) {
Assert.assertEquals(e.getMessage(), "%U not supported in date format string");
Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %U not supported in date format string");
}
try {
FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%u"));
fail("Junit test dateParse fail");
} catch (AnalysisException e) {
Assert.assertEquals(e.getMessage(), "%u not supported in date format string");
Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %u not supported in date format string");
}
try {
FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%V"));
fail("Junit test dateParse fail");
} catch (AnalysisException e) {
Assert.assertEquals(e.getMessage(), "%V not supported in date format string");
Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %V not supported in date format string");
}
try {
FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%w"));
fail("Junit test dateParse fail");
} catch (AnalysisException e) {
Assert.assertEquals(e.getMessage(), "%w not supported in date format string");
Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %w not supported in date format string");
}
try {
FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%X"));
fail("Junit test dateParse fail");
} catch (AnalysisException e) {
Assert.assertEquals(e.getMessage(), "%X not supported in date format string");
Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %X not supported in date format string");
}
}

View File

@ -311,7 +311,7 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic");
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, 3L, "host:port", "topic");
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
@ -378,7 +378,7 @@ public class GlobalTransactionMgrTest {
transTablets.add(tabletCommitInfo2);
transTablets.add(tabletCommitInfo3);
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic");
KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, 3L, "host:port", "topic");
List<RoutineLoadTaskInfo> routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList");
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);