[improve](routine_load) add db and table name in create routine load job log (#27500)
This commit is contained in:
committed by
GitHub
parent
b08865982a
commit
04033dce01
@ -175,10 +175,12 @@ public class RoutineLoadManager implements Writable {
|
||||
|
||||
routineLoadJob.setOrigStmt(createRoutineLoadStmt.getOrigStmt());
|
||||
routineLoadJob.setComment(createRoutineLoadStmt.getComment());
|
||||
addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName());
|
||||
addRoutineLoadJob(routineLoadJob, createRoutineLoadStmt.getDBName(),
|
||||
createRoutineLoadStmt.getTableName());
|
||||
}
|
||||
|
||||
public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName) throws DdlException {
|
||||
public void addRoutineLoadJob(RoutineLoadJob routineLoadJob, String dbName, String tableName)
|
||||
throws DdlException {
|
||||
writeLock();
|
||||
try {
|
||||
// check if db.routineLoadName has been used
|
||||
@ -195,10 +197,12 @@ public class RoutineLoadManager implements Writable {
|
||||
|
||||
unprotectedAddJob(routineLoadJob);
|
||||
Env.getCurrentEnv().getEditLog().logCreateRoutineLoadJob(routineLoadJob);
|
||||
LOG.info("create routine load job: id: {}, name: {}", routineLoadJob.getId(), routineLoadJob.getName());
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
||||
LOG.info("create routine load job: id: {}, job name: {}, db name: {}, table name: {}",
|
||||
routineLoadJob.getId(), routineLoadJob.getName(), dbName, tableName);
|
||||
}
|
||||
|
||||
private void unprotectedAddJob(RoutineLoadJob routineLoadJob) {
|
||||
|
||||
@ -215,7 +215,7 @@ public class RoutineLoadManagerTest {
|
||||
|
||||
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
|
||||
try {
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table");
|
||||
Assert.fail();
|
||||
} catch (DdlException e) {
|
||||
LOG.info(e.getMessage());
|
||||
@ -256,7 +256,7 @@ public class RoutineLoadManagerTest {
|
||||
|
||||
Deencapsulation.setField(routineLoadManager, "dbToNameToRoutineLoadJob", dbToNameToRoutineLoadJob);
|
||||
Deencapsulation.setField(routineLoadManager, "idToRoutineLoadJob", idToRoutineLoadJob);
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table");
|
||||
|
||||
Map<Long, Map<String, List<RoutineLoadJob>>> result =
|
||||
Deencapsulation.getField(routineLoadManager, "dbToNameToRoutineLoadJob");
|
||||
@ -777,7 +777,7 @@ public class RoutineLoadManagerTest {
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "testjob", SystemInfoService.DEFAULT_CLUSTER,
|
||||
10000, 10001, "192.168.1.1:9090", "testtopic", UserIdentity.ADMIN);
|
||||
routineLoadManager.addRoutineLoadJob(job, "testdb");
|
||||
routineLoadManager.addRoutineLoadJob(job, "testdb", "testtable");
|
||||
Config.max_routine_load_task_num_per_be = 10;
|
||||
Deencapsulation.setField(routineLoadManager, "beIdToMaxConcurrentTasks", beIdToMaxConcurrentTasks);
|
||||
Assert.assertEquals(1L, routineLoadManager.getAvailableBeForTask(1L, 1L, "default"));
|
||||
|
||||
@ -140,7 +140,7 @@ public class RoutineLoadSchedulerTest {
|
||||
KafkaRoutineLoadJob kafkaRoutineLoadJob = new KafkaRoutineLoadJob(1L, "test", "default_cluster", 1L, 1L,
|
||||
"10.74.167.16:8092", "test", UserIdentity.ADMIN);
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db");
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob, "db", "table");
|
||||
|
||||
List<Long> backendIds = new ArrayList<>();
|
||||
backendIds.add(1L);
|
||||
@ -176,7 +176,7 @@ public class RoutineLoadSchedulerTest {
|
||||
List<Integer> customKafkaPartitions = new ArrayList<>();
|
||||
customKafkaPartitions.add(2);
|
||||
Deencapsulation.setField(kafkaRoutineLoadJob1, "customKafkaPartitions", customKafkaPartitions);
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob1, "db");
|
||||
routineLoadManager.addRoutineLoadJob(kafkaRoutineLoadJob1, "db", "table");
|
||||
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
|
||||
@ -352,7 +352,7 @@ public class GlobalTransactionMgrTest {
|
||||
TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
|
||||
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
routineLoadManager.addRoutineLoadJob(routineLoadJob, "db");
|
||||
routineLoadManager.addRoutineLoadJob(routineLoadJob, "db", "table");
|
||||
|
||||
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
|
||||
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
|
||||
@ -423,7 +423,7 @@ public class GlobalTransactionMgrTest {
|
||||
TxnCommitAttachment txnCommitAttachment = new RLTaskTxnCommitAttachment(rlTaskTxnCommitAttachment);
|
||||
|
||||
RoutineLoadManager routineLoadManager = new RoutineLoadManager();
|
||||
routineLoadManager.addRoutineLoadJob(routineLoadJob, "db");
|
||||
routineLoadManager.addRoutineLoadJob(routineLoadJob, "db", "table");
|
||||
|
||||
Deencapsulation.setField(masterTransMgr.getDatabaseTransactionMgr(CatalogTestUtil.testDbId1), "idToRunningTransactionState", idToTransactionState);
|
||||
Table testTable1 = masterEnv.getInternalCatalog().getDbOrMetaException(CatalogTestUtil.testDbId1)
|
||||
|
||||
Reference in New Issue
Block a user