diff --git a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md index 478a8ab60f..941191e1d0 100644 --- a/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/routine-load-manual.md @@ -1,4 +1,4 @@ - we must be reset counter because a new period for AutoResume RoutineLoadJob + */ + jobRoutine.firstResumeTimestamp = current; + jobRoutine.autoResumeCount = 1; + return true; + } + } + } + return false; + } +} diff --git a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java index 4245e8d328..9017fa036c 100644 --- a/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -35,6 +35,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.system.Backend; @@ -298,7 +299,8 @@ public class OlapTableSink extends DataSink { for (Tablet tablet : index.getTablets()) { Multimap bePathsMap = tablet.getNormalReplicaBackendPathMap(); if (bePathsMap.keySet().size() < quorum) { - throw new UserException("tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size()); + throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, + "tablet " + tablet.getId() + " has few replicas: " + bePathsMap.keySet().size()); } locationParam.addToTablets(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet()))); allBePathsMap.putAll(bePathsMap); diff --git a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java index 10ed65d605..0ed034f806 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java @@ -126,25 +126,25 @@ public class KafkaRoutineLoadJobTest { // 2 partitions, 1 be RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName1, 1L, - 1L, "127.0.0.1:9020", "topic1"); + 1L, 3L, "127.0.0.1:9020", "topic1"); Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList1); Assert.assertEquals(1, routineLoadJob.calculateCurrentConcurrentTaskNum()); // 3 partitions, 4 be routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, - 1L, "127.0.0.1:9020", "topic1"); + 1L, 3L, "127.0.0.1:9020", "topic1"); Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList2); Assert.assertEquals(3, routineLoadJob.calculateCurrentConcurrentTaskNum()); // 4 partitions, 4 be routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, - 1L, "127.0.0.1:9020", "topic1"); + 1L, 3L, "127.0.0.1:9020", "topic1"); Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList3); Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum()); // 7 partitions, 4 be routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", clusterName2, 1L, - 1L, "127.0.0.1:9020", "topic1"); + 1L, 3L, "127.0.0.1:9020", "topic1"); Deencapsulation.setField(routineLoadJob, "currentKafkaPartitions", partitionList4); Assert.assertEquals(4, routineLoadJob.calculateCurrentConcurrentTaskNum()); } @@ -159,7 +159,7 @@ public class KafkaRoutineLoadJobTest { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L, - 1L, "127.0.0.1:9020", "topic1"); + 1L, 3L, "127.0.0.1:9020", "topic1"); new Expectations(catalog) { { @@ -204,7 +204,7 @@ public class KafkaRoutineLoadJobTest { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "kafka_routine_load_job", "default", 1L, - 1L, "127.0.0.1:9020", "topic1"); + 1L, 3L, "127.0.0.1:9020", "topic1"); long maxBatchIntervalS = 10; Deencapsulation.setField(routineLoadJob, "maxBatchIntervalS", maxBatchIntervalS); new Expectations() { diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java index 120aa1e79d..3ea320beda 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadJobTest.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.SqlParser; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Table; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.UserException; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.common.util.KafkaUtil; @@ -173,13 +174,13 @@ public class RoutineLoadJobTest { public void testGetShowInfo(@Mocked KafkaProgress kafkaProgress) { RoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(); Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); - Deencapsulation.setField(routineLoadJob, "pauseReason", - TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()); + ErrorReason errorReason = new ErrorReason(InternalErrorCode.INTERNAL_ERR, TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()); + Deencapsulation.setField(routineLoadJob, "pauseReason", errorReason); Deencapsulation.setField(routineLoadJob, "progress", kafkaProgress); List showInfo = routineLoadJob.getShowInfo(); Assert.assertEquals(true, showInfo.stream().filter(entity -> !Strings.isNullOrEmpty(entity)) - .anyMatch(entity -> entity.equals(TransactionState.TxnStatusChangeReason.OFFSET_OUT_OF_RANGE.toString()))); + .anyMatch(entity -> entity.equals(errorReason.toString()))); } @Test diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 5e13dc114f..3b150f69ac 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -30,6 +30,7 @@ import org.apache.doris.catalog.Database; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.InternalErrorCode; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; @@ -94,7 +95,7 @@ public class RoutineLoadManagerTest { typeName, customProperties); KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, - serverAddress, topicName); + 3L, serverAddress, topicName); new MockUp() { @Mock @@ -191,8 +192,7 @@ public class RoutineLoadManagerTest { String topicName = "topic1"; String serverAddress = "http://127.0.0.1:8080"; KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, - serverAddress, - topicName); + 3L, serverAddress,topicName); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); @@ -200,7 +200,7 @@ public class RoutineLoadManagerTest { Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); List routineLoadJobList = Lists.newArrayList(); KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", - 1L, 1L, serverAddress, topicName); + 1L, 1L, 3L, serverAddress, topicName); routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); nameToRoutineLoadJob.put(jobName, routineLoadJobList); dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); @@ -222,7 +222,7 @@ public class RoutineLoadManagerTest { String topicName = "topic1"; String serverAddress = "http://127.0.0.1:8080"; KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", 1L, 1L, - serverAddress, topicName); + 3L, serverAddress, topicName); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); @@ -238,7 +238,7 @@ public class RoutineLoadManagerTest { Map> nameToRoutineLoadJob = Maps.newConcurrentMap(); List routineLoadJobList = Lists.newArrayList(); KafkaRoutineLoadJob kafkaRoutineLoadJobWithSameName = new KafkaRoutineLoadJob(1L, jobName, "default_cluster", - 1L, 1L, serverAddress, topicName); + 1L, 1L, 3L, serverAddress, topicName); Deencapsulation.setField(kafkaRoutineLoadJobWithSameName, "state", RoutineLoadJob.JobState.STOPPED); routineLoadJobList.add(kafkaRoutineLoadJobWithSameName); nameToRoutineLoadJob.put(jobName, routineLoadJobList); @@ -579,6 +579,10 @@ public class RoutineLoadManagerTest { dbToNameToRoutineLoadJob.put(1L, nameToRoutineLoadJob); Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); + Map idToRoutineLoadJob = Maps.newConcurrentMap(); + idToRoutineLoadJob.put(routineLoadJob.getId(), routineLoadJob); + Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); + new Expectations() { { pauseRoutineLoadStmt.getDbFullName(); @@ -605,6 +609,22 @@ public class RoutineLoadManagerTest { routineLoadManager.pauseRoutineLoadJob(pauseRoutineLoadStmt); Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); + + // 第一次自动恢复 + for (int i = 0; i < 3; i++) { + Deencapsulation.setField(routineLoadJob, "pauseReason", + new ErrorReason(InternalErrorCode.REPLICA_FEW_ERR, "")); + routineLoadManager.updateRoutineLoadJob(); + Assert.assertEquals(RoutineLoadJob.JobState.NEED_SCHEDULE, routineLoadJob.getState()); + Deencapsulation.setField(routineLoadJob, "state", RoutineLoadJob.JobState.PAUSED); + boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); + Assert.assertEquals(autoResumeLock, false); + } + // 第四次自动恢复 就会锁定 + routineLoadManager.updateRoutineLoadJob(); + Assert.assertEquals(RoutineLoadJob.JobState.PAUSED, routineLoadJob.getState()); + boolean autoResumeLock = Deencapsulation.getField(routineLoadJob, "autoResumeLock"); + Assert.assertEquals(autoResumeLock, true); } @Test diff --git a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 0beb048c43..5a8f7f2702 100644 --- a/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -75,7 +75,7 @@ public class RoutineLoadSchedulerTest { Deencapsulation.setField(catalog, "routineLoadTaskScheduler", routineLoadTaskScheduler); KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", clusterName, 1L, 1L, - "xxx", "test"); + 3L, "xxx", "test"); Deencapsulation.setField(kafkaRoutineLoadJob,"state", RoutineLoadJob.JobState.NEED_SCHEDULE); List routineLoadJobList = new ArrayList<>(); routineLoadJobList.add(kafkaRoutineLoadJob); @@ -138,7 +138,7 @@ public class RoutineLoadSchedulerTest { }; KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, - "10.74.167.16:8092", "test"); + 3L, "10.74.167.16:8092", "test"); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db"); @@ -169,7 +169,7 @@ public class RoutineLoadSchedulerTest { executorService.submit(routineLoadTaskScheduler); KafkaRoutineLoadJob kafkaRoutineLoadJob1 = new KafkaRoutineLoadJob(1L, "test_custom_partition", - "default_cluster", 1L, 1L, "xxx", "test_1"); + "default_cluster", 1L, 1L, 3L, "xxx", "test_1"); List customKafkaPartitions = new ArrayList<>(); customKafkaPartitions.add(2); Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions); diff --git a/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java b/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java index e826d92ab5..73c3afb8d4 100644 --- a/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java +++ b/fe/src/test/java/org/apache/doris/rewrite/FEFunctionsTest.java @@ -17,11 +17,6 @@ package org.apache.doris.rewrite; -import static org.junit.Assert.fail; - -import mockit.Expectations; -import mockit.MockUp; -import mockit.Mocked; import org.apache.doris.analysis.DateLiteral; import org.apache.doris.analysis.DecimalLiteral; import org.apache.doris.analysis.FloatLiteral; @@ -30,8 +25,8 @@ import org.apache.doris.analysis.LargeIntLiteral; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; - import org.apache.doris.common.util.TimeUtils; + import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -42,6 +37,10 @@ import java.time.ZoneId; import java.util.Locale; import java.util.TimeZone; +import mockit.Expectations; +import mockit.Mocked; +import static org.junit.Assert.fail; + public class FEFunctionsTest { @Mocked @@ -212,37 +211,38 @@ public class FEFunctionsTest { FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%D")); fail("Junit test dateParse fail"); } catch (AnalysisException e) { - Assert.assertEquals(e.getMessage(), "%D not supported in date format string"); + Assert.assertEquals(e.getMessage(), + "errCode = 2, detailMessage = %D not supported in date format string"); } try { FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%U")); fail("Junit test dateParse fail"); } catch (AnalysisException e) { - Assert.assertEquals(e.getMessage(), "%U not supported in date format string"); + Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %U not supported in date format string"); } try { FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%u")); fail("Junit test dateParse fail"); } catch (AnalysisException e) { - Assert.assertEquals(e.getMessage(), "%u not supported in date format string"); + Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %u not supported in date format string"); } try { FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%V")); fail("Junit test dateParse fail"); } catch (AnalysisException e) { - Assert.assertEquals(e.getMessage(), "%V not supported in date format string"); + Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %V not supported in date format string"); } try { FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%w")); fail("Junit test dateParse fail"); } catch (AnalysisException e) { - Assert.assertEquals(e.getMessage(), "%w not supported in date format string"); + Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %w not supported in date format string"); } try { FEFunctions.dateParse(new StringLiteral("2013-05-17"), new StringLiteral("%X")); fail("Junit test dateParse fail"); } catch (AnalysisException e) { - Assert.assertEquals(e.getMessage(), "%X not supported in date format string"); + Assert.assertEquals(e.getMessage(), "errCode = 2, detailMessage = %X not supported in date format string"); } } diff --git a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index 2aa4d587c8..64cd68ee0a 100644 --- a/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -311,7 +311,7 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic"); + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, 3L, "host:port", "topic"); List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L); @@ -378,7 +378,7 @@ public class GlobalTransactionMgrTest { transTablets.add(tabletCommitInfo2); transTablets.add(tabletCommitInfo3); - KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "host:port", "topic"); + KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, 3L, "host:port", "topic"); List routineLoadTaskInfoList = Deencapsulation.getField(routineLoadJob, "routineLoadTaskInfoList"); Map partitionIdToOffset = Maps.newHashMap(); partitionIdToOffset.put(1, 0L);