[fix](fe-memory) Fix high FE memory usage during schema change (#30231)

Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
This commit is contained in:
Jack Drogon
2024-01-23 20:56:39 +08:00
committed by yiguolei
parent 5213f941dd
commit dde5ed5231
4 changed files with 25 additions and 7 deletions

View File

@ -62,6 +62,7 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@ -386,6 +387,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
}
tbl.readLock();
Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
@ -465,6 +467,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true),
tcloumnsPool,
whereClause);
rollupBatchTask.addTask(rollupTask);
}

View File

@ -55,6 +55,7 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@ -234,7 +235,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.readLock();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
for (long partitionId : partitionIndexMap.rowKeySet()) {
@ -407,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
tbl.readLock();
Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
try {
Map<String, Column> indexColumnMap = Maps.newHashMap();
for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
@ -470,7 +470,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null);
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool,
null);
schemaChangeBatchTask.addTask(rollupTask);
}
}

View File

@ -54,6 +54,8 @@ public class DescriptorTable {
private final HashMap<SlotDescriptor, SlotDescriptor> outToIntermediateSlots = new HashMap<>();
private TDescriptorTable thriftDescTable = null; // serialized version of this
public DescriptorTable() {
}
@ -182,6 +184,10 @@ public class DescriptorTable {
}
public TDescriptorTable toThrift() {
if (thriftDescTable != null) {
return thriftDescTable;
}
TDescriptorTable result = new TDescriptorTable();
Map<Long, TableIf> referencedTbls = Maps.newHashMap();
for (TupleDescriptor tupleD : tupleDescs.values()) {
@ -208,6 +214,7 @@ public class DescriptorTable {
for (TableIf tbl : referencedTbls.values()) {
result.addToTableDescriptors(tbl.toThrift());
}
thriftDescTable = result;
return result;
}

View File

@ -53,6 +53,7 @@ public class AlterReplicaTask extends AgentTask {
private Map<String, Expr> defineExprs;
private Expr whereClause;
private DescriptorTable descTable;
private Map<Object, List<TColumn>> tcloumnsPool;
private List<Column> baseSchemaColumns;
/**
@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
DescriptorTable descTable, List<Column> baseSchemaColumns, Expr whereClause) {
DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, List<TColumn>> tcloumnsPool,
Expr whereClause) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);
this.baseTabletId = baseTabletId;
@ -79,6 +81,7 @@ public class AlterReplicaTask extends AgentTask {
this.whereClause = whereClause;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
this.tcloumnsPool = tcloumnsPool;
}
public long getBaseTabletId() {
@ -140,9 +143,13 @@ public class AlterReplicaTask extends AgentTask {
req.setDescTbl(descTable.toThrift());
if (baseSchemaColumns != null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns);
if (columns == null) {
columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
}
tcloumnsPool.put(baseSchemaColumns, columns);
}
req.setColumns(columns);
}