[Bug] Fix bug that descriptor table is not reset before planning next routine load task (#3605)

Before planning for next routine load task, the analyzer and descriptor table
in it should be reset. Otherwise, a lot of historical objects will
accumulate inside, causing memory leaks.
This commit is contained in:
Mingyu Chen
2020-05-18 10:34:21 +08:00
committed by GitHub
parent 5138197d57
commit 271f25f0a4
2 changed files with 8 additions and 5 deletions

View File

@ -41,17 +41,15 @@ import java.util.List;
public class DescriptorTable {
private final static Logger LOG = LogManager.getLogger(DescriptorTable.class);
private final HashMap<TupleId, TupleDescriptor> tupleDescs;
private final HashMap<TupleId, TupleDescriptor> tupleDescs = new HashMap<TupleId, TupleDescriptor>();
// List of referenced tables with no associated TupleDescriptor to ship to the BE.
// For example, the output table of an insert query.
private final List<Table> referencedTables;
private final List<Table> referencedTables = new ArrayList<Table>();;
private final IdGenerator<TupleId> tupleIdGenerator_ = TupleId.createGenerator();
private final IdGenerator<SlotId> slotIdGenerator_ = SlotId.createGenerator();
private final HashMap<SlotId, SlotDescriptor> slotDescs = Maps.newHashMap();
public DescriptorTable() {
tupleDescs = new HashMap<TupleId, TupleDescriptor>();
referencedTables = new ArrayList<Table>();
}
public TupleDescriptor createTupleDescriptor() {

View File

@ -77,19 +77,24 @@ public class StreamLoadPlanner {
this.db = db;
this.destTable = destTable;
this.streamLoadTask = streamLoadTask;
analyzer = new Analyzer(Catalog.getInstance(), null);
}
private void resetAnalyzer() {
analyzer = new Analyzer(Catalog.getCurrentCatalog(), null);
// TODO(cmy): currently we do not support UDF in stream load command.
// Because there is no way to check the privilege of accessing UDF..
analyzer.setUDFAllowed(false);
descTable = analyzer.getDescTbl();
}
// can only be called after "plan()", or it will return null
public OlapTable getDestTable() {
return destTable;
}
// create the plan. the plan's query id and load id are same, using the parameter 'loadId'
public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException {
resetAnalyzer();
// construct tuple descriptor, used for scanNode and dataSink
TupleDescriptor tupleDesc = descTable.createTupleDescriptor("DstTableTuple");
boolean negative = streamLoadTask.getNegative();