[fix](replica) Fix inconsistent replica id between FE and BE (#18688)
This commit is contained in:
@ -26,6 +26,7 @@ import org.apache.doris.thrift.TPartitionVersionInfo;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.thrift.TTablet;
|
||||
import org.apache.doris.thrift.TTabletInfo;
|
||||
import org.apache.doris.thrift.TTabletMetaInfo;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
import org.apache.doris.transaction.PartitionCommitInfo;
|
||||
import org.apache.doris.transaction.TableCommitInfo;
|
||||
@ -42,8 +43,6 @@ import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Table;
|
||||
import com.google.common.collect.TreeMultimap;
|
||||
import org.apache.commons.lang3.tuple.ImmutableTriple;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -127,7 +126,7 @@ public class TabletInvertedIndex {
|
||||
Map<Long, ListMultimap<Long, TPartitionVersionInfo>> transactionsToPublish,
|
||||
ListMultimap<Long, Long> transactionsToClear,
|
||||
ListMultimap<Long, Long> tabletRecoveryMap,
|
||||
List<Triple<Long, Integer, Boolean>> tabletToInMemory,
|
||||
List<TTabletMetaInfo> tabletToUpdate,
|
||||
List<CooldownConf> cooldownConfToPush,
|
||||
List<CooldownConf> cooldownConfToUpdate) {
|
||||
long stamp = readLock();
|
||||
@ -148,11 +147,18 @@ public class TabletInvertedIndex {
|
||||
Replica replica = entry.getValue();
|
||||
tabletFoundInMeta.add(tabletId);
|
||||
TTabletInfo backendTabletInfo = backendTablet.getTabletInfos().get(0);
|
||||
TTabletMetaInfo tabletMetaInfo = null;
|
||||
if (backendTabletInfo.getReplicaId() != replica.getId()
|
||||
&& replica.getState() != ReplicaState.CLONE) {
|
||||
// Need to update replica id in BE
|
||||
tabletMetaInfo = new TTabletMetaInfo();
|
||||
tabletMetaInfo.setReplicaId(replica.getId());
|
||||
}
|
||||
if (partitionIdInMemorySet.contains(
|
||||
backendTabletInfo.getPartitionId()) != backendTabletInfo.isIsInMemory()) {
|
||||
synchronized (tabletToInMemory) {
|
||||
tabletToInMemory.add(new ImmutableTriple<>(tabletId,
|
||||
backendTabletInfo.getSchemaHash(), !backendTabletInfo.isIsInMemory()));
|
||||
if (tabletMetaInfo == null) {
|
||||
tabletMetaInfo = new TTabletMetaInfo();
|
||||
tabletMetaInfo.setIsInMemory(!backendTabletInfo.isIsInMemory());
|
||||
}
|
||||
}
|
||||
// 1. (intersection)
|
||||
@ -300,6 +306,12 @@ public class TabletInvertedIndex {
|
||||
if (backendTabletInfo.isSetVersionCount()) {
|
||||
replica.setVersionCount(backendTabletInfo.getVersionCount());
|
||||
}
|
||||
if (tabletMetaInfo != null) {
|
||||
tabletMetaInfo.setTabletId(tabletId);
|
||||
synchronized (tabletToUpdate) {
|
||||
tabletToUpdate.add(tabletMetaInfo);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// 2. (meta - be)
|
||||
// may need delete from meta
|
||||
@ -318,10 +330,10 @@ public class TabletInvertedIndex {
|
||||
long end = System.currentTimeMillis();
|
||||
LOG.info("finished to do tablet diff with backend[{}]. sync: {}."
|
||||
+ " metaDel: {}. foundInMeta: {}. migration: {}. "
|
||||
+ "found invalid transactions {}. found republish transactions {}. tabletInMemorySync: {}."
|
||||
+ "found invalid transactions {}. found republish transactions {}. tabletToUpdate: {}."
|
||||
+ " need recovery: {}. cost: {} ms", backendId, tabletSyncMap.size(),
|
||||
tabletDeleteFromMeta.size(), tabletFoundInMeta.size(), tabletMigrationMap.size(),
|
||||
transactionsToClear.size(), transactionsToPublish.size(), tabletToInMemory.size(),
|
||||
transactionsToClear.size(), transactionsToPublish.size(), tabletToUpdate.size(),
|
||||
tabletRecoveryMap.size(), (end - start));
|
||||
}
|
||||
|
||||
|
||||
@ -81,6 +81,7 @@ import org.apache.doris.thrift.TStorageResource;
|
||||
import org.apache.doris.thrift.TStorageType;
|
||||
import org.apache.doris.thrift.TTablet;
|
||||
import org.apache.doris.thrift.TTabletInfo;
|
||||
import org.apache.doris.thrift.TTabletMetaInfo;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -91,7 +92,6 @@ import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Queues;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.thrift.TException;
|
||||
@ -420,7 +420,7 @@ public class ReportHandler extends Daemon {
|
||||
// db id -> tablet id
|
||||
ListMultimap<Long, Long> tabletRecoveryMap = LinkedListMultimap.create();
|
||||
|
||||
List<Triple<Long, Integer, Boolean>> tabletToInMemory = Lists.newArrayList();
|
||||
List<TTabletMetaInfo> tabletToUpdate = Lists.newArrayList();
|
||||
|
||||
List<CooldownConf> cooldownConfToPush = new LinkedList<>();
|
||||
List<CooldownConf> cooldownConfToUpdate = new LinkedList<>();
|
||||
@ -434,7 +434,7 @@ public class ReportHandler extends Daemon {
|
||||
transactionsToPublish,
|
||||
transactionsToClear,
|
||||
tabletRecoveryMap,
|
||||
tabletToInMemory,
|
||||
tabletToUpdate,
|
||||
cooldownConfToPush,
|
||||
cooldownConfToUpdate);
|
||||
|
||||
@ -474,9 +474,9 @@ public class ReportHandler extends Daemon {
|
||||
handleRecoverTablet(tabletRecoveryMap, backendTablets, backendId);
|
||||
}
|
||||
|
||||
// 9. send set tablet in memory to be
|
||||
if (!tabletToInMemory.isEmpty()) {
|
||||
handleSetTabletInMemory(backendId, tabletToInMemory);
|
||||
// 9. send tablet meta to be for updating
|
||||
if (!tabletToUpdate.isEmpty()) {
|
||||
handleUpdateTabletMeta(backendId, tabletToUpdate);
|
||||
}
|
||||
|
||||
// handle cooldown conf
|
||||
@ -1032,10 +1032,14 @@ public class ReportHandler extends Daemon {
|
||||
}
|
||||
}
|
||||
|
||||
private static void handleSetTabletInMemory(long backendId, List<Triple<Long, Integer, Boolean>> tabletToInMemory) {
|
||||
private static void handleUpdateTabletMeta(long backendId, List<TTabletMetaInfo> tabletToUpdate) {
|
||||
final int updateBatchSize = 4096;
|
||||
AgentBatchTask batchTask = new AgentBatchTask();
|
||||
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(backendId, tabletToInMemory);
|
||||
batchTask.addTask(task);
|
||||
for (int start = 0; start < tabletToUpdate.size(); start += updateBatchSize) {
|
||||
UpdateTabletMetaInfoTask task = new UpdateTabletMetaInfoTask(backendId,
|
||||
tabletToUpdate.subList(start, Math.min(start + updateBatchSize, tabletToUpdate.size())));
|
||||
batchTask.addTask(task);
|
||||
}
|
||||
AgentTaskExecutor.submit(batchTask);
|
||||
}
|
||||
|
||||
|
||||
@ -17,23 +17,19 @@
|
||||
|
||||
package org.apache.doris.task;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.TabletMeta;
|
||||
import org.apache.doris.common.MarkedCountDownLatch;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.Status;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TTabletMetaInfo;
|
||||
import org.apache.doris.thrift.TTabletMetaType;
|
||||
import org.apache.doris.thrift.TTaskType;
|
||||
import org.apache.doris.thrift.TUpdateTabletMetaInfoReq;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.lang3.tuple.Triple;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
|
||||
public class UpdateTabletMetaInfoTask extends AgentTask {
|
||||
@ -45,36 +41,31 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
|
||||
|
||||
private Set<Pair<Long, Integer>> tableIdWithSchemaHash;
|
||||
private int inMemory = -1; // < 0 means not to update inMemory property, > 0 means true, == 0 means false
|
||||
private TTabletMetaType metaType;
|
||||
private long storagePolicyId = -1; // < 0 means not to update storage policy, == 0 means to reset storage policy
|
||||
// For ReportHandler
|
||||
private List<TTabletMetaInfo> tabletMetaInfos;
|
||||
|
||||
// <tablet id, tablet schema hash, tablet in memory>
|
||||
private List<Triple<Long, Integer, Boolean>> tabletToInMemory;
|
||||
|
||||
public UpdateTabletMetaInfoTask(long backendId, Set<Pair<Long, Integer>> tableIdWithSchemaHash,
|
||||
TTabletMetaType metaType) {
|
||||
public UpdateTabletMetaInfoTask(long backendId, Set<Pair<Long, Integer>> tableIdWithSchemaHash) {
|
||||
super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO,
|
||||
-1L, -1L, -1L, -1L, -1L, tableIdWithSchemaHash.hashCode());
|
||||
-1L, -1L, -1L, -1L, -1L, Math.abs(new Random().nextLong()));
|
||||
this.tableIdWithSchemaHash = tableIdWithSchemaHash;
|
||||
this.metaType = metaType;
|
||||
}
|
||||
|
||||
public UpdateTabletMetaInfoTask(long backendId,
|
||||
Set<Pair<Long, Integer>> tableIdWithSchemaHash,
|
||||
int inMemory, long storagePolicyId,
|
||||
MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> latch) {
|
||||
this(backendId, tableIdWithSchemaHash, TTabletMetaType.INMEMORY);
|
||||
this(backendId, tableIdWithSchemaHash);
|
||||
this.storagePolicyId = storagePolicyId;
|
||||
this.inMemory = inMemory;
|
||||
this.latch = latch;
|
||||
}
|
||||
|
||||
public UpdateTabletMetaInfoTask(long backendId,
|
||||
List<Triple<Long, Integer, Boolean>> tabletToInMemory) {
|
||||
public UpdateTabletMetaInfoTask(long backendId, List<TTabletMetaInfo> tabletMetaInfos) {
|
||||
// For ReportHandler, never add to AgentTaskQueue, so signature is useless.
|
||||
super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO,
|
||||
-1L, -1L, -1L, -1L, -1L, tabletToInMemory.hashCode());
|
||||
this.metaType = TTabletMetaType.INMEMORY;
|
||||
this.tabletToInMemory = tabletToInMemory;
|
||||
-1L, -1L, -1L, -1L, -1L);
|
||||
this.tabletMetaInfos = tabletMetaInfos;
|
||||
}
|
||||
|
||||
public void countDownLatch(long backendId, Set<Pair<Long, Integer>> tablets) {
|
||||
@ -100,64 +91,24 @@ public class UpdateTabletMetaInfoTask extends AgentTask {
|
||||
|
||||
public TUpdateTabletMetaInfoReq toThrift() {
|
||||
TUpdateTabletMetaInfoReq updateTabletMetaInfoReq = new TUpdateTabletMetaInfoReq();
|
||||
List<TTabletMetaInfo> metaInfos = Lists.newArrayList();
|
||||
switch (metaType) {
|
||||
case PARTITIONID: {
|
||||
int tabletEntryNum = 0;
|
||||
for (Pair<Long, Integer> pair : tableIdWithSchemaHash) {
|
||||
// add at most 10000 tablet meta during one sync to avoid too large task
|
||||
if (tabletEntryNum > 10000) {
|
||||
break;
|
||||
}
|
||||
TTabletMetaInfo metaInfo = new TTabletMetaInfo();
|
||||
metaInfo.setTabletId(pair.first);
|
||||
metaInfo.setSchemaHash(pair.second);
|
||||
TabletMeta tabletMeta = Env.getCurrentEnv()
|
||||
.getTabletInvertedIndex().getTabletMeta(pair.first);
|
||||
if (tabletMeta == null) {
|
||||
LOG.warn("could not find tablet [{}] in meta ignore it", pair.second);
|
||||
continue;
|
||||
}
|
||||
metaInfo.setPartitionId(tabletMeta.getPartitionId());
|
||||
metaInfo.setMetaType(metaType);
|
||||
metaInfos.add(metaInfo);
|
||||
++tabletEntryNum;
|
||||
if (latch != null) {
|
||||
// for schema change
|
||||
for (Pair<Long, Integer> pair : tableIdWithSchemaHash) {
|
||||
TTabletMetaInfo metaInfo = new TTabletMetaInfo();
|
||||
metaInfo.setTabletId(pair.first);
|
||||
metaInfo.setSchemaHash(pair.second);
|
||||
if (inMemory >= 0) {
|
||||
metaInfo.setIsInMemory(inMemory > 0);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case INMEMORY: {
|
||||
if (latch != null) {
|
||||
// for schema change
|
||||
for (Pair<Long, Integer> pair : tableIdWithSchemaHash) {
|
||||
TTabletMetaInfo metaInfo = new TTabletMetaInfo();
|
||||
metaInfo.setTabletId(pair.first);
|
||||
metaInfo.setSchemaHash(pair.second);
|
||||
if (inMemory >= 0) {
|
||||
metaInfo.setIsInMemory(inMemory > 0);
|
||||
}
|
||||
if (storagePolicyId >= 0) {
|
||||
metaInfo.setStoragePolicyId(storagePolicyId);
|
||||
}
|
||||
metaInfo.setMetaType(metaType);
|
||||
metaInfos.add(metaInfo);
|
||||
}
|
||||
} else {
|
||||
// for ReportHandler
|
||||
for (Triple<Long, Integer, Boolean> triple : tabletToInMemory) {
|
||||
TTabletMetaInfo metaInfo = new TTabletMetaInfo();
|
||||
metaInfo.setTabletId(triple.getLeft());
|
||||
metaInfo.setSchemaHash(triple.getMiddle());
|
||||
metaInfo.setIsInMemory(triple.getRight());
|
||||
metaInfo.setMetaType(metaType);
|
||||
metaInfos.add(metaInfo);
|
||||
}
|
||||
if (storagePolicyId >= 0) {
|
||||
metaInfo.setStoragePolicyId(storagePolicyId);
|
||||
}
|
||||
break;
|
||||
updateTabletMetaInfoReq.addToTabletMetaInfos(metaInfo);
|
||||
}
|
||||
default:
|
||||
break;
|
||||
} else {
|
||||
// for ReportHandler
|
||||
updateTabletMetaInfoReq.setTabletMetaInfos(tabletMetaInfos);
|
||||
}
|
||||
updateTabletMetaInfoReq.setTabletMetaInfos(metaInfos);
|
||||
return updateTabletMetaInfoReq;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user