diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java index 24fafc6677..aa57f67e6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java @@ -175,10 +175,12 @@ public class RoutineLoadManager implements Writable { routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt()); routineLoadJob.setComment(createRoutineLoadStmt.getComment()); - addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName()); + addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName(), + createRoutineLoadStmt.getTableName()); } - public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName) throws DdlException { + public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName) + throws DdlException { writeLock(); try { // check if db.routineLoadName has been used @@ -195,10 +197,12 @@ public class RoutineLoadManager implements Writable { unprotectedAddJob(routineLoadJob); Env.getCurrentEnv().getEditLog().logCreateRoutineLoadJob(routineLoadJob); - LOG.info("create routine load job: id: {}, name: {}", routineLoadJob.getId(), routineLoadJob.getName()); } finally { writeUnlock(); } + + LOG.info("create routine load job: id: {}, job name: {}, db name: {}, table name: {}", + routineLoadJob.getId(), routineLoadJob.getName(), dbName, tableName); } private void unprotectedAddJob(RoutineLoadJob routineLoadJob) { diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java index 7bd776df3e..018b2e9523 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadManagerTest.java @@ -215,7 +215,7 @@ public class RoutineLoadManagerTest { Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); try { - routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db"); + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table"); Assert.fail(); } catch (DdlException e) { LOG.info(e.getMessage()); @@ -256,7 +256,7 @@ public class RoutineLoadManagerTest { Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob); Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob); - routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db"); + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table"); Map>> result = Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob"); @@ -777,7 +777,7 @@ public class RoutineLoadManagerTest { RoutineLoadManager routineLoadManager = new RoutineLoadManager(); KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "testjob", SystemInfoService.DEFAULT_CLUSTER, 10000, 10001, "192.168.1.1:9090", "testtopic", UserIdentity.ADMIN); - routineLoadManager.addRoutineLoadJob(job, "testdb"); + routineLoadManager.addRoutineLoadJob(job, "testdb", "testtable"); Config.max_routine_load_task_num_per_be = 10; Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks); Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, 1L, "default")); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java index 490a42719f..da701ff130 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadSchedulerTest.java @@ -140,7 +140,7 @@ public class RoutineLoadSchedulerTest { KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L, "10.74.167.16:8092", "test", UserIdentity.ADMIN); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db"); + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table"); List backendIds = new ArrayList<>(); backendIds.add(1L); @@ -176,7 +176,7 @@ public class RoutineLoadSchedulerTest { List customKafkaPartitions = new ArrayList<>(); customKafkaPartitions.add(2); Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions); - routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob1, "db"); + routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob1, "db", "table"); Thread.sleep(10000); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java index a819c4f030..89f6e0b814 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java @@ -352,7 +352,7 @@ public class GlobalTransactionMgrTest { TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.addRoutineLoadJob(routineLoadJob, "db"); + routineLoadManager.addRoutineLoadJob(routineLoadJob, "db", "table"); Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1) @@ -423,7 +423,7 @@ public class GlobalTransactionMgrTest { TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment); RoutineLoadManager routineLoadManager = new RoutineLoadManager(); - routineLoadManager.addRoutineLoadJob(routineLoadJob, "db"); + routineLoadManager.addRoutineLoadJob(routineLoadJob, "db", "table"); Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState); Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)