[fix](mtmv)fix getIdToItem cause ConcurrentModificationException (#31511)

This commit is contained in:
zhangdong
2024-02-29 12:06:38 +08:00
committed by yiguolei
parent 6ef3455786
commit c60fea9bdf
6 changed files with 130 additions and 54 deletions

View File

@ -112,7 +112,12 @@ public class MTMV extends OlapTable {
}
public MTMVRefreshInfo getRefreshInfo() {
return refreshInfo;
readMvLock();
try {
return refreshInfo;
} finally {
readMvUnlock();
}
}
public String getQuerySql() {
@ -120,8 +125,8 @@ public class MTMV extends OlapTable {
}
public MTMVStatus getStatus() {
readMvLock();
try {
readMvLock();
return status;
} finally {
readMvUnlock();
@ -133,11 +138,21 @@ public class MTMV extends OlapTable {
}
public MTMVJobInfo getJobInfo() {
return jobInfo;
readMvLock();
try {
return jobInfo;
} finally {
readMvUnlock();
}
}
public MTMVRelation getRelation() {
return relation;
readMvLock();
try {
return relation;
} finally {
readMvUnlock();
}
}
public void setCache(MTMVCache cache) {
@ -145,12 +160,17 @@ public class MTMV extends OlapTable {
}
public MTMVRefreshInfo alterRefreshInfo(MTMVRefreshInfo newRefreshInfo) {
return refreshInfo.updateNotNull(newRefreshInfo);
writeMvLock();
try {
return refreshInfo.updateNotNull(newRefreshInfo);
} finally {
writeMvUnlock();
}
}
public MTMVStatus alterStatus(MTMVStatus newStatus) {
writeMvLock();
try {
writeMvLock();
return this.status.updateNotNull(newStatus);
} finally {
writeMvUnlock();
@ -159,8 +179,8 @@ public class MTMV extends OlapTable {
public void addTaskResult(MTMVTask task, MTMVRelation relation,
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
writeMvLock();
try {
writeMvLock();
if (task.getStatus() == TaskStatus.SUCCESS) {
this.status.setState(MTMVState.NORMAL);
this.status.setSchemaChangeDetail(null);
@ -185,41 +205,66 @@ public class MTMV extends OlapTable {
}
public Map<String, String> alterMvProperties(Map<String, String> mvProperties) {
this.mvProperties.putAll(mvProperties);
return this.mvProperties;
writeMvLock();
try {
this.mvProperties.putAll(mvProperties);
return this.mvProperties;
} finally {
writeMvUnlock();
}
}
public long getGracePeriod() {
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000;
} else {
return 0L;
readMvLock();
try {
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000;
} else {
return 0L;
}
} finally {
readMvUnlock();
}
}
public Optional<String> getWorkloadGroup() {
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && !StringUtils
.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) {
return Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP));
readMvLock();
try {
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP) && !StringUtils
.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP))) {
return Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP));
}
return Optional.empty();
} finally {
readMvUnlock();
}
return Optional.empty();
}
public int getRefreshPartitionNum() {
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value;
} else {
return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
readMvLock();
try {
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value;
} else {
return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
}
} finally {
readMvUnlock();
}
}
public Set<String> getExcludedTriggerTables() {
if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
return Sets.newHashSet();
readMvLock();
try {
if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
return Sets.newHashSet();
}
String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
return Sets.newHashSet(split);
} finally {
readMvUnlock();
}
String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
return Sets.newHashSet(split);
}
public MTMVCache getOrGenerateCache() throws AnalysisException {
@ -237,7 +282,12 @@ public class MTMV extends OlapTable {
}
public Map<String, String> getMvProperties() {
return mvProperties;
readMvLock();
try {
return mvProperties;
} finally {
readMvUnlock();
}
}
public MTMVPartitionInfo getMvPartitionInfo() {
@ -254,7 +304,7 @@ public class MTMV extends OlapTable {
* @return mvPartitionId ==> mvPartitionKeyDesc
*/
public Map<Long, PartitionKeyDesc> generateMvPartitionDescs() {
Map<Long, PartitionItem> mtmvItems = getPartitionItems();
Map<Long, PartitionItem> mtmvItems = getAndCopyPartitionItems();
Map<Long, PartitionKeyDesc> result = Maps.newHashMap();
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
result.put(entry.getKey(), entry.getValue().toPartitionKeyDesc());
@ -276,7 +326,7 @@ public class MTMV extends OlapTable {
return Maps.newHashMap();
}
Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>();
Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getPartitionItems();
Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems();
int relatedColPos = mvPartitionInfo.getRelatedColPos();
for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos);
@ -303,7 +353,7 @@ public class MTMV extends OlapTable {
}
Map<Long, Set<Long>> res = Maps.newHashMap();
Map<PartitionKeyDesc, Set<Long>> relatedPartitionDescs = generateRelatedPartitionDescs();
Map<Long, PartitionItem> mvPartitionItems = getPartitionInfo().getIdToItem(false);
Map<Long, PartitionItem> mvPartitionItems = getAndCopyPartitionItems();
for (Entry<Long, PartitionItem> entry : mvPartitionItems.entrySet()) {
res.put(entry.getKey(),
relatedPartitionDescs.getOrDefault(entry.getValue().toPartitionKeyDesc(), Sets.newHashSet()));

View File

@ -1053,7 +1053,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
public List<Long> getPartitionIds() {
return new ArrayList<>(idToPartition.keySet());
readLock();
try {
return new ArrayList<>(idToPartition.keySet());
} finally {
readUnlock();
}
}
public Set<String> getCopiedBfColumns() {
@ -2572,8 +2577,13 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public Map<Long, PartitionItem> getPartitionItems() {
return getPartitionInfo().getIdToItem(false);
public Map<Long, PartitionItem> getAndCopyPartitionItems() {
readLock();
try {
return Maps.newHashMap(getPartitionInfo().getIdToItem(false));
} finally {
readUnlock();
}
}
@Override
@ -2595,7 +2605,12 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
@Override
public String getPartitionName(long partitionId) throws AnalysisException {
return getPartitionOrAnalysisException(partitionId).getName();
readLock();
try {
return getPartitionOrAnalysisException(partitionId).getName();
} finally {
readUnlock();
}
}
@Override

View File

@ -773,7 +773,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public Map<Long, PartitionItem> getPartitionItems() {
public Map<Long, PartitionItem> getAndCopyPartitionItems() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
@ -816,7 +816,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
long partitionId = 0L;
long maxVersionTime = 0L;
long visibleVersionTime;
for (Entry<Long, PartitionItem> entry : getPartitionItems().entrySet()) {
for (Entry<Long, PartitionItem> entry : getAndCopyPartitionItems().entrySet()) {
visibleVersionTime = getPartitionLastModifyTime(entry.getKey());
if (visibleVersionTime > maxVersionTime) {
maxVersionTime = visibleVersionTime;
@ -831,7 +831,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
private HivePartition getPartitionById(long partitionId) throws AnalysisException {
PartitionItem item = getPartitionItems().get(partitionId);
PartitionItem item = getAndCopyPartitionItems().get(partitionId);
List<List<String>> partitionValuesList = transferPartitionItemToPartitionValues(item);
List<HivePartition> partitions = getPartitionsByPartitionValues(partitionValuesList);
if (partitions.size() != 1) {

View File

@ -141,7 +141,7 @@ public class MTMVPartitionUtil {
throws AnalysisException {
int pos = getPos(relatedTable, relatedCol);
Set<PartitionKeyDesc> res = Sets.newHashSet();
for (Entry<Long, PartitionItem> entry : relatedTable.getPartitionItems().entrySet()) {
for (Entry<Long, PartitionItem> entry : relatedTable.getAndCopyPartitionItems().entrySet()) {
PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos);
res.add(partitionKeyDesc);
}
@ -163,18 +163,24 @@ public class MTMVPartitionUtil {
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
List<String> res = Lists.newArrayList();
for (Long partitionId : ids) {
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
res.add(mtmv.getPartitionName(partitionId));
}
return res;
}
public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
List<Long> res = Lists.newArrayList();
for (String partitionName : partitions) {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
res.add(partition.getId());
mtmv.readLock();
try {
List<Long> res = Lists.newArrayList();
for (String partitionName : partitions) {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
res.add(partition.getId());
}
return res;
} finally {
mtmv.readUnlock();
}
return res;
}
/**
@ -209,9 +215,9 @@ public class MTMVPartitionUtil {
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables,
Map<Long, Set<Long>> partitionMappings)
throws AnalysisException {
Collection<Partition> partitions = mtmv.getPartitions();
for (Partition partition : partitions) {
if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), tables,
List<Long> partitionIds = mtmv.getPartitionIds();
for (Long partitionId : partitionIds) {
if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), tables,
excludeTables)) {
return false;
}
@ -277,16 +283,16 @@ public class MTMVPartitionUtil {
*/
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables,
Map<Long, Set<Long>> partitionMappings) {
Collection<Partition> allPartitions = mtmv.getPartitions();
List<Long> partitionIds = mtmv.getPartitionIds();
List<Long> res = Lists.newArrayList();
for (Partition partition : allPartitions) {
for (Long partitionId : partitionIds) {
try {
if (!isMTMVPartitionSync(mtmv, partition.getId(), partitionMappings.get(partition.getId()), baseTables,
if (!isMTMVPartitionSync(mtmv, partitionId, partitionMappings.get(partitionId), baseTables,
mtmv.getExcludedTriggerTables())) {
res.add(partition.getId());
res.add(partitionId);
}
} catch (AnalysisException e) {
res.add(partition.getId());
res.add(partitionId);
LOG.warn("check isMTMVPartitionSync failed", e);
}
}
@ -361,6 +367,7 @@ public class MTMVPartitionUtil {
/**
* add partition for mtmv like relatedPartitionId of relatedTable
* `Env.getCurrentEnv().addPartition` has obtained the lock internally, but we do not obtain the lock here
*
* @param mtmv
* @param oldPartitionKeyDesc
@ -448,7 +455,7 @@ public class MTMVPartitionUtil {
throws AnalysisException {
Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
for (Long partitionId : partitionIds) {
res.put(mtmv.getPartition(partitionId).getName(),
res.put(mtmv.getPartitionName(partitionId),
generatePartitionSnapshot(mtmv, baseTables, partitionMappings.get(partitionId)));
}
return res;

View File

@ -38,7 +38,7 @@ public interface MTMVRelatedTableIf extends TableIf {
*
* @return partitionId->PartitionItem
*/
Map<Long, PartitionItem> getPartitionItems();
Map<Long, PartitionItem> getAndCopyPartitionItems();
/**
* getPartitionType LIST/RANGE/UNPARTITIONED

View File

@ -71,6 +71,10 @@ public class MTMVPartitionUtilTest {
minTimes = 0;
result = Lists.newArrayList(p1);
mtmv.getPartitionIds();
minTimes = 0;
result = Lists.newArrayList(1L);
p1.getId();
minTimes = 0;
result = 1L;