[Bug][DynamicPartition] Take table_id as key of runtimeInfo (#6053)
Co-authored-by: wangxixu <wangxixu@xiaomi.com>
This commit is contained in:
@ -3849,7 +3849,7 @@ public class Catalog {
|
||||
// register or remove table from DynamicPartition after table created
|
||||
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
|
||||
dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
|
||||
tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
|
||||
tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
|
||||
}
|
||||
} catch (DdlException e) {
|
||||
for (Long tabletId : tabletIdSet) {
|
||||
@ -5439,7 +5439,7 @@ public class Catalog {
|
||||
|
||||
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), table, false);
|
||||
dynamicPartitionScheduler.createOrUpdateRuntimeInfo(
|
||||
table.getName(), DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
|
||||
table.getId(), DynamicPartitionScheduler.LAST_UPDATE_TIME, TimeUtils.getCurrentFormatTime());
|
||||
ModifyTablePropertyOperationLog info = new ModifyTablePropertyOperationLog(db.getId(), table.getId(), logProperties);
|
||||
editLog.logDynamicPartition(info);
|
||||
}
|
||||
|
||||
@ -46,15 +46,12 @@ import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.common.util.RangeUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Range;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
@ -81,7 +78,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
|
||||
private final String DEFAULT_RUNTIME_VALUE = FeConstants.null_string;
|
||||
|
||||
private Map<String, Map<String, String>> runtimeInfos = Maps.newConcurrentMap();
|
||||
private Map<Long, Map<String, String>> runtimeInfos = Maps.newConcurrentMap();
|
||||
private Set<Pair<Long, Long>> dynamicPartitionTableInfo = Sets.newConcurrentHashSet();
|
||||
private boolean initialize;
|
||||
|
||||
@ -108,21 +105,21 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
dynamicPartitionTableInfo.remove(new Pair<>(dbId, tableId));
|
||||
}
|
||||
|
||||
public String getRuntimeInfo(String tableName, String key) {
|
||||
Map<String, String> tableRuntimeInfo = runtimeInfos.getOrDefault(tableName, createDefaultRuntimeInfo());
|
||||
public String getRuntimeInfo(long tableId, String key) {
|
||||
Map<String, String> tableRuntimeInfo = runtimeInfos.getOrDefault(tableId, createDefaultRuntimeInfo());
|
||||
return tableRuntimeInfo.getOrDefault(key, DEFAULT_RUNTIME_VALUE);
|
||||
}
|
||||
|
||||
public void removeRuntimeInfo(String tableName) {
|
||||
runtimeInfos.remove(tableName);
|
||||
public void removeRuntimeInfo(long tableId) {
|
||||
runtimeInfos.remove(tableId);
|
||||
}
|
||||
|
||||
public void createOrUpdateRuntimeInfo(String tableName, String key, String value) {
|
||||
Map<String, String> runtimeInfo = runtimeInfos.get(tableName);
|
||||
public void createOrUpdateRuntimeInfo(long tableId, String key, String value) {
|
||||
Map<String, String> runtimeInfo = runtimeInfos.get(tableId);
|
||||
if (runtimeInfo == null) {
|
||||
runtimeInfo = createDefaultRuntimeInfo();
|
||||
runtimeInfo.put(key, value);
|
||||
runtimeInfos.put(tableName, runtimeInfo);
|
||||
runtimeInfos.put(tableId, runtimeInfo);
|
||||
} else {
|
||||
runtimeInfo.put(key, value);
|
||||
}
|
||||
@ -176,9 +173,9 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
} catch (DdlException e) {
|
||||
isPartitionExists = true;
|
||||
if (addPartitionKeyRange.equals(partitionItem.getItems())) {
|
||||
clearCreatePartitionFailedMsg(olapTable.getName());
|
||||
clearCreatePartitionFailedMsg(olapTable.getId());
|
||||
} else {
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), e.getMessage());
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), e.getMessage(), olapTable.getId());
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -313,7 +310,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
|
||||
String errorMsg = "Table[" + olapTable.getName() + "]'s state is not NORMAL."
|
||||
+ "Do not allow doing dynamic add partition. table state=" + olapTable.getState();
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), errorMsg);
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), errorMsg, olapTable.getId());
|
||||
skipAddPartition = true;
|
||||
}
|
||||
|
||||
@ -321,7 +318,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
// if column type is Date, format partition name as yyyyMMdd
|
||||
// if column type is DateTime, format partition name as yyyyMMddHHssmm
|
||||
// scheduler time should be record even no partition added
|
||||
createOrUpdateRuntimeInfo(olapTable.getName(), LAST_SCHEDULER_TIME, TimeUtils.getCurrentFormatTime());
|
||||
createOrUpdateRuntimeInfo(olapTable.getId(), LAST_SCHEDULER_TIME, TimeUtils.getCurrentFormatTime());
|
||||
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
|
||||
if (rangePartitionInfo.getPartitionColumns().size() != 1) {
|
||||
// currently only support partition with single column.
|
||||
@ -334,7 +331,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
try {
|
||||
partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
|
||||
} catch (DdlException e) {
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), e.getMessage());
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), e.getMessage(), olapTable.getId());
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -351,9 +348,9 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
olapTable.writeLock();
|
||||
try {
|
||||
Catalog.getCurrentCatalog().dropPartition(db, olapTable, dropPartitionClause);
|
||||
clearDropPartitionFailedMsg(tableName);
|
||||
clearDropPartitionFailedMsg(olapTable.getId());
|
||||
} catch (DdlException e) {
|
||||
recordDropPartitionFailedMsg(db.getFullName(), tableName, e.getMessage());
|
||||
recordDropPartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
|
||||
} finally {
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
@ -363,35 +360,35 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
for (AddPartitionClause addPartitionClause : addPartitionClauses) {
|
||||
try {
|
||||
Catalog.getCurrentCatalog().addPartition(db, tableName, addPartitionClause);
|
||||
clearCreatePartitionFailedMsg(tableName);
|
||||
clearCreatePartitionFailedMsg(olapTable.getId());
|
||||
} catch (DdlException e) {
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage());
|
||||
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void recordCreatePartitionFailedMsg(String dbName, String tableName, String msg) {
|
||||
private void recordCreatePartitionFailedMsg(String dbName, String tableName, String msg, long tableId) {
|
||||
LOG.warn("dynamic add partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, msg);
|
||||
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
|
||||
createOrUpdateRuntimeInfo(tableId, CREATE_PARTITION_MSG, msg);
|
||||
}
|
||||
|
||||
private void clearCreatePartitionFailedMsg(String tableName) {
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
private void clearCreatePartitionFailedMsg(long tableId) {
|
||||
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
|
||||
createOrUpdateRuntimeInfo(tableId, CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
}
|
||||
|
||||
private void recordDropPartitionFailedMsg(String dbName, String tableName, String msg) {
|
||||
private void recordDropPartitionFailedMsg(String dbName, String tableName, String msg, long tableId) {
|
||||
LOG.warn("dynamic drop partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, msg);
|
||||
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
|
||||
createOrUpdateRuntimeInfo(tableId, DROP_PARTITION_MSG, msg);
|
||||
}
|
||||
|
||||
private void clearDropPartitionFailedMsg(String tableName) {
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
private void clearDropPartitionFailedMsg(long tableId) {
|
||||
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
|
||||
createOrUpdateRuntimeInfo(tableId, DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
}
|
||||
|
||||
private void initDynamicPartitionTable() {
|
||||
|
||||
@ -1725,7 +1725,7 @@ public class ShowExecutor {
|
||||
olapTable.readLock();
|
||||
try {
|
||||
if (!olapTable.dynamicPartitionExists()) {
|
||||
dynamicPartitionScheduler.removeRuntimeInfo(olapTable.getName());
|
||||
dynamicPartitionScheduler.removeRuntimeInfo(olapTable.getId());
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -1749,11 +1749,11 @@ public class ShowExecutor {
|
||||
String.valueOf(dynamicPartitionProperty.getBuckets()),
|
||||
String.valueOf(replicationNum),
|
||||
dynamicPartitionProperty.getStartOfInfo(),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_SCHEDULER_TIME),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.DYNAMIC_PARTITION_STATE),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.CREATE_PARTITION_MSG),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.DROP_PARTITION_MSG)));
|
||||
dynamicPartitionScheduler.getRuntimeInfo(olapTable.getId(), DynamicPartitionScheduler.LAST_UPDATE_TIME),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(olapTable.getId(), DynamicPartitionScheduler.LAST_SCHEDULER_TIME),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(olapTable.getId(), DynamicPartitionScheduler.DYNAMIC_PARTITION_STATE),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(olapTable.getId(), DynamicPartitionScheduler.CREATE_PARTITION_MSG),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(olapTable.getId(), DynamicPartitionScheduler.DROP_PARTITION_MSG)));
|
||||
} finally {
|
||||
olapTable.readUnlock();
|
||||
}
|
||||
|
||||
@ -19,23 +19,20 @@ package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.clone.DynamicPartitionScheduler;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.DynamicPartitionUtil;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.utframe.UtFrameUtils;
|
||||
|
||||
import com.clearspring.analytics.util.Lists;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
@ -1017,4 +1014,27 @@ public class DynamicPartitionTableTest {
|
||||
");";
|
||||
createTable(createOlapTblStmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRuntimeInfo() throws Exception {
|
||||
DynamicPartitionScheduler scheduler = new DynamicPartitionScheduler("test", 10);
|
||||
long tableId = 1001;
|
||||
String key1 = "key1";
|
||||
String value1 = "value1";
|
||||
String key2 = "key2";
|
||||
String value2 = "value2";
|
||||
// add
|
||||
scheduler.createOrUpdateRuntimeInfo(tableId, key1, value1);
|
||||
scheduler.createOrUpdateRuntimeInfo(tableId, key2, value2);
|
||||
Assert.assertTrue(scheduler.getRuntimeInfo(tableId, key1) == value1);
|
||||
|
||||
// modify
|
||||
String value3 = "value2";
|
||||
scheduler.createOrUpdateRuntimeInfo(tableId, key1, value3);
|
||||
Assert.assertTrue(scheduler.getRuntimeInfo(tableId, key1) == value3);
|
||||
|
||||
// remove
|
||||
scheduler.removeRuntimeInfo(tableId);
|
||||
Assert.assertTrue(scheduler.getRuntimeInfo(tableId, key1) == FeConstants.null_string);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user