[feature] Support compression prop (#8923)
This commit is contained in:
@ -231,7 +231,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch,
|
||||
tbl.getCopiedIndexes(),
|
||||
tbl.isInMemory(),
|
||||
tabletType);
|
||||
tabletType,
|
||||
tbl.getCompressionType());
|
||||
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
|
||||
if (this.storageFormat != null) {
|
||||
createReplicaTask.setStorageFormat(this.storageFormat);
|
||||
|
||||
@ -253,7 +253,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
originKeysType, TStorageType.COLUMN, storageMedium,
|
||||
shadowSchema, bfColumns, bfFpp, countDownLatch, indexes,
|
||||
tbl.isInMemory(),
|
||||
tbl.getPartitionInfo().getTabletType(partitionId));
|
||||
tbl.getPartitionInfo().getTabletType(partitionId),
|
||||
tbl.getCompressionType());
|
||||
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId), originSchemaHash);
|
||||
if (this.storageFormat != null) {
|
||||
createReplicaTask.setStorageFormat(this.storageFormat);
|
||||
|
||||
@ -964,7 +964,8 @@ public class RestoreJob extends AbstractJob {
|
||||
indexMeta.getSchema(), bfColumns, bfFpp, null,
|
||||
localTbl.getCopiedIndexes(),
|
||||
localTbl.isInMemory(),
|
||||
localTbl.getPartitionInfo().getTabletType(restorePart.getId()));
|
||||
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
|
||||
localTbl.getCompressionType());
|
||||
task.setInRestoreMode(true);
|
||||
batchTask.addTask(task);
|
||||
}
|
||||
|
||||
@ -245,6 +245,7 @@ import org.apache.doris.task.CreateReplicaTask;
|
||||
import org.apache.doris.task.DropReplicaTask;
|
||||
import org.apache.doris.task.MasterTaskExecutor;
|
||||
import org.apache.doris.thrift.BackendService;
|
||||
import org.apache.doris.thrift.TCompressionType;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TStorageFormat;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
@ -3026,6 +3027,7 @@ public class Catalog {
|
||||
* 6.2. replicationNum
|
||||
* 6.3. inMemory
|
||||
* 6.4. storageFormat
|
||||
* 6.5. compressionType
|
||||
* 7. set index meta
|
||||
* 8. check colocation properties
|
||||
* 9. create tablet in BE
|
||||
@ -3314,6 +3316,7 @@ public class Catalog {
|
||||
singlePartitionDesc.isInMemory(),
|
||||
olapTable.getStorageFormat(),
|
||||
singlePartitionDesc.getTabletType(),
|
||||
olapTable.getCompressionType(),
|
||||
olapTable.getDataSortInfo()
|
||||
);
|
||||
|
||||
@ -3545,6 +3548,7 @@ public class Catalog {
|
||||
boolean isInMemory,
|
||||
TStorageFormat storageFormat,
|
||||
TTabletType tabletType,
|
||||
TCompressionType compressionType,
|
||||
DataSortInfo dataSortInfo) throws DdlException {
|
||||
// create base index first.
|
||||
Preconditions.checkArgument(baseIndexId != -1);
|
||||
@ -3612,7 +3616,8 @@ public class Catalog {
|
||||
indexes,
|
||||
isInMemory,
|
||||
tabletType,
|
||||
dataSortInfo);
|
||||
dataSortInfo,
|
||||
compressionType);
|
||||
task.setStorageFormat(storageFormat);
|
||||
batchTask.addTask(task);
|
||||
// add to AgentTaskQueue for handling finish report.
|
||||
@ -3730,6 +3735,15 @@ public class Catalog {
|
||||
}
|
||||
olapTable.setStorageFormat(storageFormat);
|
||||
|
||||
// get compression type
|
||||
TCompressionType compressionType = TCompressionType.LZ4;
|
||||
try {
|
||||
compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
olapTable.setCompressionType(compressionType);
|
||||
|
||||
// check data sort properties
|
||||
DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
|
||||
keysDesc.keysColumnSize(), storageFormat);
|
||||
@ -3778,6 +3792,7 @@ public class Catalog {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
|
||||
|
||||
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
|
||||
// if this is an unpartitioned table, we should analyze data property and replication num here.
|
||||
// if this is a partitioned table, there properties are already analyzed in RangePartitionDesc analyze phase.
|
||||
@ -3914,7 +3929,7 @@ public class Catalog {
|
||||
partitionInfo.getReplicaAllocation(partitionId),
|
||||
versionInfo, bfColumns, bfFpp,
|
||||
tabletIdSet, olapTable.getCopiedIndexes(),
|
||||
isInMemory, storageFormat, tabletType, olapTable.getDataSortInfo());
|
||||
isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo());
|
||||
olapTable.addPartition(partition);
|
||||
} else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
|
||||
try {
|
||||
@ -3965,7 +3980,8 @@ public class Catalog {
|
||||
versionInfo, bfColumns, bfFpp,
|
||||
tabletIdSet, olapTable.getCopiedIndexes(),
|
||||
isInMemory, storageFormat,
|
||||
partitionInfo.getTabletType(entry.getValue()), olapTable.getDataSortInfo());
|
||||
partitionInfo.getTabletType(entry.getValue()),
|
||||
compressionType, olapTable.getDataSortInfo());
|
||||
olapTable.addPartition(partition);
|
||||
}
|
||||
} else {
|
||||
@ -4339,6 +4355,11 @@ public class Catalog {
|
||||
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE).append("\" = \"");
|
||||
sb.append(remoteStorageResource).append("\"");
|
||||
}
|
||||
// compression type
|
||||
if (olapTable.getCompressionType() != TCompressionType.LZ4F) {
|
||||
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPRESSION).append("\" = \"");
|
||||
sb.append(olapTable.getCompressionType()).append("\"");
|
||||
}
|
||||
|
||||
sb.append("\n)");
|
||||
} else if (table.getType() == TableType.MYSQL) {
|
||||
@ -6882,6 +6903,7 @@ public class Catalog {
|
||||
copiedTbl.isInMemory(),
|
||||
copiedTbl.getStorageFormat(),
|
||||
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
|
||||
copiedTbl.getCompressionType(),
|
||||
copiedTbl.getDataSortInfo());
|
||||
newPartitions.add(newPartition);
|
||||
}
|
||||
|
||||
@ -48,6 +48,7 @@ import org.apache.doris.qe.OriginStatement;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TCompressionType;
|
||||
import org.apache.doris.thrift.TOlapTable;
|
||||
import org.apache.doris.thrift.TSortType;
|
||||
import org.apache.doris.thrift.TStorageFormat;
|
||||
@ -1679,6 +1680,14 @@ public class OlapTable extends Table {
|
||||
return !tempPartitions.isEmpty();
|
||||
}
|
||||
|
||||
public void setCompressionType(TCompressionType compressionType) {
|
||||
if (tableProperty == null) {
|
||||
tableProperty = new TableProperty(new HashMap<>());
|
||||
}
|
||||
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPRESSION, compressionType.name());
|
||||
tableProperty.buildCompressionType();
|
||||
}
|
||||
|
||||
public void setStorageFormat(TStorageFormat storageFormat) {
|
||||
if (tableProperty == null) {
|
||||
tableProperty = new TableProperty(new HashMap<>());
|
||||
@ -1694,6 +1703,13 @@ public class OlapTable extends Table {
|
||||
return tableProperty.getStorageFormat();
|
||||
}
|
||||
|
||||
public TCompressionType getCompressionType() {
|
||||
if (tableProperty == null) {
|
||||
return TCompressionType.LZ4F;
|
||||
}
|
||||
return tableProperty.getCompressionType();
|
||||
}
|
||||
|
||||
public DataSortInfo getDataSortInfo() {
|
||||
if (tableProperty == null) {
|
||||
return new DataSortInfo(TSortType.LEXICAL, this.getKeysNum());
|
||||
|
||||
@ -25,6 +25,7 @@ import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.persist.OperationType;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.thrift.TCompressionType;
|
||||
import org.apache.doris.thrift.TStorageFormat;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
@ -67,6 +68,8 @@ public class TableProperty implements Writable {
|
||||
*/
|
||||
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
|
||||
|
||||
private TCompressionType compressionType = TCompressionType.LZ4F;
|
||||
|
||||
private DataSortInfo dataSortInfo = new DataSortInfo();
|
||||
|
||||
// remote storage resource, for cold data
|
||||
@ -147,6 +150,12 @@ public class TableProperty implements Writable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public TableProperty buildCompressionType() {
|
||||
compressionType = TCompressionType.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPRESSION,
|
||||
TCompressionType.LZ4F.name()));
|
||||
return this;
|
||||
}
|
||||
|
||||
public TableProperty buildStorageFormat() {
|
||||
storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT,
|
||||
TStorageFormat.DEFAULT.name()));
|
||||
@ -227,6 +236,10 @@ public class TableProperty implements Writable {
|
||||
return remoteStorageResource;
|
||||
}
|
||||
|
||||
public TCompressionType getCompressionType() {
|
||||
return compressionType;
|
||||
}
|
||||
|
||||
public void buildReplicaAllocation() {
|
||||
try {
|
||||
// Must copy the properties because "analyzeReplicaAllocation" with remove the property
|
||||
@ -251,7 +264,8 @@ public class TableProperty implements Writable {
|
||||
.buildInMemory()
|
||||
.buildStorageFormat()
|
||||
.buildDataSortInfo()
|
||||
.buildRemoteStorageResource();
|
||||
.buildRemoteStorageResource()
|
||||
.buildCompressionType();
|
||||
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) {
|
||||
// get replica num from property map and create replica allocation
|
||||
String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
|
||||
|
||||
@ -32,6 +32,7 @@ import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.thrift.TCompressionType;
|
||||
import org.apache.doris.thrift.TSortType;
|
||||
import org.apache.doris.thrift.TStorageFormat;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
@ -75,6 +76,7 @@ public class PropertyAnalyzer {
|
||||
public static final String PROPERTIES_COLOCATE_WITH = "colocate_with";
|
||||
|
||||
public static final String PROPERTIES_TIMEOUT = "timeout";
|
||||
public static final String PROPERTIES_COMPRESSION = "compression";
|
||||
|
||||
public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type";
|
||||
public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = "send_clear_alter_tasks";
|
||||
@ -433,6 +435,35 @@ public class PropertyAnalyzer {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
// analyzeCompressionType will parse the compression type from properties
|
||||
public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException {
|
||||
String compressionType = "";
|
||||
if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) {
|
||||
compressionType = properties.get(PROPERTIES_COMPRESSION);
|
||||
properties.remove(PROPERTIES_COMPRESSION);
|
||||
} else {
|
||||
return TCompressionType.LZ4F;
|
||||
}
|
||||
|
||||
if (compressionType.equalsIgnoreCase("no_compression")) {
|
||||
return TCompressionType.NO_COMPRESSION;
|
||||
} else if (compressionType.equalsIgnoreCase("lz4")) {
|
||||
return TCompressionType.LZ4;
|
||||
} else if (compressionType.equalsIgnoreCase("lz4f")) {
|
||||
return TCompressionType.LZ4F;
|
||||
} else if (compressionType.equalsIgnoreCase("zlib")) {
|
||||
return TCompressionType.ZLIB;
|
||||
} else if (compressionType.equalsIgnoreCase("zstd")) {
|
||||
return TCompressionType.ZSTD;
|
||||
} else if (compressionType.equalsIgnoreCase("snappy")) {
|
||||
return TCompressionType.SNAPPY;
|
||||
} else if (compressionType.equalsIgnoreCase("default_compression")) {
|
||||
return TCompressionType.LZ4F;
|
||||
} else {
|
||||
throw new AnalysisException("unknown compression type: " + compressionType);
|
||||
}
|
||||
}
|
||||
|
||||
// analyzeStorageFormat will parse the storage format from properties
|
||||
// sql: alter table tablet_name set ("storage_format" = "v2")
|
||||
// Use this sql to convert all tablets(base and rollup index) to a new format segment
|
||||
|
||||
@ -601,7 +601,8 @@ public class ReportHandler extends Daemon {
|
||||
TStorageMedium.HDD, indexMeta.getSchema(), bfColumns, bfFpp, null,
|
||||
olapTable.getCopiedIndexes(),
|
||||
olapTable.isInMemory(),
|
||||
olapTable.getPartitionInfo().getTabletType(partitionId));
|
||||
olapTable.getPartitionInfo().getTabletType(partitionId),
|
||||
olapTable.getCompressionType());
|
||||
createReplicaTask.setIsRecoverTask(true);
|
||||
createReplicaBatchTask.addTask(createReplicaTask);
|
||||
} else {
|
||||
|
||||
@ -25,6 +25,7 @@ import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.common.MarkedCountDownLatch;
|
||||
import org.apache.doris.common.Status;
|
||||
import org.apache.doris.thrift.TColumn;
|
||||
import org.apache.doris.thrift.TCompressionType;
|
||||
import org.apache.doris.thrift.TCreateTabletReq;
|
||||
import org.apache.doris.thrift.TOlapTableIndex;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
@ -54,6 +55,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
private KeysType keysType;
|
||||
private TStorageType storageType;
|
||||
private TStorageMedium storageMedium;
|
||||
private TCompressionType compressionType;
|
||||
|
||||
private List<Column> columns;
|
||||
|
||||
@ -93,7 +95,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
|
||||
List<Index> indexes,
|
||||
boolean isInMemory,
|
||||
TTabletType tabletType) {
|
||||
TTabletType tabletType, TCompressionType compressionType) {
|
||||
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
|
||||
|
||||
this.shortKeyColumnCount = shortKeyColumnCount;
|
||||
@ -104,6 +106,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
this.keysType = keysType;
|
||||
this.storageType = storageType;
|
||||
this.storageMedium = storageMedium;
|
||||
this.compressionType = compressionType;
|
||||
|
||||
this.columns = columns;
|
||||
|
||||
@ -125,7 +128,8 @@ public class CreateReplicaTask extends AgentTask {
|
||||
List<Index> indexes,
|
||||
boolean isInMemory,
|
||||
TTabletType tabletType,
|
||||
DataSortInfo dataSortInfo) {
|
||||
DataSortInfo dataSortInfo,
|
||||
TCompressionType compressionType) {
|
||||
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
|
||||
|
||||
this.shortKeyColumnCount = shortKeyColumnCount;
|
||||
@ -136,6 +140,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
this.keysType = keysType;
|
||||
this.storageType = storageType;
|
||||
this.storageMedium = storageMedium;
|
||||
this.compressionType = compressionType;
|
||||
|
||||
this.columns = columns;
|
||||
|
||||
@ -267,6 +272,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
}
|
||||
|
||||
createTabletReq.setTabletType(tabletType);
|
||||
createTabletReq.setCompressionType(compressionType);
|
||||
return createTabletReq;
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,6 +138,16 @@ public class CreateTableTest {
|
||||
.expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n"
|
||||
+ "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
|
||||
|
||||
ExceptionChecker
|
||||
.expectThrowsNoException(() -> createTable("create table test.compression1(key1 int, key2 varchar(10)) \n"
|
||||
+ "distributed by hash(key1) buckets 1 \n"
|
||||
+ "properties('replication_num' = '1', 'compression' = 'lz4f');"));
|
||||
|
||||
ExceptionChecker
|
||||
.expectThrowsNoException(() -> createTable("create table test.compression2(key1 int, key2 varchar(10)) \n"
|
||||
+ "distributed by hash(key1) buckets 1 \n"
|
||||
+ "properties('replication_num' = '1', 'compression' = 'snappy');"));
|
||||
|
||||
ExceptionChecker
|
||||
.expectThrowsNoException(() -> createTable("create table test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
|
||||
+ "unique key(k1, k2)\n"
|
||||
|
||||
@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.MarkedCountDownLatch;
|
||||
import org.apache.doris.thrift.TAgentTaskRequest;
|
||||
import org.apache.doris.thrift.TBackend;
|
||||
import org.apache.doris.thrift.TCompressionType;
|
||||
import org.apache.doris.thrift.TPriority;
|
||||
import org.apache.doris.thrift.TPushType;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
@ -110,7 +111,7 @@ public class AgentTaskTest {
|
||||
version, KeysType.AGG_KEYS,
|
||||
storageType, TStorageMedium.SSD,
|
||||
columns, null, 0, latch, null,
|
||||
false, TTabletType.TABLET_TYPE_DISK);
|
||||
false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
|
||||
|
||||
// drop
|
||||
dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1);
|
||||
|
||||
Reference in New Issue
Block a user