From dde5ed5231bc9aeb1d889754fdd3732d9017185d Mon Sep 17 00:00:00 2001 From: Jack Drogon Date: Tue, 23 Jan 2024 20:56:39 +0800 Subject: [PATCH] [fix](fe-memory) Fix fe schema change high memory usage (#30231) Signed-off-by: Jack Drogon --- .../java/org/apache/doris/alter/RollupJobV2.java | 3 +++ .../org/apache/doris/alter/SchemaChangeJobV2.java | 7 ++++--- .../apache/doris/analysis/DescriptorTable.java | 7 +++++++ .../org/apache/doris/task/AlterReplicaTask.java | 15 +++++++++++---- 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index aa61eaf906..4ad9e0fcb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -62,6 +62,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -386,6 +387,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { } tbl.readLock(); + Map> tcloumnsPool = Maps.newHashMap(); try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { @@ -465,6 +467,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), + tcloumnsPool, whereClause); rollupBatchTask.addTask(rollupTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 5c74164ae3..08e2797341 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -55,6 +55,7 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; +import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -234,7 +235,6 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.readLock(); try { - Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); for (long partitionId : partitionIndexMap.rowKeySet()) { @@ -407,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } tbl.readLock(); - + Map> tcloumnsPool = Maps.newHashMap(); try { Map indexColumnMap = Maps.newHashMap(); for (Map.Entry> entry : indexSchemaMap.entrySet()) { @@ -470,7 +470,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, null); + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool, + null); schemaChangeBatchTask.addTask(rollupTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index 42c69bba6e..fb6cc7df0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -54,6 +54,8 @@ public class DescriptorTable { private final HashMap outToIntermediateSlots = new HashMap<>(); + private TDescriptorTable thriftDescTable = null; // serialized version of this + public DescriptorTable() { } @@ -182,6 +184,10 @@ public class DescriptorTable { } public TDescriptorTable toThrift() { + if (thriftDescTable != null) { + return thriftDescTable; + } + TDescriptorTable result = new TDescriptorTable(); Map referencedTbls = Maps.newHashMap(); for (TupleDescriptor tupleD : tupleDescs.values()) { @@ -208,6 +214,7 @@ public class DescriptorTable { for (TableIf tbl : referencedTbls.values()) { result.addToTableDescriptors(tbl.toThrift()); } + thriftDescTable = result; return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index cd6502da76..267a99a12b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -53,6 +53,7 @@ public class AlterReplicaTask extends AgentTask { private Map defineExprs; private Expr whereClause; private DescriptorTable descTable; + private Map> tcloumnsPool; private List baseSchemaColumns; /** @@ -62,7 +63,8 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map defineExprs, - DescriptorTable descTable, List baseSchemaColumns, Expr whereClause) { + DescriptorTable descTable, List baseSchemaColumns, Map> tcloumnsPool, + Expr whereClause) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); this.baseTabletId = baseTabletId; @@ -79,6 +81,7 @@ public class AlterReplicaTask extends AgentTask { this.whereClause = whereClause; this.descTable = descTable; this.baseSchemaColumns = baseSchemaColumns; + this.tcloumnsPool = tcloumnsPool; } public long getBaseTabletId() { @@ -140,9 +143,13 @@ public class AlterReplicaTask extends AgentTask { req.setDescTbl(descTable.toThrift()); if (baseSchemaColumns != null) { - List columns = new ArrayList(); - for (Column column : baseSchemaColumns) { - columns.add(column.toThrift()); + List columns = tcloumnsPool.get(baseSchemaColumns); + if (columns == null) { + columns = new ArrayList(); + for (Column column : baseSchemaColumns) { + columns.add(column.toThrift()); + } + tcloumnsPool.put(baseSchemaColumns, columns); } req.setColumns(columns); }