[Enhancement](TabletInvertIndex) use StampLock to instead of ReentranReadWriteLock to get better performance (#11422)

Co-authored-by: caiconghui1 <caiconghui1@jd.com>
This commit is contained in:
caiconghui
2022-08-02 17:53:30 +08:00
committed by GitHub
parent 842a5b8e24
commit d6149e4777
2 changed files with 44 additions and 44 deletions

View File

@ -50,7 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;
import java.util.stream.Collectors;
/*
@ -67,7 +67,7 @@ public class TabletInvertedIndex {
public static final TabletMeta NOT_EXIST_TABLET_META = new TabletMeta(NOT_EXIST_VALUE, NOT_EXIST_VALUE,
NOT_EXIST_VALUE, NOT_EXIST_VALUE, NOT_EXIST_VALUE, TStorageMedium.HDD);
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private StampedLock lock = new StampedLock();
// tablet id -> tablet meta
private Map<Long, TabletMeta> tabletMetaMap = Maps.newHashMap();
@ -99,20 +99,20 @@ public class TabletInvertedIndex {
public TabletInvertedIndex() {
}
private void readLock() {
this.lock.readLock().lock();
private long readLock() {
return this.lock.readLock();
}
private void readUnlock() {
this.lock.readLock().unlock();
private void readUnlock(long stamp) {
this.lock.unlockRead(stamp);
}
private void writeLock() {
this.lock.writeLock().lock();
private long writeLock() {
return this.lock.writeLock();
}
private void writeUnlock() {
this.lock.writeLock().unlock();
private void writeUnlock(long stamp) {
this.lock.unlockWrite(stamp);
}
public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
@ -125,10 +125,10 @@ public class TabletInvertedIndex {
ListMultimap<Long, Long> transactionsToClear,
ListMultimap<Long, Long> tabletRecoveryMap,
List<Triple<Long, Integer, Boolean>> tabletToInMemory) {
readLock();
long stamp = readLock();
long start = System.currentTimeMillis();
try {
LOG.info("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size());
LOG.debug("begin to do tablet diff with backend[{}]. num: {}", backendId, backendTablets.size());
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
taskPool.submit(() -> {
@ -258,7 +258,7 @@ public class TabletInvertedIndex {
}).join();
}
} finally {
readUnlock();
readUnlock(stamp);
}
long end = System.currentTimeMillis();
@ -272,33 +272,33 @@ public class TabletInvertedIndex {
}
public Long getTabletIdByReplica(long replicaId) {
readLock();
long stamp = readLock();
try {
return replicaToTabletMap.get(replicaId);
} finally {
readUnlock();
readUnlock(stamp);
}
}
public TabletMeta getTabletMeta(long tabletId) {
readLock();
long stamp = readLock();
try {
return tabletMetaMap.get(tabletId);
} finally {
readUnlock();
readUnlock(stamp);
}
}
public List<TabletMeta> getTabletMetaList(List<Long> tabletIdList) {
List<TabletMeta> tabletMetaList = new ArrayList<>(tabletIdList.size());
readLock();
long stamp = readLock();
try {
for (Long tabletId : tabletIdList) {
tabletMetaList.add(tabletMetaMap.getOrDefault(tabletId, NOT_EXIST_TABLET_META));
}
return tabletMetaList;
} finally {
readUnlock();
readUnlock(stamp);
}
}
@ -368,7 +368,7 @@ public class TabletInvertedIndex {
if (Env.isCheckpointThread()) {
return;
}
writeLock();
long stamp = writeLock();
try {
if (tabletMetaMap.containsKey(tabletId)) {
return;
@ -381,7 +381,7 @@ public class TabletInvertedIndex {
LOG.debug("add tablet: {}", tabletId);
} finally {
writeUnlock();
writeUnlock(stamp);
}
}
@ -389,7 +389,7 @@ public class TabletInvertedIndex {
if (Env.isCheckpointThread()) {
return;
}
writeLock();
long stamp = writeLock();
try {
Map<Long, Replica> replicas = replicaMetaTable.rowMap().remove(tabletId);
if (replicas != null) {
@ -409,7 +409,7 @@ public class TabletInvertedIndex {
LOG.debug("delete tablet: {}", tabletId);
} finally {
writeUnlock();
writeUnlock(stamp);
}
}
@ -417,7 +417,7 @@ public class TabletInvertedIndex {
if (Env.isCheckpointThread()) {
return;
}
writeLock();
long stamp = writeLock();
try {
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
replicaMetaTable.put(tabletId, replica.getBackendId(), replica);
@ -426,7 +426,7 @@ public class TabletInvertedIndex {
LOG.debug("add replica {} of tablet {} in backend {}",
replica.getId(), tabletId, replica.getBackendId());
} finally {
writeUnlock();
writeUnlock(stamp);
}
}
@ -434,7 +434,7 @@ public class TabletInvertedIndex {
if (Env.isCheckpointThread()) {
return;
}
writeLock();
long stamp = writeLock();
try {
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
if (replicaMetaTable.containsRow(tabletId)) {
@ -450,49 +450,49 @@ public class TabletInvertedIndex {
LOG.error("tablet[{}] contains no replica in inverted index", tabletId);
}
} finally {
writeUnlock();
writeUnlock(stamp);
}
}
public Replica getReplica(long tabletId, long backendId) {
readLock();
long stamp = readLock();
try {
Preconditions.checkState(tabletMetaMap.containsKey(tabletId), tabletId);
return replicaMetaTable.get(tabletId, backendId);
} finally {
readUnlock();
readUnlock(stamp);
}
}
public List<Replica> getReplicasByTabletId(long tabletId) {
readLock();
long stamp = readLock();
try {
if (replicaMetaTable.containsRow(tabletId)) {
return Lists.newArrayList(replicaMetaTable.row(tabletId).values());
}
return Lists.newArrayList();
} finally {
readUnlock();
readUnlock(stamp);
}
}
public List<Long> getTabletIdsByBackendId(long backendId) {
List<Long> tabletIds = Lists.newArrayList();
readLock();
long stamp = readLock();
try {
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
tabletIds.addAll(replicaMetaWithBackend.keySet());
}
} finally {
readUnlock();
readUnlock(stamp);
}
return tabletIds;
}
public List<Long> getTabletIdsByBackendIdAndStorageMedium(long backendId, TStorageMedium storageMedium) {
List<Long> tabletIds = Lists.newArrayList();
readLock();
long stamp = readLock();
try {
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
@ -500,20 +500,20 @@ public class TabletInvertedIndex {
id -> tabletMetaMap.get(id).getStorageMedium() == storageMedium).collect(Collectors.toList());
}
} finally {
readUnlock();
readUnlock(stamp);
}
return tabletIds;
}
public int getTabletNumByBackendId(long backendId) {
readLock();
long stamp = readLock();
try {
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
return replicaMetaWithBackend.size();
}
} finally {
readUnlock();
readUnlock(stamp);
}
return 0;
}
@ -522,7 +522,7 @@ public class TabletInvertedIndex {
Map<TStorageMedium, Long> replicaNumMap = Maps.newHashMap();
long hddNum = 0;
long ssdNum = 0;
readLock();
long stamp = readLock();
try {
Map<Long, Replica> replicaMetaWithBackend = backingReplicaMetaTable.row(backendId);
if (replicaMetaWithBackend != null) {
@ -535,7 +535,7 @@ public class TabletInvertedIndex {
}
}
} finally {
readUnlock();
readUnlock(stamp);
}
replicaNumMap.put(TStorageMedium.HDD, hddNum);
replicaNumMap.put(TStorageMedium.SSD, ssdNum);
@ -544,7 +544,7 @@ public class TabletInvertedIndex {
// just for test
public void clear() {
writeLock();
long stamp = writeLock();
try {
tabletMetaMap.clear();
replicaToTabletMap.clear();
@ -552,7 +552,7 @@ public class TabletInvertedIndex {
replicaMetaTable.clear();
backingReplicaMetaTable.clear();
} finally {
writeUnlock();
writeUnlock(stamp);
}
}
@ -567,7 +567,7 @@ public class TabletInvertedIndex {
// Only build from available bes, exclude colocate tables
public Map<TStorageMedium, TreeMultimap<Long, PartitionBalanceInfo>> buildPartitionInfoBySkew(
List<Long> availableBeIds) {
readLock();
long stamp = readLock();
// 1. gen <partitionId-indexId, <beId, replicaCount>>
// for each replica(all tablets):
@ -610,7 +610,7 @@ public class TabletInvertedIndex {
}
}
} finally {
readUnlock();
readUnlock(stamp);
}
// 2. Populate ClusterBalanceInfo::table_info_by_skew

View File

@ -89,7 +89,7 @@ public class StatisticProcNode implements ProcNodeInterface {
this.db = db;
this.dbNum = 1;
this.db.getTables().stream().filter(t -> t != null).forEach(t -> {
this.db.getTables().stream().filter(Objects::nonNull).forEach(t -> {
++tableNum;
if (t.getType() == TableType.OLAP) {
OlapTable olapTable = (OlapTable) t;