[fix](schema change) reduce memory usage in schema change process #30231 #36285 #33073 (#36756)

Cherry-picked from the following upstream pull requests:
https://github.com/apache/doris/pull/30231
https://github.com/apache/doris/pull/36285
https://github.com/apache/doris/pull/33073
This commit is contained in:
Lightman
2024-06-25 12:21:17 +08:00
committed by GitHub
parent 3652fc31c3
commit 07ce9cf52c
8 changed files with 93 additions and 46 deletions

View File

@ -62,7 +62,6 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@ -81,11 +80,13 @@ import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -233,6 +234,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
try {
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
long partitionId = entry.getKey();
Partition partition = tbl.getPartition(partitionId);
@ -276,7 +278,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
@ -386,7 +388,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
}
tbl.readLock();
Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
try {
Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
for (Map.Entry<Long, MaterializedIndex> entry : this.partitionIdToRollupIndex.entrySet()) {
@ -466,7 +468,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
JobType.ROLLUP, defineExprs, descTable, tbl.getSchemaByIndexId(baseIndexId, true),
tcloumnsPool,
objectPool,
whereClause);
rollupBatchTask.addTask(rollupTask);
}

View File

@ -54,7 +54,6 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.AlterReplicaTask;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@ -73,9 +72,11 @@ import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@ -234,6 +235,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
try {
Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
BinlogConfig binlogConfig = new BinlogConfig(tbl.getBinlogConfig());
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (long partitionId : partitionIndexMap.rowKeySet()) {
Partition partition = tbl.getPartition(partitionId);
if (partition == null) {
@ -281,7 +283,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
.get(shadowTabletId), originSchemaHash);
@ -405,7 +407,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
}
tbl.readLock();
Map<Object, List<TColumn>> tcloumnsPool = Maps.newHashMap();
Map<Object, Object> objectPool = new ConcurrentHashMap<Object, Object>();
try {
Map<String, Column> indexColumnMap = Maps.newHashMap();
for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
@ -463,7 +465,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, tcloumnsPool,
JobType.SCHEMA_CHANGE, defineExprs, descTable, originSchemaColumns, objectPool,
null);
schemaChangeBatchTask.addTask(rollupTask);
}

View File

@ -97,6 +97,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -1068,6 +1069,7 @@ public class RestoreJob extends AbstractJob {
} finally {
localTbl.readUnlock();
}
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (MaterializedIndex restoredIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
MaterializedIndexMeta indexMeta = localTbl.getIndexMetaByIndexId(restoredIdx.getId());
for (Tablet restoreTablet : restoredIdx.getTablets()) {
@ -1099,7 +1101,7 @@ public class RestoreJob extends AbstractJob {
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
localTbl.getTimeSeriesCompactionLevelThreshold(),
localTbl.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);
task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat());
task.setInRestoreMode(true);
batchTask.addTask(task);

View File

@ -1910,6 +1910,7 @@ public class InternalCatalog implements CatalogIf<Database> {
short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
TStorageMedium realStorageMedium = null;
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
@ -1957,7 +1958,7 @@ public class InternalCatalog implements CatalogIf<Database> {
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.getTimeSeriesCompactionLevelThreshold(),
tbl.storeRowColumn(), binlogConfig);
tbl.storeRowColumn(), binlogConfig, objectPool);
task.setStorageFormat(tbl.getStorageFormat());
task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat());

View File

