[fix](cooldown) Handle re-add replica with cooldowned data #17047

Modify the rule for choosing the cooldown replica: only an alive replica can be chosen as the cooldown replica.
Handle re-adding a replica that has cooldowned data.
This commit is contained in:
plat1ko
2023-02-26 12:36:55 +08:00
committed by GitHub
parent a0782a1855
commit 0251cb8941
4 changed files with 82 additions and 37 deletions

View File

@ -372,19 +372,21 @@ public class TabletInvertedIndex {
return;
}
// validate replica is active
// check cooldown replica is alive
Map<Long, Replica> replicaMap = replicaMetaTable.row(beTabletInfo.getTabletId());
if (replicaMap.isEmpty()) {
return;
}
boolean replicaInvalid = true;
boolean replicaAlive = false;
for (Replica replica : replicaMap.values()) {
if (replica.getId() == cooldownConf.first) {
replicaInvalid = false;
if (replica.isAlive()) {
replicaAlive = true;
}
break;
}
}
if (replicaInvalid) {
if (!replicaAlive) {
CooldownConf conf = new CooldownConf(tabletMeta.getDbId(), tabletMeta.getTableId(),
tabletMeta.getPartitionId(), tabletMeta.getIndexId(), beTabletInfo.tablet_id, cooldownConf.second);
synchronized (cooldownConfToUpdate) {

View File

@ -72,8 +72,9 @@ public class CooldownConfHandler extends MasterDaemon {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (CooldownConf conf : confToUpdate) {
// choose cooldown replica
List<Replica> replicas = invertedIndex.getReplicas(conf.getTabletId());
// choose cooldown replica from alive replicas
List<Replica> replicas = invertedIndex.getReplicas(conf.getTabletId()).stream().filter(r -> r.isAlive())
.collect(Collectors.toList());
if (replicas.isEmpty()) {
continue;
}

View File

@ -849,18 +849,12 @@ public class ReportHandler extends Daemon {
if (isBackendReplicaHealthy(backendTabletInfo)) {
// if this tablet meta is still in invertedIndex. try to add it.
// if add failed. delete this tablet from backend.
try {
tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta != null) {
addReplica(tabletId, tabletMeta, backendTabletInfo, backendId);
// update counter
++addToMetaCounter;
} else {
needDelete = true;
}
} catch (MetaNotFoundException e) {
LOG.debug("failed add to meta. tablet[{}], backend[{}]. {}",
tabletId, backendId, e.getMessage());
tabletMeta = invertedIndex.getTabletMeta(tabletId);
if (tabletMeta != null && addReplica(tabletId, tabletMeta, backendTabletInfo, backendId)) {
// update counter
++addToMetaCounter;
} else {
LOG.debug("failed add to meta. tablet[{}], backend[{}]", tabletId, backendId);
needDelete = true;
}
} else {
@ -1053,8 +1047,9 @@ public class ReportHandler extends Daemon {
AgentTaskExecutor.submit(batchTask);
}
private static void addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo backendTabletInfo, long backendId)
throws MetaNotFoundException {
// return false if add replica failed
private static boolean addReplica(long tabletId, TabletMeta tabletMeta, TTabletInfo backendTabletInfo,
long backendId) {
long dbId = tabletMeta.getDbId();
long tableId = tabletMeta.getTableId();
long partitionId = tabletMeta.getPartitionId();
@ -1066,50 +1061,62 @@ public class ReportHandler extends Daemon {
long remoteDataSize = backendTabletInfo.getRemoteDataSize();
long rowCount = backendTabletInfo.getRowCount();
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLockOrMetaException();
Database db;
OlapTable olapTable;
try {
db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
olapTable = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
olapTable.writeLockOrMetaException();
} catch (MetaNotFoundException e) {
LOG.warn(e);
return false;
}
try {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
throw new MetaNotFoundException("partition[" + partitionId + "] does not exist");
LOG.warn("partition[{}] does not exist", partitionId);
return false;
}
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId());
MaterializedIndex materializedIndex = partition.getIndex(indexId);
if (materializedIndex == null) {
throw new MetaNotFoundException("index[" + indexId + "] does not exist");
LOG.warn("index[{}] does not exist", indexId);
return false;
}
Tablet tablet = materializedIndex.getTablet(tabletId);
if (tablet == null) {
throw new MetaNotFoundException("tablet[" + tabletId + "] does not exist");
LOG.warn("tablet[{}] does not exist", tabletId);
return false;
}
// check replica id
long replicaId = backendTabletInfo.getReplicaId();
if (replicaId <= 0) {
throw new MetaNotFoundException("replica id is invalid");
LOG.warn("replica id is invalid");
return false;
}
long visibleVersion = partition.getVisibleVersion();
// check replica version
if (version < visibleVersion) {
throw new MetaNotFoundException("version is invalid. tablet[" + version + "]"
+ ", visible[" + visibleVersion + "]");
LOG.warn("version is invalid. tablet[{}], visible[{}]", version, visibleVersion);
return false;
}
// check schema hash
if (schemaHash != olapTable.getSchemaHashByIndexId(indexId)) {
throw new MetaNotFoundException("schema hash is diff[" + schemaHash + "-"
+ olapTable.getSchemaHashByIndexId(indexId) + "]");
LOG.warn("schema hash is diff[{}-{}]", schemaHash, olapTable.getSchemaHashByIndexId(indexId));
return false;
}
// colocate table will delete Replica in meta when balance
// but we need to rely on MetaNotFoundException to decide whether delete the tablet in backend
if (Env.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
return;
return true;
}
SystemInfoService infoService = Env.getCurrentSystemInfo();
@ -1130,11 +1137,32 @@ public class ReportHandler extends Daemon {
// just throw exception in this case
if (version > partition.getNextVersion() - 1) {
// this is a fatal error
throw new MetaNotFoundException("version is invalid. tablet[" + version + "]"
+ ", partition's max version [" + (partition.getNextVersion() - 1) + "]");
LOG.warn("version is invalid. tablet[{}], partition's max version [{}]", version,
partition.getNextVersion() - 1);
return false;
} else if (version < partition.getCommittedVersion()) {
lastFailedVersion = partition.getCommittedVersion();
}
if (backendTabletInfo.isSetCooldownMetaId()) {
// replica has cooldowned data
do {
if (backendTabletInfo.getReplicaId() == tablet.getCooldownConf().first) {
// this replica is true cooldown replica, so replica's cooldowned data must not be deleted
break;
}
if (backendTabletInfo.getReplicaId() != backendTabletInfo.getCooldownReplicaId()
&& Env.getCurrentInvertedIndex().getReplicas(tabletId).stream()
.anyMatch(r -> backendTabletInfo.getCooldownMetaId().equals(r.getCooldownMetaId()))) {
// this replica can not cooldown data, and shares same cooldowned data with others replica,
// so replica's cooldowned data must not be deleted
break;
}
LOG.warn("replica's cooldowned data may have been deleted");
return false;
} while (false);
}
// use replicaId reported by BE to maintain replica meta consistent between FE and BE
Replica replica = new Replica(replicaId, backendId, version, schemaHash,
dataSize, remoteDataSize, rowCount, ReplicaState.NORMAL,
@ -1151,17 +1179,18 @@ public class ReportHandler extends Daemon {
Env.getCurrentEnv().getEditLog().logAddReplica(info);
LOG.info("add replica[{}-{}] to catalog. backend[{}]", tabletId, replicaId, backendId);
return true;
} else {
// replica is enough. check if this tablet is already in meta
// (status changed between 'tabletReport()' and 'addReplica()')
for (Replica replica : tablet.getReplicas()) {
if (replica.getBackendId() == backendId) {
// tablet is already in meta. return true
return;
return true;
}
}
throw new MetaNotFoundException(
"replica is enough[" + tablet.getReplicas().size() + "-" + replicaAlloc.toCreateStmt() + "]");
LOG.warn("replica is enough[{}-{}]", tablet.getReplicas().size(), replicaAlloc.toCreateStmt());
return false;
}
} finally {
olapTable.writeUnlock();

View File

@ -201,11 +201,14 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return;
}
Tablet tablet;
int replicaNum;
try {
OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())
.getTable(tabletMeta.getTableId())
.get();
table.readLock();
replicaNum = table.getPartitionInfo().getReplicaAllocation(tabletMeta.getPartitionId())
.getTotalReplicaNum();
try {
tablet = table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId())
.getTablet(info.tablet_id);
@ -225,7 +228,17 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
// check cooldownMetaId of all replicas are the same
List<Replica> replicas = Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id);
// FIXME(plat1ko): We only delete remote files when tablet is under a stable state: enough replicas and
// all replicas are alive. Are these conditions really sufficient or necessary?
if (replicas.size() < replicaNum) {
LOG.info("num replicas are not enough, tablet={}", info.tablet_id);
return;
}
for (Replica replica : replicas) {
if (!replica.isAlive()) {
LOG.info("replica is not alive, tablet={}, replica={}", info.tablet_id, replica.getId());
return;
}
if (!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) {
LOG.info("cooldown meta id are not same, tablet={}", info.tablet_id);
return;