[Bug] Fix bug that replayCreateLoadJob will cause fe memory leak in non master node because InsertLoadJob cannot be removed from TxnStateCallbackFactory (#6795)
This commit is contained in:
@ -48,10 +48,11 @@ public class InsertLoadJob extends LoadJob {
|
||||
super(EtlJobType.INSERT);
|
||||
}
|
||||
|
||||
public InsertLoadJob(String label, long dbId, long tableId, long createTimestamp, String failMsg,
|
||||
public InsertLoadJob(String label, long transactionId, long dbId, long tableId, long createTimestamp, String failMsg,
|
||||
String trackingUrl) throws MetaNotFoundException {
|
||||
super(EtlJobType.INSERT, dbId, label);
|
||||
this.tableId = tableId;
|
||||
this.transactionId = transactionId;
|
||||
this.createTimestamp = createTimestamp;
|
||||
this.loadStartTimestamp = createTimestamp;
|
||||
this.finishTimestamp = System.currentTimeMillis();
|
||||
|
||||
@ -260,9 +260,11 @@ public class LoadManager implements Writable{
|
||||
return;
|
||||
}
|
||||
addLoadJob(loadJob);
|
||||
// add callback before txn created, because callback will be performed on replay without txn begin
|
||||
// add callback before txn if load job is uncompleted, because callback will be performed on replay without txn begin
|
||||
// register txn state listener
|
||||
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
|
||||
if (!loadJob.isCompleted()) {
|
||||
Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(loadJob);
|
||||
}
|
||||
}
|
||||
|
||||
private void addLoadJob(LoadJob loadJob) {
|
||||
@ -278,7 +280,7 @@ public class LoadManager implements Writable{
|
||||
labelToLoadJobs.get(loadJob.getLabel()).add(loadJob);
|
||||
}
|
||||
|
||||
public void recordFinishedLoadJob(String label, String dbName, long tableId, EtlJobType jobType,
|
||||
public void recordFinishedLoadJob(String label, long transactionId, String dbName, long tableId, EtlJobType jobType,
|
||||
long createTimestamp, String failMsg, String trackingUrl) throws MetaNotFoundException {
|
||||
|
||||
// get db id
|
||||
@ -287,7 +289,7 @@ public class LoadManager implements Writable{
|
||||
LoadJob loadJob;
|
||||
switch (jobType) {
|
||||
case INSERT:
|
||||
loadJob = new InsertLoadJob(label, db.getId(), tableId, createTimestamp, failMsg, trackingUrl);
|
||||
loadJob = new InsertLoadJob(label, transactionId, db.getId(), tableId, createTimestamp, failMsg, trackingUrl);
|
||||
break;
|
||||
default:
|
||||
return;
|
||||
|
||||
@ -1288,9 +1288,11 @@ public class StmtExecutor implements ProfileWriter {
|
||||
message = throwable.getMessage();
|
||||
}
|
||||
|
||||
txnId = insertStmt.getTransactionId();
|
||||
try {
|
||||
context.getCatalog().getLoadManager().recordFinishedLoadJob(
|
||||
label,
|
||||
txnId,
|
||||
insertStmt.getDb(),
|
||||
insertStmt.getTargetTable().getId(),
|
||||
EtlJobType.INSERT,
|
||||
@ -1301,7 +1303,6 @@ public class StmtExecutor implements ProfileWriter {
|
||||
LOG.warn("Record info of insert load with error {}", e.getMessage(), e);
|
||||
errMsg = "Record info of insert load with error " + e.getMessage();
|
||||
}
|
||||
txnId = insertStmt.getTransactionId();
|
||||
}
|
||||
|
||||
// {'label':'my_label1', 'status':'visible', 'txnId':'123'}
|
||||
|
||||
@ -39,7 +39,7 @@ public class InsertLoadJobTest {
|
||||
public void testGetTableNames(@Mocked Catalog catalog,
|
||||
@Injectable Database database,
|
||||
@Injectable Table table) throws MetaNotFoundException {
|
||||
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1000, "", "");
|
||||
InsertLoadJob insertLoadJob = new InsertLoadJob("label", 1L, 1L, 1L, 1000, "", "");
|
||||
String tableName = "table1";
|
||||
new Expectations() {
|
||||
{
|
||||
|
||||
@ -133,7 +133,7 @@ public class LoadManagerTest {
|
||||
};
|
||||
|
||||
loadManager = new LoadManager(new LoadJobScheduler());
|
||||
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", "");
|
||||
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "");
|
||||
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
|
||||
|
||||
File file = serializeToFile(loadManager);
|
||||
@ -168,7 +168,7 @@ public class LoadManagerTest {
|
||||
};
|
||||
|
||||
loadManager = new LoadManager(new LoadJobScheduler());
|
||||
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, System.currentTimeMillis(), "", "");
|
||||
LoadJob job1 = new InsertLoadJob("job1", 1L, 1L, 1L, System.currentTimeMillis(), "", "");
|
||||
Deencapsulation.invoke(loadManager, "addLoadJob", job1);
|
||||
|
||||
//make job1 don't serialize
|
||||
|
||||
Reference in New Issue
Block a user