@ -748,6 +748,7 @@ public class ReportHandler extends Daemon {
long backendReportVersion) {
AgentBatchTask createReplicaBatchTask = new AgentBatchTask();
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Map<Object, Object> objectPool = new HashMap<Object, Object>();
for (Long dbId : tabletDeleteFromMeta.keySet()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
@ -852,7 +853,7 @@ public class ReportHandler extends Daemon {
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.getTimeSeriesCompactionLevelThreshold(),
olapTable.storeRowColumn(),
binlogConfig);
binlogConfig, objectPool);
createReplicaTask.setIsRecoverTask(true);
createReplicaTask.setInvertedIndexStorageFormat(olapTable

View File

@ -53,7 +53,7 @@ public class AlterReplicaTask extends AgentTask {
private Map<String, Expr> defineExprs;
private Expr whereClause;
private DescriptorTable descTable;
private Map<Object, List<TColumn>> tcloumnsPool;
private Map<Object, Object> objectPool;
private List<Column> baseSchemaColumns;
/**
@ -63,7 +63,7 @@ public class AlterReplicaTask extends AgentTask {
public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, List<TColumn>> tcloumnsPool,
DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, Object> objectPool,
Expr whereClause) {
super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);
@ -81,7 +81,7 @@ public class AlterReplicaTask extends AgentTask {
this.whereClause = whereClause;
this.descTable = descTable;
this.baseSchemaColumns = baseSchemaColumns;
this.tcloumnsPool = tcloumnsPool;
this.objectPool = objectPool;
}
public long getBaseTabletId() {
@ -128,30 +128,47 @@ public class AlterReplicaTask extends AgentTask {
}
if (defineExprs != null) {
for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(entry.getKey());
if (value == null) {
List<SlotRef> slots = Lists.newArrayList();
entry.getValue().collect(SlotRef.class, slots);
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
mvParam.setMvExpr(entry.getValue().treeToThrift());
req.addToMaterializedViewParams(mvParam);
objectPool.put(entry.getKey(), mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
}
if (whereClause != null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
Object value = objectPool.get(Column.WHERE_SIGN);
if (value == null) {
TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
mvParam.setMvExpr(whereClause.treeToThrift());
req.addToMaterializedViewParams(mvParam);
objectPool.put(Column.WHERE_SIGN, mvParam);
} else {
TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
req.addToMaterializedViewParams(mvParam);
}
}
req.setDescTbl(descTable.toThrift());
if (baseSchemaColumns != null) {
List<TColumn> columns = tcloumnsPool.get(baseSchemaColumns);
if (columns == null) {
columns = new ArrayList<TColumn>();
Object value = objectPool.get(baseSchemaColumns);
if (value == null) {
List<TColumn> columns = new ArrayList<TColumn>();
for (Column column : baseSchemaColumns) {
columns.add(column.toThrift());
}
tcloumnsPool.put(baseSchemaColumns, columns);
req.setColumns(columns);
objectPool.put(baseSchemaColumns, columns);
} else {
List<TColumn> columns = (List<TColumn>) value;
req.setColumns(columns);
}
req.setColumns(columns);
}
return req;
}

View File

@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@ -123,6 +124,8 @@ public class CreateReplicaTask extends AgentTask {
private BinlogConfig binlogConfig;
private List<Integer> clusterKeyIndexes;
private Map<Object, Object> objectPool;
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
KeysType keysType, TStorageType storageType,
@ -144,7 +147,8 @@ public class CreateReplicaTask extends AgentTask {
long timeSeriesCompactionEmptyRowsetsThreshold,
long timeSeriesCompactionLevelThreshold,
boolean storeRowColumn,
BinlogConfig binlogConfig) {
BinlogConfig binlogConfig,
Map<Object, Object> objectPool) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
this.replicaId = replicaId;
@ -188,6 +192,7 @@ public class CreateReplicaTask extends AgentTask {
this.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold;
this.storeRowColumn = storeRowColumn;
this.binlogConfig = binlogConfig;
this.objectPool = objectPool;
}
public void setIsRecoverTask(boolean isRecoverTask) {
@ -260,21 +265,32 @@ public class CreateReplicaTask extends AgentTask {
int deleteSign = -1;
int sequenceCol = -1;
int versionCol = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();
List<TColumn> tColumns = null;
Object tCols = objectPool.get(columns);
if (tCols != null) {
tColumns = (List<TColumn>) tCols;
} else {
tColumns = new ArrayList<>();
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(
column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
}
objectPool.put(columns, tColumns);
}
for (int i = 0; i < columns.size(); i++) {
Column column = columns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
// when doing schema change, some modified column has a prefix in name.
// this prefix is only used in FE, not visible to BE, so we should remove this prefix.
if (column.getName().startsWith(SchemaChangeHandler.SHADOW_NAME_PREFIX)) {
tColumn.setColumnName(column.getName().substring(SchemaChangeHandler.SHADOW_NAME_PREFIX.length()));
}
tColumn.setVisible(column.isVisible());
tColumns.add(tColumn);
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
@ -296,9 +312,15 @@ public class CreateReplicaTask extends AgentTask {
}
}
if (CollectionUtils.isNotEmpty(indexes)) {
List<TOlapTableIndex> tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
List<TOlapTableIndex> tIndexes = null;
Object value = objectPool.get(indexes);
if (value != null) {
tIndexes = (List<TOlapTableIndex>) value;
} else {
tIndexes = new ArrayList<>();
for (Index index : indexes) {
tIndexes.add(index.toThrift());
}
}
tSchema.setIndexes(tIndexes);
storageFormat = TStorageFormat.V2;