[fix](fe) When bdbje is adding a follower, master write operations may fail. (#10376)

This commit is contained in:
Lei Zhang
2022-07-06 10:29:16 +08:00
committed by GitHub
parent 5f5e01b285
commit c936abd2a3
6 changed files with 140 additions and 53 deletions

View File

@ -2470,6 +2470,7 @@ public class Catalog {
if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) {
bdbha.addHelperSocket(host, editLogPort);
helperNodes.add(Pair.create(host, editLogPort));
bdbha.addUnReadyElectableNode(nodeName, getFollowerCount());
}
bdbha.removeConflictNodeIfExist(host, editLogPort);
editLog.logAddFrontend(fe);
@ -2499,6 +2500,8 @@ public class Catalog {
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
haProtocol.removeElectableNode(fe.getNodeName());
helperNodes.remove(Pair.create(host, port));
BDBHA ha = (BDBHA) haProtocol;
ha.removeUnReadyElectableNode(nodeName, getFollowerCount());
}
editLog.logRemoveFrontend(fe);
} finally {
@ -4931,4 +4934,14 @@ public class Catalog {
sb.append("\nCOMMENT '").append(table.getComment(true)).append("'");
}
}
/**
 * Counts the registered frontends whose role is {@link FrontendNodeType#FOLLOWER}.
 * Frontends with any other role are not included in the result.
 *
 * @return the number of entries in {@code frontends} with the FOLLOWER role
 */
public int getFollowerCount() {
    int followers = 0;
    for (Frontend frontend : frontends.values()) {
        if (frontend.getRole() != FrontendNodeType.FOLLOWER) {
            continue;
        }
        followers += 1;
    }
    return followers;
}
}

View File

