[improvement](checkpoint) checkpoint thread update tablet invert index (#25098)

This commit is contained in:
yujun
2023-10-11 18:18:03 +08:00
committed by GitHub
parent 73f632a4e3
commit 1e300d895d
6 changed files with 63 additions and 88 deletions

View File

@ -1010,11 +1010,9 @@ public class MaterializedViewHandler extends AlterHandler {
for (Partition partition : olapTable.getPartitions()) {
MaterializedIndex rollupIndex = partition.deleteRollupIndex(rollupIndexId);
if (!Env.isCheckpointThread()) {
// remove from inverted index
for (Tablet tablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
// remove from inverted index
for (Tablet tablet : rollupIndex.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
String rollupIndexName = olapTable.getIndexNameById(rollupIndexId);

View File

@ -409,7 +409,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
RecycleTableInfo tableInfo = idToTable.remove(tableId);
idToRecycleTime.remove(tableId);
Table table = tableInfo.getTable();
if (table.getType() == TableType.OLAP && !Env.isCheckpointThread()) {
if (table.getType() == TableType.OLAP) {
Env.getCurrentEnv().onEraseOlapTable((OlapTable) table, true);
}
LOG.info("replay erase table[{}]", tableId);
@ -519,9 +519,7 @@ public class CatalogRecycleBin extends MasterDaemon implements Writable {
}
Partition partition = partitionInfo.getPartition();
if (!Env.isCheckpointThread()) {
Env.getCurrentEnv().onErasePartition(partition);
}
Env.getCurrentEnv().onErasePartition(partition);
LOG.info("replay erase partition[{}]", partitionId);
}

View File

@ -1957,10 +1957,8 @@ public class Env {
public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException {
recycleBin.readFields(dis);
if (!isCheckpointThread()) {
// add tablet in Recycle bin to TabletInvertedIndex
recycleBin.addTabletToInvertedIndex();
}
// add tablet in Recycle bin to TabletInvertedIndex
recycleBin.addTabletToInvertedIndex();
// create DatabaseTransactionMgr for db in recycle bin.
// these dbs do not exist in `idToDb` of the catalog.
for (Long dbId : recycleBin.getAllDbIds()) {
@ -5446,8 +5444,8 @@ public class Env {
version, lastSuccessVersion, lastFailedVersion, updateTime);
getEditLog().logSetReplicaVersion(log);
}
LOG.info("set replica {} of tablet {} on backend {} as version {}, last success version {} ,"
+ ", last failed version {}, update time {}. is replay: {}", replica.getId(), tabletId,
LOG.info("set replica {} of tablet {} on backend {} as version {}, last success version {}, "
+ "last failed version {}, update time {}. is replay: {}", replica.getId(), tabletId,
backendId, version, lastSuccessVersion, lastFailedVersion, updateTime, isReplay);
} finally {
table.writeUnlock();
@ -5481,7 +5479,7 @@ public class Env {
}
}
if (!isReplay) {
if (!isReplay && !Env.isCheckpointThread()) {
// drop all replicas
AgentBatchTask batchTask = new AgentBatchTask();
for (Partition partition : olapTable.getAllPartitions()) {
@ -5505,6 +5503,7 @@ public class Env {
AgentTaskExecutor.submit(batchTask);
}
// TODO: does checkpoint need update colocate index ?
// colocation
Env.getCurrentColocateIndex().removeTable(olapTable.getId());
}

View File

@ -506,9 +506,6 @@ public class TabletInvertedIndex {
// always add tablet before adding replicas
public void addTablet(long tabletId, TabletMeta tabletMeta) {
if (Env.isCheckpointThread()) {
return;
}
long stamp = writeLock();
try {
if (tabletMetaMap.containsKey(tabletId)) {
@ -527,9 +524,6 @@ public class TabletInvertedIndex {
}
public void deleteTablet(long tabletId) {
if (Env.isCheckpointThread()) {
return;
}
long stamp = writeLock();
try {
Map<Long, Replica> replicas = replicaMetaTable.rowMap().remove(tabletId);
@ -555,9 +549,6 @@ public class TabletInvertedIndex {
}
public void addReplica(long tabletId, Replica replica) {
if (Env.isCheckpointThread()) {
return;
}
long stamp = writeLock();
try {
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));
@ -572,9 +563,6 @@ public class TabletInvertedIndex {
}
public void deleteReplica(long tabletId, long backendId) {
if (Env.isCheckpointThread()) {
return;
}
long stamp = writeLock();
try {
Preconditions.checkState(tabletMetaMap.containsKey(tabletId));

View File

@ -77,7 +77,7 @@ public class TempPartitions implements Writable, GsonPostProcessable {
if (partition != null) {
idToPartition.remove(partition.getId());
nameToPartition.remove(partitionName);
if (!Env.isCheckpointThread() && needDropTablet) {
if (needDropTablet) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {

View File

@ -375,10 +375,6 @@ public class InternalCatalog implements CatalogIf<Database> {
* create the tablet inverted index from metadata.
*/
public void recreateTabletInvertIndex() {
if (Env.isCheckpointThread()) {
return;
}
// create inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Database db : this.fullNameToDb.values()) {
@ -1311,31 +1307,31 @@ public class InternalCatalog implements CatalogIf<Database> {
} catch (DdlException e) {
throw new MetaNotFoundException(e.getMessage());
}
if (!Env.isCheckpointThread()) {
// add to inverted index
if (table.isManagedTable()) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
OlapTable olapTable = (OlapTable) table;
long dbId = db.getId();
long tableId = table.getId();
for (Partition partition : olapTable.getAllPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId)
.getStorageMedium();
for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = mIndex.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : mIndex.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash,
medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
// add to inverted index
if (table.isManagedTable()) {
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
OlapTable olapTable = (OlapTable) table;
long dbId = db.getId();
long tableId = table.getId();
for (Partition partition : olapTable.getAllPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId)
.getStorageMedium();
for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = mIndex.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : mIndex.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash,
medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
}
} // end for partitions
}
} // end for partitions
if (!Env.isCheckpointThread()) {
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(dbId, olapTable, true);
}
}
@ -1710,20 +1706,18 @@ public class InternalCatalog implements CatalogIf<Database> {
partitionInfo.unprotectHandleNewSinglePartitionDesc(partition.getId(), info.isTempPartition(),
partitionItem, info.getDataProperty(), info.getReplicaAlloc(), info.isInMemory(), info.isMutable());
if (!Env.isCheckpointThread()) {
// add to inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(),
index.getId(), schemaHash, info.getDataProperty().getStorageMedium());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
// add to inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = index.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : index.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(info.getDbId(), info.getTableId(), partition.getId(),
index.getId(), schemaHash, info.getDataProperty().getStorageMedium());
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
}
}
@ -3092,24 +3086,22 @@ public class InternalCatalog implements CatalogIf<Database> {
try {
truncateTableInternal(olapTable, info.getPartitions(), info.isEntireTable());
if (!Env.isCheckpointThread()) {
// add tablet to inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Partition partition : info.getPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId)
.getStorageMedium();
for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = mIndex.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : mIndex.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId,
schemaHash, medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
// add tablet to inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Partition partition : info.getPartitions()) {
long partitionId = partition.getId();
TStorageMedium medium = olapTable.getPartitionInfo().getDataProperty(partitionId)
.getStorageMedium();
for (MaterializedIndex mIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
long indexId = mIndex.getId();
int schemaHash = olapTable.getSchemaHashByIndexId(indexId);
for (Tablet tablet : mIndex.getTablets()) {
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, indexId,
schemaHash, medium);
long tabletId = tablet.getId();
invertedIndex.addTablet(tabletId, tabletMeta);
for (Replica replica : tablet.getReplicas()) {
invertedIndex.addReplica(tabletId, replica);
}
}
}