diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 8ad67cfd92..2ffd5e5a4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -62,7 +62,6 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -81,11 +80,13 @@ import java.io.DataOutput; import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -233,6 +234,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { try { BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); + Map objectPool = new HashMap(); for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { long partitionId = entry.getKey(); Partition partition = tbl.getPartition(partitionId); @@ -276,7 +278,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), tbl.storeRowColumn(), - binlogConfig); + binlogConfig, objectPool); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); if (this.storageFormat != null) { createReplicaTask.setStorageFormat(this.storageFormat); @@ -386,7 +388,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { } tbl.readLock(); - Map> tcloumnsPool = Maps.newHashMap(); + Map objectPool = new ConcurrentHashMap(); try { Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP); for (Map.Entry entry : this.partitionIdToRollupIndex.entrySet()) { @@ -466,7 +468,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId, rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId, JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true), - tcloumnsPool, + objectPool, whereClause); rollupBatchTask.addTask(rollupTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 3a589e0372..b58568d394 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -54,7 +54,6 @@ import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.AlterReplicaTask; import org.apache.doris.task.CreateReplicaTask; -import org.apache.doris.thrift.TColumn; import org.apache.doris.thrift.TStorageFormat; import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TStorageType; @@ -73,9 +72,11 @@ import org.apache.logging.log4j.Logger; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -234,6 +235,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { try { Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE); BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig()); + Map objectPool = new HashMap(); for (long partitionId : partitionIndexMap.rowKeySet()) { Partition partition = tbl.getPartition(partitionId); if (partition == null) { @@ -281,7 +283,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), tbl.storeRowColumn(), - binlogConfig); + binlogConfig, objectPool); createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId) .get(shadowTabletId), originSchemaHash); @@ -405,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { } tbl.readLock(); - Map> tcloumnsPool = Maps.newHashMap(); + Map objectPool = new ConcurrentHashMap(); try { Map indexColumnMap = Maps.newHashMap(); for (Map.Entry> entry : indexSchemaMap.entrySet()) { @@ -463,7 +465,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId, tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId, shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId, - JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool, + JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool, null); schemaChangeBatchTask.addTask(rollupTask); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index f36573e1ba..12c42aee73 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -97,6 +97,7 @@ import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -1068,6 +1069,7 @@ public class RestoreJob extends AbstractJob { } finally { localTbl.readUnlock(); } + Map objectPool = new HashMap(); for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) { MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId()); for (Tablet restoreTablet : restoredIdx.getTablets()) { @@ -1099,7 +1101,7 @@ public class RestoreJob extends AbstractJob { localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), localTbl.getTimeSeriesCompactionLevelThreshold(), localTbl.storeRowColumn(), - binlogConfig); + binlogConfig, objectPool); task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat()); task.setInRestoreMode(true); batchTask.addTask(task); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index df6b02b832..c8f52d5b4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1910,6 +1910,7 @@ public class InternalCatalog implements CatalogIf { short totalReplicaNum = replicaAlloc.getTotalReplicaNum(); TStorageMedium realStorageMedium = null; + Map objectPool = new HashMap(); for (Map.Entry entry : indexMap.entrySet()) { long indexId = entry.getKey(); MaterializedIndex index = entry.getValue(); @@ -1957,7 +1958,7 @@ public class InternalCatalog implements CatalogIf { tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), tbl.getTimeSeriesCompactionLevelThreshold(), - tbl.storeRowColumn(), binlogConfig); + tbl.storeRowColumn(), binlogConfig, objectPool); task.setStorageFormat(tbl.getStorageFormat()); task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 574d3314a1..7c77c1dfad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -748,6 +748,7 @@ public class ReportHandler extends Daemon { long backendReportVersion) { AgentBatchTask createReplicaBatchTask = new AgentBatchTask(); TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + Map objectPool = new HashMap(); for (Long dbId : tabletDeleteFromMeta.keySet()) { Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); if (db == null) { @@ -852,7 +853,7 @@ public class ReportHandler extends Daemon { olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), olapTable.getTimeSeriesCompactionLevelThreshold(), olapTable.storeRowColumn(), - binlogConfig); + binlogConfig, objectPool); createReplicaTask.setIsRecoverTask(true); createReplicaTask.setInvertedIndexStorageFormat(olapTable diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java index 267a99a12b..bf4708b4b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java @@ -53,7 +53,7 @@ public class AlterReplicaTask extends AgentTask { private Map defineExprs; private Expr whereClause; private DescriptorTable descTable; - private Map> tcloumnsPool; + private Map objectPool; private List baseSchemaColumns; /** @@ -63,7 +63,7 @@ public class AlterReplicaTask extends AgentTask { public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map defineExprs, - DescriptorTable descTable, List baseSchemaColumns, Map> tcloumnsPool, + DescriptorTable descTable, List baseSchemaColumns, Map objectPool, Expr whereClause) { super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId); @@ -81,7 +81,7 @@ public class AlterReplicaTask extends AgentTask { this.whereClause = whereClause; this.descTable = descTable; this.baseSchemaColumns = baseSchemaColumns; - this.tcloumnsPool = tcloumnsPool; + this.objectPool = objectPool; } public long getBaseTabletId() { @@ -128,30 +128,47 @@ public class AlterReplicaTask extends AgentTask { } if (defineExprs != null) { for (Map.Entry entry : defineExprs.entrySet()) { - List slots = Lists.newArrayList(); - entry.getValue().collect(SlotRef.class, slots); - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); - mvParam.setMvExpr(entry.getValue().treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(entry.getKey()); + if (value == null) { + List slots = Lists.newArrayList(); + entry.getValue().collect(SlotRef.class, slots); + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey()); + mvParam.setMvExpr(entry.getValue().treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(entry.getKey(), mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } } if (whereClause != null) { - TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); - mvParam.setMvExpr(whereClause.treeToThrift()); - req.addToMaterializedViewParams(mvParam); + Object value = objectPool.get(Column.WHERE_SIGN); + if (value == null) { + TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN); + mvParam.setMvExpr(whereClause.treeToThrift()); + req.addToMaterializedViewParams(mvParam); + objectPool.put(Column.WHERE_SIGN, mvParam); + } else { + TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value; + req.addToMaterializedViewParams(mvParam); + } } req.setDescTbl(descTable.toThrift()); if (baseSchemaColumns != null) { - List columns = tcloumnsPool.get(baseSchemaColumns); - if (columns == null) { - columns = new ArrayList(); + Object value = objectPool.get(baseSchemaColumns); + if (value == null) { + List columns = new ArrayList(); for (Column column : baseSchemaColumns) { columns.add(column.toThrift()); } - tcloumnsPool.put(baseSchemaColumns, columns); + req.setColumns(columns); + objectPool.put(baseSchemaColumns, columns); + } else { + List columns = (List) value; + req.setColumns(columns); } - req.setColumns(columns); } return req; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 7a5262ba0e..1de5d4e8d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; @@ -123,6 +124,8 @@ public class CreateReplicaTask extends AgentTask { private BinlogConfig binlogConfig; private List clusterKeyIndexes; + private Map objectPool; + public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId, long replicaId, short shortKeyColumnCount, int schemaHash, long version, KeysType keysType, TStorageType storageType, @@ -144,7 +147,8 @@ public class CreateReplicaTask extends AgentTask { long timeSeriesCompactionEmptyRowsetsThreshold, long timeSeriesCompactionLevelThreshold, boolean storeRowColumn, - BinlogConfig binlogConfig) { + BinlogConfig binlogConfig, + Map objectPool) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); this.replicaId = replicaId; @@ -188,6 +192,7 @@ public class CreateReplicaTask extends AgentTask { this.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold; this.storeRowColumn = storeRowColumn; this.binlogConfig = binlogConfig; + this.objectPool = objectPool; } public void setIsRecoverTask(boolean isRecoverTask) { @@ -260,21 +265,32 @@ public class CreateReplicaTask extends AgentTask { int deleteSign = -1; int sequenceCol = -1; int versionCol = -1; - List tColumns = new ArrayList(); + List tColumns = null; + Object tCols = objectPool.get(columns); + if (tCols != null) { + tColumns = (List) tCols; + } else { + tColumns = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + TColumn tColumn = column.toThrift(); + // is bloom filter column + if (bfColumns != null && bfColumns.contains(column.getName())) { + tColumn.setIsBloomFilterColumn(true); + } + // when doing schema change, some modified column has a prefix in name. + // this prefix is only used in FE, not visible to BE, so we should remove this prefix. + if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { + tColumn.setColumnName( + column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length())); + } + tColumn.setVisible(column.isVisible()); + tColumns.add(tColumn); + } + objectPool.put(columns, tColumns); + } for (int i = 0; i < columns.size(); i++) { Column column = columns.get(i); - TColumn tColumn = column.toThrift(); - // is bloom filter column - if (bfColumns != null && bfColumns.contains(column.getName())) { - tColumn.setIsBloomFilterColumn(true); - } - // when doing schema change, some modified column has a prefix in name. - // this prefix is only used in FE, not visible to BE, so we should remove this prefix. - if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) { - tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length())); - } - tColumn.setVisible(column.isVisible()); - tColumns.add(tColumn); if (column.isDeleteSignColumn()) { deleteSign = i; } @@ -296,9 +312,15 @@ public class CreateReplicaTask extends AgentTask { } } if (CollectionUtils.isNotEmpty(indexes)) { - List tIndexes = new ArrayList<>(); - for (Index index : indexes) { - tIndexes.add(index.toThrift()); + List tIndexes = null; + Object value = objectPool.get(indexes); + if (value != null) { + tIndexes = (List) value; + } else { + tIndexes = new ArrayList<>(); + for (Index index : indexes) { + tIndexes.add(index.toThrift()); + } } tSchema.setIndexes(tIndexes); storageFormat = TStorageFormat.V2; diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 3dc4bc4695..b604076ddb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -102,12 +102,12 @@ public class AgentTaskTest { range2 = Range.closedOpen(pk2, pk3); // create tasks - + Map objectPool = new HashMap(); // create createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null, - TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null); + TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null, objectPool); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);