[Fix](Nereids) fix serialize colocate table index concurrent bug (#19862)
When doing serialization of minidump input, we can find that when serializing colocate table index, the size and entry get by the hash map always unmatched when concurrent occur. So a write lock be added to ensure concurrency.
This commit is contained in:
@ -653,39 +653,45 @@ public class ColocateTableIndex implements Writable {
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
int size = groupName2Id.size();
|
||||
out.writeInt(size);
|
||||
for (Map.Entry<String, GroupId> entry : ImmutableMap.copyOf(groupName2Id).entrySet()) {
|
||||
Text.writeString(out, entry.getKey()); // group name
|
||||
entry.getValue().write(out); // group id
|
||||
Collection<Long> tableIds = group2Tables.get(entry.getValue());
|
||||
out.writeInt(tableIds.size());
|
||||
for (Long tblId : tableIds) {
|
||||
out.writeLong(tblId); // table ids
|
||||
}
|
||||
ColocateGroupSchema groupSchema = group2Schema.get(entry.getValue());
|
||||
groupSchema.write(out); // group schema
|
||||
writeLock();
|
||||
try {
|
||||
int size = groupName2Id.size();
|
||||
out.writeInt(size);
|
||||
for (Map.Entry<String, GroupId> entry : ImmutableMap.copyOf(groupName2Id).entrySet()) {
|
||||
Text.writeString(out, entry.getKey()); // group name
|
||||
entry.getValue().write(out); // group id
|
||||
Collection<Long> tableIds = group2Tables.get(entry.getValue());
|
||||
out.writeInt(tableIds.size());
|
||||
for (Long tblId : tableIds) {
|
||||
out.writeLong(tblId); // table ids
|
||||
}
|
||||
ColocateGroupSchema groupSchema = group2Schema.get(entry.getValue());
|
||||
groupSchema.write(out); // group schema
|
||||
|
||||
// backend seq
|
||||
Map<Tag, List<List<Long>>> backendsPerBucketSeq = group2BackendsPerBucketSeq.row(entry.getValue());
|
||||
out.writeInt(backendsPerBucketSeq.size());
|
||||
for (Map.Entry<Tag, List<List<Long>>> tag2Bucket2BEs : backendsPerBucketSeq.entrySet()) {
|
||||
tag2Bucket2BEs.getKey().write(out);
|
||||
out.writeInt(tag2Bucket2BEs.getValue().size());
|
||||
for (List<Long> beIds : tag2Bucket2BEs.getValue()) {
|
||||
out.writeInt(beIds.size());
|
||||
for (Long be : beIds) {
|
||||
out.writeLong(be);
|
||||
// backend seq
|
||||
Map<Tag, List<List<Long>>> backendsPerBucketSeq = group2BackendsPerBucketSeq.row(entry.getValue());
|
||||
out.writeInt(backendsPerBucketSeq.size());
|
||||
for (Map.Entry<Tag, List<List<Long>>> tag2Bucket2BEs : backendsPerBucketSeq.entrySet()) {
|
||||
tag2Bucket2BEs.getKey().write(out);
|
||||
out.writeInt(tag2Bucket2BEs.getValue().size());
|
||||
for (List<Long> beIds : tag2Bucket2BEs.getValue()) {
|
||||
out.writeInt(beIds.size());
|
||||
for (Long be : beIds) {
|
||||
out.writeLong(be);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size = unstableGroups.size();
|
||||
out.writeInt(size);
|
||||
for (GroupId groupId : unstableGroups) {
|
||||
groupId.write(out);
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
|
||||
size = unstableGroups.size();
|
||||
out.writeInt(size);
|
||||
for (GroupId groupId : unstableGroups) {
|
||||
groupId.write(out);
|
||||
}
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
|
||||
Reference in New Issue
Block a user