[fix](alter table property) fix alter property if rpc failed (#22845)
* fix alter property * add regression case * do not repeat
This commit is contained in:
@ -211,27 +211,13 @@ public class Alter {
|
||||
} else if (currentAlterOps.checkIsBeingSynced(alterClauses)) {
|
||||
olapTable.setIsBeingSynced(currentAlterOps.isBeingSynced(alterClauses));
|
||||
needProcessOutsideTableLock = true;
|
||||
} else if (currentAlterOps.checkCompactionPolicy(alterClauses)
|
||||
&& currentAlterOps.getCompactionPolicy(alterClauses) != olapTable.getCompactionPolicy()) {
|
||||
olapTable.setCompactionPolicy(currentAlterOps.getCompactionPolicy(alterClauses));
|
||||
} else if (currentAlterOps.checkCompactionPolicy(alterClauses)) {
|
||||
needProcessOutsideTableLock = true;
|
||||
} else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)
|
||||
&& currentAlterOps.getTimeSeriesCompactionGoalSizeMbytes(alterClauses)
|
||||
!= olapTable.getTimeSeriesCompactionGoalSizeMbytes()) {
|
||||
olapTable.setTimeSeriesCompactionGoalSizeMbytes(currentAlterOps
|
||||
.getTimeSeriesCompactionGoalSizeMbytes(alterClauses));
|
||||
} else if (currentAlterOps.checkTimeSeriesCompactionGoalSizeMbytes(alterClauses)) {
|
||||
needProcessOutsideTableLock = true;
|
||||
} else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)
|
||||
&& currentAlterOps.getTimeSeriesCompactionFileCountThreshold(alterClauses)
|
||||
!= olapTable.getTimeSeriesCompactionFileCountThreshold()) {
|
||||
olapTable.setTimeSeriesCompactionFileCountThreshold(currentAlterOps
|
||||
.getTimeSeriesCompactionFileCountThreshold(alterClauses));
|
||||
} else if (currentAlterOps.checkTimeSeriesCompactionFileCountThreshold(alterClauses)) {
|
||||
needProcessOutsideTableLock = true;
|
||||
} else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
|
||||
&& currentAlterOps.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses)
|
||||
!= olapTable.getTimeSeriesCompactionTimeThresholdSeconds()) {
|
||||
olapTable.setTimeSeriesCompactionTimeThresholdSeconds(currentAlterOps
|
||||
.getTimeSeriesCompactionTimeThresholdSeconds(alterClauses));
|
||||
} else if (currentAlterOps.checkTimeSeriesCompactionTimeThresholdSeconds(alterClauses)) {
|
||||
needProcessOutsideTableLock = true;
|
||||
} else if (currentAlterOps.checkBinlogConfigChange(alterClauses)) {
|
||||
if (!Config.enable_feature_binlog) {
|
||||
|
||||
@ -2218,7 +2218,8 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
|
||||
for (String partitionName : partitionNames) {
|
||||
try {
|
||||
updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId, isInMemory, null);
|
||||
updatePartitionProperties(db, olapTable.getName(), partitionName, storagePolicyId,
|
||||
isInMemory, null, null, null);
|
||||
} catch (Exception e) {
|
||||
String errMsg = "Failed to update partition[" + partitionName + "]'s 'in_memory' property. "
|
||||
+ "The reason is [" + e.getMessage() + "]";
|
||||
@ -2227,86 +2228,6 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update one specified partition's properties by partition name of table
|
||||
* This operation may return partial successfully, with an exception to inform user to retry
|
||||
*/
|
||||
public void updatePartitionProperties(Database db, String tableName, String partitionName, long storagePolicyId,
|
||||
int isInMemory, BinlogConfig binlogConfig) throws UserException {
|
||||
// be id -> <tablet id,schemaHash>
|
||||
Map<Long, Set<Pair<Long, Integer>>> beIdToTabletIdWithHash = Maps.newHashMap();
|
||||
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, Table.TableType.OLAP);
|
||||
olapTable.readLock();
|
||||
try {
|
||||
Partition partition = olapTable.getPartition(partitionName);
|
||||
if (partition == null) {
|
||||
throw new DdlException(
|
||||
"Partition[" + partitionName + "] does not exist in table[" + olapTable.getName() + "]");
|
||||
}
|
||||
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
|
||||
int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
Set<Pair<Long, Integer>> tabletIdWithHash = beIdToTabletIdWithHash.computeIfAbsent(
|
||||
replica.getBackendId(), k -> Sets.newHashSet());
|
||||
tabletIdWithHash.add(Pair.of(tablet.getId(), schemaHash));
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
olapTable.readUnlock();
|
||||
}
|
||||
|
||||
int totalTaskNum = beIdToTabletIdWithHash.keySet().size();
|
||||
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> countDownLatch = new MarkedCountDownLatch<>(totalTaskNum);
|
||||
AgentBatchTask batchTask = new AgentBatchTask();
|
||||
for (Map.Entry<Long, Set<Pair<Long, Integer>>> kv : beIdToTabletIdWithHash.entrySet()) {
|
||||
countDownLatch.addMark(kv.getKey(), kv.getValue());
|
||||
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(kv.getKey(), kv.getValue(), isInMemory,
|
||||
storagePolicyId, binlogConfig, countDownLatch);
|
||||
batchTask.addTask(task);
|
||||
}
|
||||
if (!FeConstants.runningUnitTest) {
|
||||
// send all tasks and wait them finished
|
||||
AgentTaskQueue.addBatchTask(batchTask);
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
LOG.info("send update tablet meta task for table {}, partitions {}, number: {}", tableName, partitionName,
|
||||
batchTask.getTaskNum());
|
||||
|
||||
// estimate timeout
|
||||
long timeout = DbUtil.getCreateReplicasTimeoutMs(totalTaskNum);
|
||||
boolean ok = false;
|
||||
try {
|
||||
ok = countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("InterruptedException: ", e);
|
||||
}
|
||||
|
||||
if (!ok || !countDownLatch.getStatus().ok()) {
|
||||
String errMsg = "Failed to update partition[" + partitionName + "]. tablet meta.";
|
||||
// clear tasks
|
||||
AgentTaskQueue.removeBatchTask(batchTask, TTaskType.UPDATE_TABLET_META_INFO);
|
||||
|
||||
if (!countDownLatch.getStatus().ok()) {
|
||||
errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
|
||||
} else {
|
||||
// only show at most 3 results
|
||||
List<String> subList = countDownLatch.getLeftMarks().stream().limit(3)
|
||||
.map(item -> "(backendId = " + item.getKey() + ", tabletsWithHash = "
|
||||
+ item.getValue() + ")")
|
||||
.collect(Collectors.toList());
|
||||
if (!subList.isEmpty()) {
|
||||
errMsg += " Unfinished: " + Joiner.on(", ").join(subList);
|
||||
}
|
||||
}
|
||||
errMsg += ". This operation maybe partial successfully, You should retry until success.";
|
||||
LOG.warn(errMsg);
|
||||
throw new DdlException(errMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update one specified partition's properties by partition name of table
|
||||
* This operation may return partial successfully, with an exception to inform user to retry
|
||||
@ -2372,12 +2293,13 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
if (!countDownLatch.getStatus().ok()) {
|
||||
errMsg += " Error: " + countDownLatch.getStatus().getErrorMsg();
|
||||
} else {
|
||||
List<Map.Entry<Long, Set<Pair<Long, Integer>>>> unfinishedMarks = countDownLatch.getLeftMarks();
|
||||
// only show at most 3 results
|
||||
List<Map.Entry<Long, Set<Pair<Long, Integer>>>> subList = unfinishedMarks.subList(0,
|
||||
Math.min(unfinishedMarks.size(), 3));
|
||||
List<String> subList = countDownLatch.getLeftMarks().stream().limit(3)
|
||||
.map(item -> "(backendId = " + item.getKey() + ", tabletsWithHash = "
|
||||
+ item.getValue() + ")")
|
||||
.collect(Collectors.toList());
|
||||
if (!subList.isEmpty()) {
|
||||
errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
|
||||
errMsg += " Unfinished: " + Joiner.on(", ").join(subList);
|
||||
}
|
||||
}
|
||||
errMsg += ". This operation maybe partial successfully, You should retry until success.";
|
||||
@ -2988,7 +2910,8 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
|
||||
|
||||
for (Partition partition : partitions) {
|
||||
updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1, newBinlogConfig);
|
||||
updatePartitionProperties(db, olapTable.getName(), partition.getName(), -1, -1,
|
||||
newBinlogConfig, null, null);
|
||||
}
|
||||
|
||||
olapTable.writeLockOrDdlException();
|
||||
|
||||
@ -4577,7 +4577,11 @@ public class Env {
|
||||
}
|
||||
tableProperty.buildInMemory()
|
||||
.buildStoragePolicy()
|
||||
.buildIsBeingSynced();
|
||||
.buildIsBeingSynced()
|
||||
.buildCompactionPolicy()
|
||||
.buildTimeSeriesCompactionGoalSizeMbytes()
|
||||
.buildTimeSeriesCompactionFileCountThreshold()
|
||||
.buildTimeSeriesCompactionTimeThresholdSeconds();
|
||||
|
||||
// need to update partition info meta
|
||||
for (Partition partition : table.getPartitions()) {
|
||||
|
||||
@ -83,11 +83,7 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
|
||||
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> latch,
|
||||
String compactionPolicy,
|
||||
Map<String, Long> timeSeriesCompactionConfig) {
|
||||
this(backendId, tableIdWithSchemaHash);
|
||||
this.storagePolicyId = storagePolicyId;
|
||||
this.inMemory = inMemory;
|
||||
this.binlogConfig = binlogConfig;
|
||||
this.latch = latch;
|
||||
this(backendId, tableIdWithSchemaHash, inMemory, storagePolicyId, binlogConfig, latch);
|
||||
this.compactionPolicy = compactionPolicy;
|
||||
this.timeSeriesCompactionConfig = timeSeriesCompactionConfig;
|
||||
}
|
||||
|
||||
@ -65,7 +65,7 @@ suite("test_table_level_compaction_policy") {
|
||||
logger.info("${showResult3}")
|
||||
assertTrue(showResult3.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"'))
|
||||
|
||||
sql """
|
||||
sql """
|
||||
alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000")
|
||||
"""
|
||||
sql """sync"""
|
||||
@ -74,6 +74,33 @@ suite("test_table_level_compaction_policy") {
|
||||
logger.info("${showResult4}")
|
||||
assertTrue(showResult4.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"'))
|
||||
|
||||
sql """
|
||||
alter table ${tableName} set ("time_series_compaction_goal_size_mbytes" = "1024")
|
||||
"""
|
||||
sql """sync"""
|
||||
|
||||
def showResult6 = sql """show create table ${tableName}"""
|
||||
logger.info("${showResult6}")
|
||||
assertTrue(showResult6.toString().containsIgnoreCase('"time_series_compaction_goal_size_mbytes" = "1024"'))
|
||||
|
||||
sql """
|
||||
alter table ${tableName} set ("time_series_compaction_file_count_threshold" = "6000")
|
||||
"""
|
||||
sql """sync"""
|
||||
|
||||
def showResult7 = sql """show create table ${tableName}"""
|
||||
logger.info("${showResult7}")
|
||||
assertTrue(showResult7.toString().containsIgnoreCase('"time_series_compaction_file_count_threshold" = "6000"'))
|
||||
|
||||
sql """
|
||||
alter table ${tableName} set ("time_series_compaction_time_threshold_seconds" = "3000")
|
||||
"""
|
||||
sql """sync"""
|
||||
|
||||
def showResult8 = sql """show create table ${tableName}"""
|
||||
logger.info("${showResult8}")
|
||||
assertTrue(showResult8.toString().containsIgnoreCase('"time_series_compaction_time_threshold_seconds" = "3000"'))
|
||||
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
sql """sync"""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user