From b91bb9f5032790bc8ac9d17174589340d0d943cb Mon Sep 17 00:00:00 2001 From: Chenyang Sun Date: Thu, 17 Aug 2023 18:02:34 +0800 Subject: [PATCH] [fix](alter table property) fix alter property if rpc failed (#22845) * fix alter property * add regression case * do not repeat --- .../java/org/apache/doris/alter/Alter.java | 22 +---- .../doris/alter/SchemaChangeHandler.java | 95 ++----------------- .../java/org/apache/doris/catalog/Env.java | 6 +- .../doris/task/UpdateTabletMetaInfoTask.java | 6 +- .../test_table_level_compaction_policy.groovy | 29 +++++- 5 files changed, 47 insertions(+), 111 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index f8c90b8a52..8a423e5894 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -211,27 +211,13 @@ public class Alter { } else if (currentAlterOps.checkIsBeingSynced(alterClauses)) { olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses)); needProcessOutsideTableLock = true; - } else if (currentAlterOps.checkCompactionPolicy(alterClauses) - && currentAlterOps.getCompactionPolicy(alterClauses) != olapTable.getCompactionPolicy()) { - olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses)); + } else if (currentAlterOps.checkCompactionPolicy(alterClauses)) { needProcessOutsideTableLock = true; - } else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses) - && currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses) - != olapTable.getTimeSeriesCompactionGoalSizeMbytes()) { - olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps - .getTimeSeriesCompactionGoalSizeMbytes(alterClauses)); + } else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)) { needProcessOutsideTableLock = true; - } else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses) - && currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses) - != olapTable.getTimeSeriesCompactionFileCountThreshold()) { - olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps - .getTimeSeriesCompactionFileCountThreshold(alterClauses)); + } else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)) { needProcessOutsideTableLock = true; - } else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses) - && currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses) - != olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) { - olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps - .getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)); + } else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)) { needProcessOutsideTableLock = true; } else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) { if (!Config.enable_feature_binlog) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 9130555414..b555516bc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2218,7 +2218,8 @@ public class SchemaChangeHandler extends AlterHandler { for (String partitionName : partitionNames) { try { - updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId, isInMemory, null); + updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId, + isInMemory, null, null, null); } catch (Exception e) { String errMsg = "Failed to update partition[" + partitionName + "]'s 'in_memory' property. " + "The reason is [" + e.getMessage() + "]"; @@ -2227,86 +2228,6 @@ public class SchemaChangeHandler extends AlterHandler { } } - /** - * Update one specified partition's properties by partition name of table - * This operation may return partial successfully, with an exception to inform user to retry - */ - public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId, - int isInMemory, BinlogConfig binlogConfig) throws UserException { - // be id -> - Map>> beIdToTabletIdWithHash = Maps.newHashMap(); - OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP); - olapTable.readLock(); - try { - Partition partition = olapTable.getPartition(partitionName); - if (partition == null) { - throw new DdlException( - "Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]"); - } - - for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) { - int schemaHash = olapTable.getSchemaHashByIndexId(index.getId()); - for (Tablet tablet : index.getTablets()) { - for (Replica replica : tablet.getReplicas()) { - Set> tabletIdWithHash = beIdToTabletIdWithHash.computeIfAbsent( - replica.getBackendId(), k -> Sets.newHashSet()); - tabletIdWithHash.add(Pair.of(tablet.getId(), schemaHash)); - } - } - } - } finally { - olapTable.readUnlock(); - } - - int totalTaskNum = beIdToTabletIdWithHash.keySet().size(); - MarkedCountDownLatch>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum); - AgentBatchTask batchTask = new AgentBatchTask(); - for (Map.Entry>> kv : beIdToTabletIdWithHash.entrySet()) { - countDownLatch.addMark(kv.getKey(), kv.getValue()); - UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory, - storagePolicyId, binlogConfig, countDownLatch); - batchTask.addTask(task); - } - if (!FeConstants.runningUnitTest) { - // send all tasks and wait them finished - AgentTaskQueue.addBatchTask(batchTask); - AgentTaskExecutor.submit(batchTask); - LOG.info("send update tablet meta task for table {}, partitions {}, number: {}", tableName, partitionName, - batchTask.getTaskNum()); - - // estimate timeout - long timeout = DbUtil.getCreateReplicasTimeoutMs(totalTaskNum); - boolean ok = false; - try { - ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - LOG.warn("InterruptedException: ", e); - } - - if (!ok || !countDownLatch.getStatus().ok()) { - String errMsg = "Failed to update partition[" + partitionName + "]. tablet meta."; - // clear tasks - AgentTaskQueue.removeBatchTask(batchTask, TTaskType.UPDATE_TABLET_META_INFO); - - if (!countDownLatch.getStatus().ok()) { - errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg(); - } else { - // only show at most 3 results - List subList = countDownLatch.getLeftMarks().stream().limit(3) - .map(item -> "(backendId = " + item.getKey() + ", tabletsWithHash = " - + item.getValue() + ")") - .collect(Collectors.toList()); - if (!subList.isEmpty()) { - errMsg += " Unfinished: " + Joiner.on(", ").join(subList); - } - } - errMsg += ". This operation maybe partial successfully, You should retry until success."; - LOG.warn(errMsg); - throw new DdlException(errMsg); - } - } - } - /** * Update one specified partition's properties by partition name of table * This operation may return partial successfully, with an exception to inform user to retry @@ -2372,12 +2293,13 @@ public class SchemaChangeHandler extends AlterHandler { if (!countDownLatch.getStatus().ok()) { errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg(); } else { - List>>> unfinishedMarks = countDownLatch.getLeftMarks(); // only show at most 3 results - List>>> subList = unfinishedMarks.subList(0, - Math.min(unfinishedMarks.size(), 3)); + List subList = countDownLatch.getLeftMarks().stream().limit(3) + .map(item -> "(backendId = " + item.getKey() + ", tabletsWithHash = " + + item.getValue() + ")") + .collect(Collectors.toList()); if (!subList.isEmpty()) { - errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList); + errMsg += " Unfinished: " + Joiner.on(", ").join(subList); } } errMsg += ". This operation maybe partial successfully, You should retry until success."; @@ -2988,7 +2910,8 @@ public class SchemaChangeHandler extends AlterHandler { for (Partition partition : partitions) { - updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1, newBinlogConfig); + updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1, + newBinlogConfig, null, null); } olapTable.writeLockOrDdlException(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 84875a82dc..37e6d05f51 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -4577,7 +4577,11 @@ public class Env { } tableProperty.buildInMemory() .buildStoragePolicy() - .buildIsBeingSynced(); + .buildIsBeingSynced() + .buildCompactionPolicy() + .buildTimeSeriesCompactionGoalSizeMbytes() + .buildTimeSeriesCompactionFileCountThreshold() + .buildTimeSeriesCompactionTimeThresholdSeconds(); // need to update partition info meta for (Partition partition : table.getPartitions()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java index 6b12c34e0d..7b368cbf7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -83,11 +83,7 @@ public class UpdateTabletMetaInfoTask extends AgentTask { MarkedCountDownLatch>> latch, String compactionPolicy, Map timeSeriesCompactionConfig) { - this(backendId, tableIdWithSchemaHash); - this.storagePolicyId = storagePolicyId; - this.inMemory = inMemory; - this.binlogConfig = binlogConfig; - this.latch = latch; + this(backendId, tableIdWithSchemaHash, inMemory, storagePolicyId, binlogConfig, latch); this.compactionPolicy = compactionPolicy; this.timeSeriesCompactionConfig = timeSeriesCompactionConfig; } diff --git a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy index 5bb33eebf2..e0b606af9d 100644 --- a/regression-test/suites/compaction/test_table_level_compaction_policy.groovy +++ b/regression-test/suites/compaction/test_table_level_compaction_policy.groovy @@ -65,7 +65,7 @@ suite("test_table_level_compaction_policy") { logger.info("${showResult3}") assertTrue(showResult3.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"')) - sql """ + sql """ alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000") """ sql """sync""" @@ -74,6 +74,33 @@ suite("test_table_level_compaction_policy") { logger.info("${showResult4}") assertTrue(showResult4.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"')) + sql """ + alter table ${tableName} set ("time_series_compaction_goal_size_mbytes" = "1024") + """ + sql """sync""" + + def showResult6 = sql """show create table ${tableName}""" + logger.info("${showResult6}") + assertTrue(showResult6.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "1024"')) + + sql """ + alter table ${tableName} set ("time_series_compaction_file_count_threshold" = "6000") + """ + sql """sync""" + + def showResult7 = sql """show create table ${tableName}""" + logger.info("${showResult7}") + assertTrue(showResult7.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"')) + + sql """ + alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000") + """ + sql """sync""" + + def showResult8 = sql """show create table ${tableName}""" + logger.info("${showResult8}") + assertTrue(showResult8.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"')) + sql """ DROP TABLE IF EXISTS ${tableName} """ sql """sync"""