From 78bcc68ab85f42dd71102009c988cf0f168a1f57 Mon Sep 17 00:00:00 2001
From: LiBinfeng <46676950+LiBinfeng-01@users.noreply.github.com>
Date: Fri, 19 May 2023 19:51:22 +0800
Subject: [PATCH] [Fix](Nereids) fix serialize colocate table index concurrent
 bug (#19862)

When serializing minidump input, we found that while serializing the
colocate table index, the size reported by the hash map and the entries
actually read from it could mismatch under concurrent modification. A
write lock is added to keep the serialization consistent under
concurrent access.
---
 .../doris/catalog/ColocateTableIndex.java     | 60 ++++++++++---------
 1 file changed, 33 insertions(+), 27 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
index 82c6f04183..23703278fd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java
@@ -653,39 +653,45 @@ public class ColocateTableIndex implements Writable {
 
     @Override
     public void write(DataOutput out) throws IOException {
-        int size = groupName2Id.size();
-        out.writeInt(size);
-        for (Map.Entry<String, GroupId> entry : ImmutableMap.copyOf(groupName2Id).entrySet()) {
-            Text.writeString(out, entry.getKey()); // group name
-            entry.getValue().write(out); // group id
-            Collection<Long> tableIds = group2Tables.get(entry.getValue());
-            out.writeInt(tableIds.size());
-            for (Long tblId : tableIds) {
-                out.writeLong(tblId); // table ids
-            }
-            ColocateGroupSchema groupSchema = group2Schema.get(entry.getValue());
-            groupSchema.write(out); // group schema
+        writeLock();
+        try {
+            int size = groupName2Id.size();
+            out.writeInt(size);
+            for (Map.Entry<String, GroupId> entry : ImmutableMap.copyOf(groupName2Id).entrySet()) {
+                Text.writeString(out, entry.getKey()); // group name
+                entry.getValue().write(out); // group id
+                Collection<Long> tableIds = group2Tables.get(entry.getValue());
+                out.writeInt(tableIds.size());
+                for (Long tblId : tableIds) {
+                    out.writeLong(tblId); // table ids
+                }
+                ColocateGroupSchema groupSchema = group2Schema.get(entry.getValue());
+                groupSchema.write(out); // group schema
 
-            // backend seq
-            Map<Tag, List<List<Long>>> backendsPerBucketSeq = group2BackendsPerBucketSeq.row(entry.getValue());
-            out.writeInt(backendsPerBucketSeq.size());
-            for (Map.Entry<Tag, List<List<Long>>> tag2Bucket2BEs : backendsPerBucketSeq.entrySet()) {
-                tag2Bucket2BEs.getKey().write(out);
-                out.writeInt(tag2Bucket2BEs.getValue().size());
-                for (List<Long> beIds : tag2Bucket2BEs.getValue()) {
-                    out.writeInt(beIds.size());
-                    for (Long be : beIds) {
-                        out.writeLong(be);
+                // backend seq
+                Map<Tag, List<List<Long>>> backendsPerBucketSeq = group2BackendsPerBucketSeq.row(entry.getValue());
+                out.writeInt(backendsPerBucketSeq.size());
+                for (Map.Entry<Tag, List<List<Long>>> tag2Bucket2BEs : backendsPerBucketSeq.entrySet()) {
+                    tag2Bucket2BEs.getKey().write(out);
+                    out.writeInt(tag2Bucket2BEs.getValue().size());
+                    for (List<Long> beIds : tag2Bucket2BEs.getValue()) {
+                        out.writeInt(beIds.size());
+                        for (Long be : beIds) {
+                            out.writeLong(be);
+                        }
                     }
                 }
             }
+
+            size = unstableGroups.size();
+            out.writeInt(size);
+            for (GroupId groupId : unstableGroups) {
+                groupId.write(out);
+            }
+        } finally {
+            writeUnlock();
         }
-        size = unstableGroups.size();
-        out.writeInt(size);
-        for (GroupId groupId : unstableGroups) {
-            groupId.write(out);
-        }
     }
 
     public void readFields(DataInput in) throws IOException {