[refactor][storage format] Forbidden rowset v1 (#9248)

- Force change the existing olaptable's storage format from V1 to V2
- Forbidden to create new olap table with storage format == v1 OR do schema change that want to create new v1 format
This commit is contained in:
yiguolei
2022-05-04 17:32:20 +08:00
committed by GitHub
parent ac3f981175
commit f1aa9668af
13 changed files with 37 additions and 457 deletions

View File

@ -221,6 +221,10 @@ public class TableProperty implements Writable {
}
public TStorageFormat getStorageFormat() {
// Force convert all V1 table to V2 table
if (TStorageFormat.V1 == storageFormat) {
return TStorageFormat.V2;
}
return storageFormat;
}

View File

@ -1446,14 +1446,6 @@ public class Config extends ConfigBase {
@ConfField
public static String http_api_extra_base_path = "";
/**
* Whether to support the creation of alpha rowset tables.
* The default is false and should only be used in emergency situations,
* this config should be remove in some future version
*/
@ConfField
public static boolean enable_alpha_rowset = false;
/**
* If set to true, FE will be started in BDBJE debug mode
*/

View File

@ -447,11 +447,8 @@ public class PropertyAnalyzer {
}
if (storageFormat.equalsIgnoreCase("v1")) {
if (!Config.enable_alpha_rowset) {
throw new AnalysisException("Storage format V1 has been deprecated since version 0.14," +
" please use V2 instead");
}
return TStorageFormat.V1;
throw new AnalysisException("Storage format V1 has been deprecated since version 0.14, "
+ "please use V2 instead");
} else if (storageFormat.equalsIgnoreCase("v2")) {
return TStorageFormat.V2;
} else if (storageFormat.equalsIgnoreCase("default")) {

View File

@ -28,18 +28,23 @@ import org.apache.logging.log4j.Logger;
// Now the flag is represented by 64-bit long type, each bit can be used to control
// one behavior. The first bit is used for set default rowset type to beta flag.
public class HeartbeatFlags {
private static final Logger LOG = LogManager.getLogger(HeartbeatFlags.class);
private static final Logger LOG = LogManager.getLogger(HeartbeatFlags.class);
public static boolean isValidRowsetType(String rowsetType) {
return "alpha".equalsIgnoreCase(rowsetType) || "beta".equalsIgnoreCase(rowsetType);
}
public static boolean isValidRowsetType(String rowsetType) {
return "alpha".equalsIgnoreCase(rowsetType) || "beta".equalsIgnoreCase(rowsetType);
}
public long getHeartbeatFlags() {
long heartbeatFlags = 0;
if ("beta".equalsIgnoreCase(GlobalVariable.defaultRowsetType)) {
heartbeatFlags |= HeartbeatServiceConstants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT;
}
public long getHeartbeatFlags() {
long heartbeatFlags = 0;
// If user set default rowset type to ALPHA, then convert it to beta, because
// alpha rowset will be removed
if ("beta".equalsIgnoreCase(GlobalVariable.defaultRowsetType)
|| "alpha".equalsIgnoreCase(GlobalVariable.defaultRowsetType)) {
heartbeatFlags |= HeartbeatServiceConstants.IS_SET_DEFAULT_ROWSET_TO_BETA_BIT;
} else {
throw new IllegalArgumentException("unknown DEFAULT_ROWSET_TYPE in global variable");
}
return heartbeatFlags;
}
return heartbeatFlags;
}
}

View File

@ -24,7 +24,6 @@ import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TAgentServiceVersion;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TAlterTabletReq;
import org.apache.doris.thrift.TAlterTabletReqV2;
import org.apache.doris.thrift.TCheckConsistencyReq;
import org.apache.doris.thrift.TClearAlterTaskRequest;
@ -235,26 +234,6 @@ public class AgentBatchTask implements Runnable {
tAgentTaskRequest.setCloneReq(request);
return tAgentTaskRequest;
}
case ROLLUP: {
CreateRollupTask rollupTask = (CreateRollupTask) task;
TAlterTabletReq request = rollupTask.toThrift();
if (LOG.isDebugEnabled()) {
LOG.debug(request.toString());
}
tAgentTaskRequest.setAlterTabletReq(request);
tAgentTaskRequest.setResourceInfo(rollupTask.getResourceInfo());
return tAgentTaskRequest;
}
case SCHEMA_CHANGE: {
SchemaChangeTask schemaChangeTask = (SchemaChangeTask) task;
TAlterTabletReq request = schemaChangeTask.toThrift();
if (LOG.isDebugEnabled()) {
LOG.debug(request.toString());
}
tAgentTaskRequest.setAlterTabletReq(request);
tAgentTaskRequest.setResourceInfo(schemaChangeTask.getResourceInfo());
return tAgentTaskRequest;
}
case STORAGE_MEDIUM_MIGRATE: {
StorageMediaMigrationTask migrationTask = (StorageMediaMigrationTask) task;
TStorageMediumMigrateReq request = migrationTask.toThrift();

View File

@ -76,8 +76,10 @@ public class CreateReplicaTask extends AgentTask {
// if base tablet id is set, BE will create the replica on same disk as this base tablet
private long baseTabletId = -1;
private int baseSchemaHash = -1;
private TStorageFormat storageFormat = null;
// V2 is beta rowset, v1 is alpha rowset
// TODO should unify the naming of v1(alpha rowset), v2(beta rowset), it is very confused to read code
private TStorageFormat storageFormat = TStorageFormat.V2;
// true if this task is created by recover request(See comment of Config.recover_with_empty_tablet)
private boolean isRecoverTask = false;
@ -187,7 +189,7 @@ public class CreateReplicaTask extends AgentTask {
}
public void setStorageFormat(TStorageFormat storageFormat) {
this.storageFormat = storageFormat;
this.storageFormat = storageFormat;
}
public TCreateTabletReq toThrift() {

View File

@ -1,157 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.catalog.Column;
import org.apache.doris.thrift.TAlterTabletReq;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TKeysType;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletSchema;
import org.apache.doris.thrift.TTaskType;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@Deprecated
public class CreateRollupTask extends AgentTask {
private long baseTableId;
private long baseTabletId;
private long rollupReplicaId;
private int rollupSchemaHash;
private int baseSchemaHash;
private short shortKeyColumnCount;
private TStorageType storageType;
private TKeysType keysType;
private List<Column> rollupColumns;
// bloom filter columns
private Set<String> bfColumns;
private double bfFpp;
public CreateRollupTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId,
long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
long baseTabletId, long rollupReplicaId, short shortKeyColumnCount,
int rollupSchemaHash, int baseSchemaHash, TStorageType storageType,
List<Column> rollupColumns, Set<String> bfColumns, double bfFpp, TKeysType keysType) {
super(resourceInfo, backendId, TTaskType.ROLLUP, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);
this.baseTableId = baseIndexId;
this.baseTabletId = baseTabletId;
this.rollupReplicaId = rollupReplicaId;
this.rollupSchemaHash = rollupSchemaHash;
this.baseSchemaHash = baseSchemaHash;
this.shortKeyColumnCount = shortKeyColumnCount;
this.storageType = storageType;
this.keysType = keysType;
this.rollupColumns = rollupColumns;
this.bfColumns = bfColumns;
this.bfFpp = bfFpp;
}
public TAlterTabletReq toThrift() {
TAlterTabletReq tAlterTabletReq = new TAlterTabletReq();
tAlterTabletReq.setBaseTabletId(baseTabletId);
tAlterTabletReq.setBaseSchemaHash(baseSchemaHash);
// make 1 TCreateTableReq
TCreateTabletReq createTabletReq = new TCreateTabletReq();
createTabletReq.setTabletId(tabletId);
// no need to set version
// schema
TTabletSchema tSchema = new TTabletSchema();
tSchema.setShortKeyColumnCount(shortKeyColumnCount);
tSchema.setSchemaHash(rollupSchemaHash);
tSchema.setStorageType(storageType);
tSchema.setKeysType(keysType);
List<TColumn> tColumns = new ArrayList<TColumn>();
int deleteSign = -1;
for (int i = 0; i < rollupColumns.size(); i++) {
Column column = rollupColumns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
tColumn.setVisible(column.isVisible());
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
tColumns.add(tColumn);
}
tSchema.setColumns(tColumns);
tSchema.setDeleteSignIdx(deleteSign);
if (bfColumns != null) {
tSchema.setBloomFilterFpp(bfFpp);
}
createTabletReq.setTabletSchema(tSchema);
createTabletReq.setTableId(tableId);
createTabletReq.setPartitionId(partitionId);
tAlterTabletReq.setNewTabletReq(createTabletReq);
return tAlterTabletReq;
}
public long getBaseTableId() {
return baseTableId;
}
public long getBaseTabletId() {
return baseTabletId;
}
public long getRollupReplicaId() {
return rollupReplicaId;
}
public int getRollupSchemaHash() {
return rollupSchemaHash;
}
public int getBaseSchemaHash() {
return baseSchemaHash;
}
public short getShortKeyColumnCount() {
return shortKeyColumnCount;
}
public TStorageType getStorageType() {
return storageType;
}
public List<Column> getRollupColumns() {
return rollupColumns;
}
}

View File

@ -1,141 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.catalog.Column;
import org.apache.doris.thrift.TAlterTabletReq;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TKeysType;
import org.apache.doris.thrift.TResourceInfo;
import org.apache.doris.thrift.TStorageType;
import org.apache.doris.thrift.TTabletSchema;
import org.apache.doris.thrift.TTaskType;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@Deprecated
public class SchemaChangeTask extends AgentTask {
private long baseReplicaId;
private int baseSchemaHash;
private TStorageType storageType;
private TKeysType keysType;
private int newSchemaHash;
private short newShortKeyColumnCount;
private List<Column> newColumns;
// bloom filter columns
private Set<String> bfColumns;
private double bfFpp;
public SchemaChangeTask(TResourceInfo resourceInfo, long backendId, long dbId, long tableId,
long partitionId, long indexId, long baseTabletId, long baseReplicaId,
List<Column> newColumns, int newSchemaHash, int baseSchemaHash,
short newShortKeyColumnCount, TStorageType storageType,
Set<String> bfColumns, double bfFpp, TKeysType keysType) {
super(resourceInfo, backendId, TTaskType.SCHEMA_CHANGE, dbId, tableId, partitionId, indexId, baseTabletId);
this.baseReplicaId = baseReplicaId;
this.baseSchemaHash = baseSchemaHash;
this.storageType = storageType;
this.keysType = keysType;
this.newSchemaHash = newSchemaHash;
this.newShortKeyColumnCount = newShortKeyColumnCount;
this.newColumns = newColumns;
this.bfColumns = bfColumns;
this.bfFpp = bfFpp;
}
public TAlterTabletReq toThrift() {
TAlterTabletReq tAlterTabletReq = new TAlterTabletReq();
tAlterTabletReq.setBaseTabletId(tabletId);
tAlterTabletReq.setBaseSchemaHash(baseSchemaHash);
// make 1 TCreateTableReq
TCreateTabletReq createTabletReq = new TCreateTabletReq();
createTabletReq.setTabletId(tabletId);
// no need to set version
// schema
TTabletSchema tSchema = new TTabletSchema();
tSchema.setShortKeyColumnCount(newShortKeyColumnCount);
tSchema.setSchemaHash(newSchemaHash);
tSchema.setStorageType(storageType);
tSchema.setKeysType(keysType);
int deleteSign = -1;
List<TColumn> tColumns = new ArrayList<TColumn>();
for (int i = 0; i < newColumns.size(); i++) {
Column column = newColumns.get(i);
TColumn tColumn = column.toThrift();
// is bloom filter column
if (bfColumns != null && bfColumns.contains(column.getName())) {
tColumn.setIsBloomFilterColumn(true);
}
tColumn.setVisible(column.isVisible());
if (column.isDeleteSignColumn()) {
deleteSign = i;
}
tColumns.add(tColumn);
}
tSchema.setColumns(tColumns);
tSchema.setDeleteSignIdx(deleteSign);
if (bfColumns != null) {
tSchema.setBloomFilterFpp(bfFpp);
}
createTabletReq.setTabletSchema(tSchema);
createTabletReq.setTableId(tableId);
createTabletReq.setPartitionId(partitionId);
tAlterTabletReq.setNewTabletReq(createTabletReq);
return tAlterTabletReq;
}
public long getReplicaId() {
return baseReplicaId;
}
public int getSchemaHash() {
return newSchemaHash;
}
public int getBaseSchemaHash() {
return baseSchemaHash;
}
public short getNewShortKeyColumnCount() {
return newShortKeyColumnCount;
}
public TStorageType getStorageType() {
return storageType;
}
public List<Column> getColumns() {
return newColumns;
}
}

View File

@ -57,7 +57,6 @@ public class AlterJobV2Test {
FeConstants.runningUnitTest = true;
UtFrameUtils.createDorisCluster(runningDir);
Config.enable_alpha_rowset = true;
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
@ -67,8 +66,6 @@ public class AlterJobV2Test {
Catalog.getCurrentCatalog().createDb(createDbStmt);
createTable("CREATE TABLE test.schema_change_test(k1 int, k2 int, k3 int) distributed by hash(k1) buckets 3 properties('replication_num' = '1');");
createTable("CREATE TABLE test.segmentv2(k1 int, k2 int, v1 int sum) distributed by hash(k1) buckets 3 properties('replication_num' = '1', 'storage_format' = 'v1');");
}
@AfterClass
@ -145,59 +142,6 @@ public class AlterJobV2Test {
System.out.println(showResultSet.getResultRows());
}
@Test
@Deprecated
public void testAlterSegmentV2() throws Exception {
// TODO this test should remove after we disable segment v1 completely
Database db = Catalog.getCurrentCatalog().getDbOrMetaException("default_cluster:test");
OlapTable tbl = db.getTableOrMetaException("segmentv2", Table.TableType.OLAP);
Assert.assertEquals(TStorageFormat.V1, tbl.getTableProperty().getStorageFormat());
// 1. create a rollup r1
String alterStmtStr = "alter table test.segmentv2 add rollup r1(k2, v1)";
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
String sql = "select k2, sum(v1) from test.segmentv2 group by k2";
String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
Assert.assertTrue(explainString.contains("rollup: r1"));
// 2. create a rollup with segment v2
alterStmtStr = "alter table test.segmentv2 add rollup segmentv2(k2, v1) properties('storage_format' = 'v2')";
alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
alterJobs = Catalog.getCurrentCatalog().getMaterializedViewHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
Assert.assertTrue(explainString.contains("rollup: r1"));
// set use_v2_rollup = true;
connectContext.getSessionVariable().setUseV2Rollup(true);
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, "explain " + sql);
Assert.assertTrue(explainString.contains("rollup: __v2_segmentv2"));
// 3. process alter segment v2
alterStmtStr = "alter table test.segmentv2 set ('storage_format' = 'v2');";
alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(alterStmtStr, connectContext);
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
// 4. check alter job
alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
waitAlterJobDone(alterJobs);
// 5. check storage format of table
Assert.assertEquals(TStorageFormat.V2, tbl.getTableProperty().getStorageFormat());
// 6. alter again, that no job will be created
try {
Catalog.getCurrentCatalog().getAlterInstance().processAlterTable(alterTableStmt);
Assert.fail();
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("Nothing is changed"));
}
}
@Test
public void testDupTableSchemaChange() throws Exception {

View File

@ -28,7 +28,6 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TKeysType;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TStorageMedium;
@ -86,8 +85,6 @@ public class AgentTaskTest {
private AgentTask dropTask;
private AgentTask pushTask;
private AgentTask cloneTask;
private AgentTask rollupTask;
private AgentTask schemaChangeTask;
private AgentTask cancelDeleteTask;
private AgentTask storageMediaMigrationTask;
@ -130,18 +127,6 @@ public class AgentTaskTest {
new CloneTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, schemaHash1,
Arrays.asList(new TBackend("host1", 8290, 8390)), TStorageMedium.HDD, -1, 3600);
// rollup
rollupTask =
new CreateRollupTask(null, backendId1, dbId, tableId, partitionId, indexId2, indexId1,
tabletId2, tabletId1, replicaId2, shortKeyNum, schemaHash2, schemaHash1,
storageType, columns, null, 0, TKeysType.AGG_KEYS);
// schemaChange
schemaChangeTask =
new SchemaChangeTask(null, backendId1, dbId, tableId, partitionId, indexId1,
tabletId1, replicaId1, columns, schemaHash2, schemaHash1,
shortKeyNum, storageType, null, 0, TKeysType.AGG_KEYS);
// storageMediaMigrationTask
storageMediaMigrationTask =
new StorageMediaMigrationTask(backendId1, tabletId1, schemaHash1, TStorageMedium.HDD);
@ -158,17 +143,12 @@ public class AgentTaskTest {
agentBatchTask.addTask(createReplicaTask);
Assert.assertEquals(1, agentBatchTask.getTaskNum());
agentBatchTask.addTask(rollupTask);
Assert.assertEquals(2, agentBatchTask.getTaskNum());
List<AgentTask> allTasks = agentBatchTask.getAllTasks();
Assert.assertEquals(2, allTasks.size());
Assert.assertEquals(1, allTasks.size());
for (AgentTask agentTask : allTasks) {
if (agentTask instanceof CreateReplicaTask) {
Assert.assertEquals(createReplicaTask, agentTask);
} else if (agentTask instanceof CreateRollupTask) {
Assert.assertEquals(rollupTask, agentTask);
} else {
Assert.fail();
}
@ -206,18 +186,6 @@ public class AgentTaskTest {
Assert.assertEquals(cloneTask.getSignature(), request4.getSignature());
Assert.assertNotNull(request4.getCloneReq());
// rollup
TAgentTaskRequest request5 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, rollupTask);
Assert.assertEquals(TTaskType.ROLLUP, request5.getTaskType());
Assert.assertEquals(rollupTask.getSignature(), request5.getSignature());
Assert.assertNotNull(request5.getAlterTabletReq());
// schemaChange
TAgentTaskRequest request6 = (TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, schemaChangeTask);
Assert.assertEquals(TTaskType.SCHEMA_CHANGE, request6.getTaskType());
Assert.assertEquals(schemaChangeTask.getSignature(), request6.getSignature());
Assert.assertNotNull(request6.getAlterTabletReq());
// storageMediaMigrationTask
TAgentTaskRequest request7 =
(TAgentTaskRequest) toAgentTaskRequest.invoke(agentBatchTask, storageMediaMigrationTask);
@ -242,24 +210,18 @@ public class AgentTaskTest {
AgentTask task = AgentTaskQueue.getTask(backendId1, TTaskType.CREATE, createReplicaTask.getSignature());
Assert.assertEquals(createReplicaTask, task);
// diff
AgentTaskQueue.addTask(rollupTask);
Map<TTaskType, Set<Long>> runningTasks = new HashMap<TTaskType, Set<Long>>();
List<AgentTask> diffTasks = AgentTaskQueue.getDiffTasks(backendId1, runningTasks);
Assert.assertEquals(2, diffTasks.size());
Assert.assertEquals(1, diffTasks.size());
Set<Long> set = new HashSet<Long>();
set.add(createReplicaTask.getSignature());
runningTasks.put(TTaskType.CREATE, set);
diffTasks = AgentTaskQueue.getDiffTasks(backendId1, runningTasks);
Assert.assertEquals(1, diffTasks.size());
Assert.assertEquals(rollupTask, diffTasks.get(0));
Assert.assertEquals(0, diffTasks.size());
// remove
AgentTaskQueue.removeTask(backendId1, TTaskType.CREATE, createReplicaTask.getSignature());
Assert.assertEquals(1, AgentTaskQueue.getTaskNum());
AgentTaskQueue.removeTask(backendId1, TTaskType.ROLLUP, rollupTask.getSignature());
Assert.assertEquals(0, AgentTaskQueue.getTaskNum());
}