[fix](routine-load) pause job when json path is invalid #30197

If jsonpaths is set wrong, routine load job will report error but running all time.For example:

CREATE ROUTINE LOAD jobName ON tableName
PROPERTIES
(
    "format" = "json",
    "max_batch_interval" = "5",
    "max_batch_rows" = "300000",
    "max_batch_size" = "209715200",
    "jsonpaths" = "[\'t\',\'a\']"
)
FROM KAFKA
(
    "kafka_broker_list" = "$IP:PORT",
    "kafka_topic" = "XXX",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);
Jsonpaths ['t','a'] is invalid, but job will running all time.
This commit is contained in:
HHoflittlefish777
2024-01-22 20:34:14 +08:00
committed by yiguolei
parent 9c742d46a2
commit 32c5153999
7 changed files with 148 additions and 4 deletions

View File

@ -1145,6 +1145,16 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
String msg;
if (txnStatusChangeReason != null) {
switch (txnStatusChangeReason) {
case INVALID_JSON_PATH:
msg = "be " + taskBeId + " abort task,"
+ " task id: " + routineLoadTaskInfo.getId()
+ " job id: " + routineLoadTaskInfo.getJobId()
+ " with reason: " + txnStatusChangeReasonString
+ " please check the jsonpaths";
updateState(JobState.PAUSED,
new ErrorReason(InternalErrorCode.TASKS_ABORT_ERR, msg),
false /* not replay */);
return;
case OFFSET_OUT_OF_RANGE:
msg = "be " + taskBeId + " abort task,"
+ " task id: " + routineLoadTaskInfo.getId()

View File

@ -108,7 +108,8 @@ public class TransactionState implements Writable {
TIMEOUT,
OFFSET_OUT_OF_RANGE,
PAUSE,
NO_PARTITIONS;
NO_PARTITIONS,
INVALID_JSON_PATH;
public static TxnStatusChangeReason fromString(String reasonString) {
for (TxnStatusChangeReason txnStatusChangeReason : TxnStatusChangeReason.values()) {