@ -1,17 +1,17 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
@ -28,7 +28,9 @@ import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.rep.MasterStateException;
import com.sleepycat.je.rep.MemberNotFoundException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.ReplicationGroup;
import com.sleepycat.je.rep.ReplicationMutableConfig;
import com.sleepycat.je.rep.ReplicationNode;
import com.sleepycat.je.rep.UnknownMasterException;
import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
@ -37,16 +39,30 @@ import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class BDBHA implements HAProtocol {
private static final Logger LOG = LogManager.getLogger(BDBHA.class);
private BDBEnvironment environment;
private String nodeName;
private final BDBEnvironment environment;
private final String nodeName;
private static final int RETRY_TIME = 3;
// An unstable (un-ready) node is a follower node that is joining the cluster
// but has not finished joining yet.
// We record this kind of node and set the bdb electable group size to
// (size_of_all_followers - size_of_unstable_nodes).
// This is because once the handshake is successful, the joined node is put
// into the optional group,
// but it may take a little time for this node to replicate the historical data.
// This node will not respond to any new data replication until the historical
// replication is completed,
// and if the master cannot receive a quorum of responses, the write operation
// will fail.
private final Set<String> unReadyElectableNodes = new HashSet<>();
public BDBHA(BDBEnvironment env, String nodeName) {
this.environment = env;
this.nodeName = nodeName;
@ -124,7 +140,8 @@ public class BDBHA implements HAProtocol {
if (leaderIncluded) {
ret.add(replicationNode.getSocketAddress());
} else {
if (!replicationNode.getName().equals(replicationGroupAdmin.getMasterNodeName())) {
if (!replicationNode.getName()
.equals(replicationGroupAdmin.getMasterNodeName())) {
ret.add(replicationNode.getSocketAddress());
}
}
@ -204,17 +221,20 @@ public class BDBHA implements HAProtocol {
return true;
}
// When new Follower FE is added to the cluster, it should also be added to the helper sockets in
// When new Follower FE is added to the cluster, it should also be added to the
// helper sockets in
// ReplicationGroupAdmin, in order to fix the following case:
// 1. A Observer starts with helper of master FE.
// 2. Master FE is dead, new Master is elected.
// 3. Observer's helper sockets only contains the info of the dead master FE.
// So when you try to get frontends' info from this Observer, it will throw the Exception:
// "Could not determine master from helpers at:[/dead master FE host:port]"
// So when you try to get frontends' info from this Observer, it will throw the
// Exception:
// "Could not determine master from helpers at:[/dead master FE host:port]"
public void addHelperSocket(String ip, Integer port) {
ReplicationGroupAdmin replicationGroupAdmin = environment.getReplicationGroupAdmin();
Set<InetSocketAddress> helperSockets = Sets.newHashSet(replicationGroupAdmin.getHelperSockets());
InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port);
Set<InetSocketAddress> helperSockets =
Sets.newHashSet(replicationGroupAdmin.getHelperSockets());
InetSocketAddress newHelperSocket = new InetSocketAddress(ip, port);
if (!helperSockets.contains(newHelperSocket)) {
helperSockets.add(newHelperSocket);
environment.setNewReplicationGroupAdmin(helperSockets);
@ -240,4 +260,29 @@ public class BDBHA implements HAProtocol {
removeElectableNode(conflictNode);
}
}
/**
 * Records {@code nodeName} as an un-ready electable node and, if a replicated
 * environment is available, overrides the bdb electable group size with
 * {@code totalFollowerCount} minus the number of un-ready nodes.
 *
 * <p>The node is recorded even when the replicated environment is {@code null};
 * in that case no configuration is changed.
 *
 * <p>NOTE(review): the computed override is not range-checked here — confirm that
 * callers can never pass a {@code totalFollowerCount} smaller than the number of
 * un-ready nodes.
 *
 * @param nodeName name of the joining node (shadows the field of the same name)
 * @param totalFollowerCount total number of followers as counted by the caller
 */
public synchronized void addUnReadyElectableNode(String nodeName, int totalFollowerCount) {
    unReadyElectableNodes.add(nodeName);

    final ReplicatedEnvironment repEnv = environment.getReplicatedEnvironment();
    if (repEnv == null) {
        return;
    }

    final int readyFollowerCount = totalFollowerCount - unReadyElectableNodes.size();
    final ReplicationMutableConfig mutableConfig =
            new ReplicationMutableConfig().setElectableGroupSizeOverride(readyFollowerCount);
    repEnv.setRepMutableConfig(mutableConfig);
}
/**
 * Removes {@code nodeName} from the set of un-ready electable nodes and, if a
 * replicated environment is available, refreshes the bdb electable group size
 * override.
 *
 * <p>When no un-ready nodes remain, the override is set to 0, which removes the
 * override so that bdb uses the normal electable group size. Otherwise it is set
 * to {@code totalFollowerCount} minus the number of remaining un-ready nodes.
 *
 * @param nodeName name of the node to forget (shadows the field of the same name)
 * @param totalFollowerCount total number of followers as counted by the caller
 */
public synchronized void removeUnReadyElectableNode(String nodeName, int totalFollowerCount) {
    unReadyElectableNodes.remove(nodeName);

    final ReplicatedEnvironment repEnv = environment.getReplicatedEnvironment();
    if (repEnv == null) {
        return;
    }

    // An override of 0 removes this config, so bdb falls back to the normal
    // electable group size.
    final int groupSizeOverride = unReadyElectableNodes.isEmpty()
            ? 0
            : totalFollowerCount - unReadyElectableNodes.size();
    repEnv.setRepMutableConfig(
            new ReplicationMutableConfig().setElectableGroupSizeOverride(groupSizeOverride));
}
}

View File

@ -163,7 +163,8 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
if (op == OperationType.OP_TIMESTAMP) {
/*
* Do not exit if the write operation is OP_TIMESTAMP.
* If all the followers exit except master, master should continue provide query service.
* If all the followers exit except master, master should continue provide query
* service.
* To prevent master exit, we should exempt OP_TIMESTAMP write
*/
nextJournalId.set(id);
@ -307,7 +308,8 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
}
}
// Open a new journal database or get last existing one as current journal database
// Open a new journal database or get last existing one as current journal
// database
List<Long> dbNames = null;
for (int i = 0; i < RETRY_TIME; i++) {
try {
@ -319,10 +321,11 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
}
if (dbNames.size() == 0) {
/*
* This is the very first time to open. Usually, we will open a new database named "1".
* But when we start cluster with an image file copied from other cluster,
* here we should open database with name image max journal id + 1.
* (default Catalog.getServingCatalog().getReplayedJournalId() is 0)
* This is the very first time to open. Usually, we will open a new database
* named "1".
* But when we start cluster with an image file copied from other cluster,
* here we should open database with name image max journal id + 1.
* (default Catalog.getServingCatalog().getReplayedJournalId() is 0)
*/
String dbName = Long.toString(Catalog.getServingCatalog().getReplayedJournalId() + 1);
LOG.info("the very first time to open bdb, dbname is {}", dbName);
@ -344,8 +347,10 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
private void reSetupBdbEnvironment(InsufficientLogException insufficientLogEx) {
LOG.warn("catch insufficient log exception. will recover and try again.", insufficientLogEx);
// Copy the missing log files from a member of the replication group who owns the files
// ATTN: here we use `getServingCatalog()`, because only serving catalog has helper nodes.
// Copy the missing log files from a member of the replication group who owns
// the files
// ATTN: here we use `getServingCatalog()`, because only serving catalog has
// helper nodes.
Pair<String, Integer> helperNode = Catalog.getServingCatalog().getHelperNode();
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
@ -413,19 +418,23 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
return null;
}
// Open a new journal database or get last existing one as current journal database
List<Long> dbNames = null;
// Open a new journal database or get last existing one as current journal
// database
List<Long> dbNames = null;
for (int i = 0; i < RETRY_TIME; i++) {
try {
dbNames = bdbEnvironment.getDatabaseNames();
break;
} catch (InsufficientLogException insufficientLogEx) {
/*
* If this is not a checkpoint thread, which means this maybe the FE startup thread,
* or a replay thread. We will reopen bdbEnvironment for these 2 cases to get valid log
* If this is not a checkpoint thread, which means this maybe the FE startup
* thread,
* or a replay thread. We will reopen bdbEnvironment for these 2 cases to get
* valid log
* from helper nodes.
*
* The checkpoint thread will only run on Master FE. And Master FE should not encounter
* The checkpoint thread will only run on Master FE. And Master FE should not
* encounter
* these exception. So if it happens, throw exception out.
*/
if (!Catalog.isCheckpointThread()) {
@ -446,4 +455,8 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
return dbNames;
}
/**
 * Exposes the BDB environment held by this journal.
 *
 * @return the value of the {@code bdbEnvironment} field
 */
public BDBEnvironment getBDBEnvironment() {
    return bdbEnvironment;
}
}

View File

@ -466,7 +466,7 @@ public class EditLog {
int version = Integer.parseInt(versionString);
if (version > FeConstants.meta_version) {
LOG.error("meta data version is out of date, image: {}. meta: {}."
+ "please update FeConstants.meta_version and restart.",
+ "please update FeConstants.meta_version and restart.",
MetaContext.get().getMetaVersion(), FeConstants.meta_version);
System.exit(-1);
}
@ -546,8 +546,8 @@ public class EditLog {
break;
}
case OperationType.OP_BATCH_REMOVE_TXNS: {
final BatchRemoveTransactionsOperation operation
= (BatchRemoveTransactionsOperation) journal.getData();
final BatchRemoveTransactionsOperation operation = (BatchRemoveTransactionsOperation) journal
.getData();
Catalog.getCurrentGlobalTransactionMgr().replayBatchRemoveTransactions(operation);
break;
}
@ -647,8 +647,8 @@ public class EditLog {
break;
}
case OperationType.OP_CREATE_LOAD_JOB: {
org.apache.doris.load.loadv2.LoadJob loadJob =
(org.apache.doris.load.loadv2.LoadJob) journal.getData();
org.apache.doris.load.loadv2.LoadJob loadJob = (org.apache.doris.load.loadv2.LoadJob) journal
.getData();
catalog.getLoadManager().replayCreateLoadJob(loadJob);
break;
}
@ -742,8 +742,8 @@ public class EditLog {
break;
}
case OperationType.OP_REPLACE_TEMP_PARTITION: {
ReplacePartitionOperationLog replaceTempPartitionLog
= (ReplacePartitionOperationLog) journal.getData();
ReplacePartitionOperationLog replaceTempPartitionLog = (ReplacePartitionOperationLog) journal
.getData();
catalog.replayReplaceTempPartition(replaceTempPartitionLog);
break;
}
@ -858,13 +858,19 @@ public class EditLog {
* for a table that no longer exists.
* 1. Thread 1: get TableA object
* 2. Thread 2: lock db and drop table and record edit log of the dropped TableA
* 3. Thread 1: lock table, modify table and record edit log of the modified TableA
* 3. Thread 1: lock table, modify table and record edit log of the modified
* TableA
* **The modified edit log is after the dropped edit log**
* Because the table has been dropped, the olapTable in here is null when the modified edit log is replayed.
* So in this case, we will ignore the edit log of the modified table after the table is dropped.
* This could make the meta inconsistent, for example, an edit log on a dropped table is ignored, but
* this table is restored later, so there may be an inconsistent situation between master and followers. We
* log a warning here to debug when happens. This could happen to other meta like DB.
* Because the table has been dropped, the olapTable in here is null when the
* modified edit log is replayed.
* So in this case, we will ignore the edit log of the modified table after the
* table is dropped.
* This could make the meta inconsistent, for example, an edit log on a dropped
* table is ignored, but
* this table is restored later, so there may be an inconsistent situation
* between master and followers. We
* log a warning here to debug when happens. This could happen to other meta
* like DB.
*/
LOG.warn("[INCONSISTENT META] replay failed {}: {}", journal, e.getMessage(), e);
} catch (Exception e) {
@ -911,7 +917,8 @@ public class EditLog {
try {
journal.write(op, writable);
} catch (Throwable t) {
// Throwable contains all Exception and Error, such as IOException and OutOfMemoryError
// Throwable contains all Exception and Error, such as IOException and
// OutOfMemoryError
LOG.error("Fatal Error : write stream Exception", t);
System.exit(-1);
}
@ -1461,4 +1468,8 @@ public class EditLog {
public void logDatasourceLog(short id, CatalogLog log) {
logEdit(id, log);
}
/**
 * Exposes the journal held by this edit log.
 *
 * @return the value of the {@code journal} field
 */
public Journal getJournal() {
    return journal;
}
}

View File

@ -1,24 +1,26 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.system;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
@ -42,8 +44,7 @@ public class Frontend implements Writable {
private boolean isAlive = false;
public Frontend() {
}
public Frontend() {}
public Frontend(FrontendNodeType role, String nodeName, String host, int editLogPort) {
this.role = role;
@ -97,14 +98,18 @@ public class Frontend implements Writable {
}
/**
* handle Frontend's heartbeat response.
* Because the replayed journal id is very likely to be changed at each heartbeat response,
* so we simple return true if the heartbeat status is OK.
* But if heartbeat status is BAD, only return true if it is the first time to transfer from alive to dead.
* Handle Frontend's heartbeat response. Because the replayed journal id is very likely to
* change at each heartbeat response, we simply return true if the heartbeat status is OK.
* But if the heartbeat status is BAD, only return true if it is the first transition from
* alive to dead.
*/
public boolean handleHbResponse(FrontendHbResponse hbResponse) {
public boolean handleHbResponse(FrontendHbResponse hbResponse, boolean isReplay) {
boolean isChanged = false;
if (hbResponse.getStatus() == HbStatus.OK) {
if (!isAlive && !isReplay) {
BDBHA bdbha = (BDBHA) Catalog.getCurrentCatalog().getHaProtocol();
bdbha.removeUnReadyElectableNode(nodeName, Catalog.getCurrentCatalog().getFollowerCount());
}
isAlive = true;
version = hbResponse.getVersion();
queryPort = hbResponse.getQueryPort();

View File

@ -157,7 +157,7 @@ public class HeartbeatMgr extends MasterDaemon {
FrontendHbResponse hbResponse = (FrontendHbResponse) response;
Frontend fe = Catalog.getCurrentCatalog().getFeByName(hbResponse.getName());
if (fe != null) {
return fe.handleHbResponse(hbResponse);
return fe.handleHbResponse(hbResponse, isReplay);
}
break;
}