@ -278,7 +278,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
|
||||
tbl.getTimeSeriesCompactionLevelThreshold(),
|
||||
tbl.storeRowColumn(),
|
||||
binlogConfig, objectPool);
|
||||
binlogConfig, objectPool,
|
||||
tbl.rowStorePageSize());
|
||||
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
|
||||
if (this.storageFormat != null) {
|
||||
createReplicaTask.setStorageFormat(this.storageFormat);
|
||||
|
||||
@ -283,7 +283,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
|
||||
tbl.getTimeSeriesCompactionLevelThreshold(),
|
||||
tbl.storeRowColumn(),
|
||||
binlogConfig, objectPool);
|
||||
binlogConfig, objectPool,
|
||||
tbl.rowStorePageSize());
|
||||
|
||||
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
|
||||
.get(shadowTabletId), originSchemaHash);
|
||||
|
||||
@ -1102,7 +1102,8 @@ public class RestoreJob extends AbstractJob {
|
||||
localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
|
||||
localTbl.getTimeSeriesCompactionLevelThreshold(),
|
||||
localTbl.storeRowColumn(),
|
||||
binlogConfig, objectPool);
|
||||
binlogConfig, objectPool,
|
||||
localTbl.rowStorePageSize());
|
||||
task.setInvertedIndexStorageFormat(localTbl.getInvertedIndexStorageFormat());
|
||||
task.setInRestoreMode(true);
|
||||
batchTask.addTask(task);
|
||||
|
||||
@ -3384,6 +3384,10 @@ public class Env {
|
||||
if (olapTable.storeRowColumn()) {
|
||||
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN).append("\" = \"");
|
||||
sb.append(olapTable.storeRowColumn()).append("\"");
|
||||
|
||||
// row store page size
|
||||
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE).append("\" = \"");
|
||||
sb.append(olapTable.rowStorePageSize()).append("\"");
|
||||
}
|
||||
|
||||
// skip inverted index on load
|
||||
|
||||
@ -2432,6 +2432,21 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
tableProperty.buildCompressionType();
|
||||
}
|
||||
|
||||
public void setRowStorePageSize(long pageSize) {
|
||||
TableProperty tableProperty = getOrCreatTableProperty();
|
||||
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE,
|
||||
Long.valueOf(pageSize).toString());
|
||||
tableProperty.buildRowStorePageSize();
|
||||
}
|
||||
|
||||
public long rowStorePageSize() {
|
||||
if (tableProperty != null) {
|
||||
return tableProperty.rowStorePageSize();
|
||||
}
|
||||
return PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
|
||||
}
|
||||
|
||||
|
||||
public void setStorageFormat(TStorageFormat storageFormat) {
|
||||
TableProperty tableProperty = getOrCreatTableProperty();
|
||||
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT, storageFormat.name());
|
||||
|
||||
@ -92,6 +92,8 @@ public class TableProperty implements Writable {
|
||||
|
||||
private boolean skipWriteIndexOnLoad = false;
|
||||
|
||||
private long rowStorePageSize = PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
|
||||
|
||||
private String compactionPolicy = PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY;
|
||||
|
||||
private long timeSeriesCompactionGoalSizeMbytes
|
||||
@ -238,6 +240,17 @@ public class TableProperty implements Writable {
|
||||
return storeRowColumn;
|
||||
}
|
||||
|
||||
public TableProperty buildRowStorePageSize() {
|
||||
rowStorePageSize = Long.parseLong(
|
||||
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE,
|
||||
Long.toString(PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE)));
|
||||
return this;
|
||||
}
|
||||
|
||||
public long rowStorePageSize() {
|
||||
return rowStorePageSize;
|
||||
}
|
||||
|
||||
public TableProperty buildSkipWriteIndexOnLoad() {
|
||||
skipWriteIndexOnLoad = Boolean.parseBoolean(
|
||||
properties.getOrDefault(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD, "false"));
|
||||
@ -606,6 +619,7 @@ public class TableProperty implements Writable {
|
||||
.buildBinlogConfig()
|
||||
.buildEnableLightSchemaChange()
|
||||
.buildStoreRowColumn()
|
||||
.buildRowStorePageSize()
|
||||
.buildSkipWriteIndexOnLoad()
|
||||
.buildCompactionPolicy()
|
||||
.buildTimeSeriesCompactionGoalSizeMbytes()
|
||||
|
||||
@ -90,6 +90,10 @@ public class PropertyAnalyzer {
|
||||
public static final String PROPERTIES_TIMEOUT = "timeout";
|
||||
public static final String PROPERTIES_COMPRESSION = "compression";
|
||||
|
||||
// row store page size, default 16KB
|
||||
public static final String PROPERTIES_ROW_STORE_PAGE_SIZE = "row_store_page_size";
|
||||
public static final long ROW_STORE_PAGE_SIZE_DEFAULT_VALUE = 16384;
|
||||
|
||||
public static final String PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE = "light_schema_change";
|
||||
|
||||
public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type";
|
||||
@ -887,6 +891,31 @@ public class PropertyAnalyzer {
|
||||
}
|
||||
}
|
||||
|
||||
public static long alignTo4K(long size) {
|
||||
return (size + 4095) & ~4095;
|
||||
}
|
||||
|
||||
// analyzeRowStorePageSize will parse the row_store_page_size from properties
|
||||
public static long analyzeRowStorePageSize(Map<String, String> properties) throws AnalysisException {
|
||||
long rowStorePageSize = ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
|
||||
if (properties != null && properties.containsKey(PROPERTIES_ROW_STORE_PAGE_SIZE)) {
|
||||
String rowStorePageSizeStr = properties.get(PROPERTIES_ROW_STORE_PAGE_SIZE);
|
||||
try {
|
||||
rowStorePageSize = alignTo4K(Long.parseLong(rowStorePageSizeStr));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new AnalysisException("Invalid row store page size: " + rowStorePageSizeStr);
|
||||
}
|
||||
|
||||
if (rowStorePageSize <= 0) {
|
||||
throw new AnalysisException("Row store page size should larger than 0.");
|
||||
}
|
||||
|
||||
properties.remove(PROPERTIES_ROW_STORE_PAGE_SIZE);
|
||||
}
|
||||
|
||||
return rowStorePageSize;
|
||||
}
|
||||
|
||||
// analyzeStorageFormat will parse the storage format from properties
|
||||
// sql: alter table tablet_name set ("storage_format" = "v2")
|
||||
// Use this sql to convert all tablets(base and rollup index) to a new format segment
|
||||
|
||||
@ -1511,6 +1511,10 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN,
|
||||
olapTable.storeRowColumn().toString());
|
||||
}
|
||||
if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE)) {
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE,
|
||||
Long.toString(olapTable.rowStorePageSize()));
|
||||
}
|
||||
if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) {
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD,
|
||||
olapTable.skipWriteIndexOnLoad().toString());
|
||||
@ -1988,7 +1992,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
|
||||
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
|
||||
tbl.getTimeSeriesCompactionLevelThreshold(),
|
||||
tbl.storeRowColumn(), binlogConfig, objectPool);
|
||||
tbl.storeRowColumn(), binlogConfig, objectPool, tbl.rowStorePageSize());
|
||||
|
||||
task.setStorageFormat(tbl.getStorageFormat());
|
||||
task.setInvertedIndexStorageFormat(tbl.getInvertedIndexStorageFormat());
|
||||
@ -2347,6 +2351,16 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
olapTable.setCompressionType(compressionType);
|
||||
|
||||
// get row_store_page_size
|
||||
long rowStorePageSize = PropertyAnalyzer.ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
|
||||
try {
|
||||
rowStorePageSize = PropertyAnalyzer.analyzeRowStorePageSize(properties);
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
|
||||
olapTable.setRowStorePageSize(rowStorePageSize);
|
||||
|
||||
// check data sort properties
|
||||
int keyColumnSize = CollectionUtils.isEmpty(keysDesc.getClusterKeysColumnIds()) ? keysDesc.keysColumnSize() :
|
||||
keysDesc.getClusterKeysColumnIds().size();
|
||||
|
||||
@ -865,7 +865,8 @@ public class ReportHandler extends Daemon {
|
||||
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
|
||||
olapTable.getTimeSeriesCompactionLevelThreshold(),
|
||||
olapTable.storeRowColumn(),
|
||||
binlogConfig, objectPool);
|
||||
binlogConfig, objectPool,
|
||||
olapTable.rowStorePageSize());
|
||||
|
||||
createReplicaTask.setIsRecoverTask(true);
|
||||
createReplicaTask.setInvertedIndexStorageFormat(olapTable
|
||||
|
||||
@ -64,6 +64,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
private TStorageType storageType;
|
||||
private TStorageMedium storageMedium;
|
||||
private TCompressionType compressionType;
|
||||
private long rowStorePageSize;
|
||||
|
||||
private List<Column> columns;
|
||||
|
||||
@ -148,7 +149,8 @@ public class CreateReplicaTask extends AgentTask {
|
||||
long timeSeriesCompactionLevelThreshold,
|
||||
boolean storeRowColumn,
|
||||
BinlogConfig binlogConfig,
|
||||
Map<Object, Object> objectPool) {
|
||||
Map<Object, Object> objectPool,
|
||||
long rowStorePageSize) {
|
||||
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
|
||||
|
||||
this.replicaId = replicaId;
|
||||
@ -193,6 +195,8 @@ public class CreateReplicaTask extends AgentTask {
|
||||
this.storeRowColumn = storeRowColumn;
|
||||
this.binlogConfig = binlogConfig;
|
||||
this.objectPool = objectPool;
|
||||
this.rowStorePageSize = rowStorePageSize;
|
||||
|
||||
}
|
||||
|
||||
public void setIsRecoverTask(boolean isRecoverTask) {
|
||||
@ -334,6 +338,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
tSchema.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction);
|
||||
tSchema.setSkipWriteIndexOnLoad(skipWriteIndexOnLoad);
|
||||
tSchema.setStoreRowColumn(storeRowColumn);
|
||||
tSchema.setRowStorePageSize(rowStorePageSize);
|
||||
createTabletReq.setTabletSchema(tSchema);
|
||||
|
||||
createTabletReq.setVersion(version);
|
||||
|
||||
@ -73,6 +73,7 @@ public class AgentTaskTest {
|
||||
private long version = 1L;
|
||||
|
||||
private TStorageType storageType = TStorageType.COLUMN;
|
||||
private long rowStorePageSize = 16384L;
|
||||
private List<Column> columns;
|
||||
private MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(3);
|
||||
|
||||
@ -107,7 +108,7 @@ public class AgentTaskTest {
|
||||
createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId,
|
||||
indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType,
|
||||
TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null,
|
||||
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null, objectPool);
|
||||
TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null, objectPool, rowStorePageSize);
|
||||
|
||||
// drop
|
||||
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false);
|
||||
|
||||
Reference in New Issue
Block a user