[fix](journal) ensure txns are matched with the master before replaying (#28192)

This commit is contained in:
walter
2023-12-13 18:14:51 +08:00
committed by GitHub
parent 62859f38c1
commit ea832744df
2 changed files with 167 additions and 16 deletions

View File

@ -38,12 +38,16 @@ import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import com.sleepycat.je.rep.ReplicaConsistencyException;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.RollbackException;
import com.sleepycat.je.rep.TimeConsistencyPolicy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -52,6 +56,7 @@ import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/*
@ -124,10 +129,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
// id is the key
long id = nextJournalId.getAndIncrement();
Long idLong = id;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
idBinding.objectToEntry(idLong, theKey);
DatabaseEntry theKey = idToKey(id);
// entity is the value
DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
@ -203,6 +205,13 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
return id;
}
private static DatabaseEntry idToKey(Long id) {
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
idBinding.objectToEntry(id, theKey);
return theKey;
}
@Override
public JournalEntity read(long journalId) {
List<Long> dbNames = getDatabaseNames();
@ -224,7 +233,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
}
JournalEntity ret = null;
Long key = new Long(journalId);
Long key = journalId;
DatabaseEntry theKey = new DatabaseEntry();
TupleBinding<Long> myBinding = TupleBinding.getPrimitiveBinding(Long.class);
myBinding.objectToEntry(key, theKey);
@ -270,7 +279,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
if (dbNames == null) {
return ret;
}
if (dbNames.size() == 0) {
if (dbNames.isEmpty()) {
return ret;
}
@ -278,9 +287,52 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
String dbName = dbNames.get(index).toString();
long dbNumberName = dbNames.get(index);
Database database = bdbEnvironment.openDatabase(dbName);
ret = dbNumberName + database.count() - 1;
if (!isReplicaTxnAreMatched(database, dbNumberName)) {
LOG.warn("The current replica hasn't synced up with the master, current db name: {}", dbNumberName);
if (index != 0) {
// Because roll journal occurs after write, the previous write must have
// been replicated to the majority, so it can be guaranteed that the database
// will not be rollback.
return dbNumberName - 1;
}
return -1;
}
return dbNumberName + database.count() - 1;
}
return ret;
// Whether the replica txns are matched with the master.
//
// BDBJE could throw InsufficientAcksException during post commit, at that time the
// log has persisted in disk. When the replica is restarted, we need to ensure that
// before replaying the journals, sync up txns with the new master in the cluster and
// rollback the txns that have been persisted but have not committed to the majority.
//
// See org.apache.doris.journal.bdbje.BDBEnvironmentTest#testReadTxnIsNotMatched for details.
private boolean isReplicaTxnAreMatched(Database database, Long id) {
// The time lag is set to Integer.MAX_VALUE if the replica haven't synced up
// with the master. By allowing a very large lag, we can detect whether the
// replica has synced up with the master.
TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy(
1, TimeUnit.DAYS, 1, TimeUnit.MINUTES);
Transaction txn = null;
try {
TransactionConfig cfg = new TransactionConfig()
.setReadOnly(true)
.setReadCommitted(true)
.setConsistencyPolicy(consistencyPolicy);
txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, cfg);
DatabaseEntry key = idToKey(id);
database.get(txn, key, null, LockMode.READ_COMMITTED);
return true;
} catch (ReplicaConsistencyException e) {
return false;
} finally {
if (txn != null) {
txn.abort();
}
}
}
@Override
@ -293,7 +345,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
if (dbNames == null) {
return ret;
}
if (dbNames.size() == 0) {
if (dbNames.isEmpty()) {
return ret;
}
@ -350,7 +402,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
LOG.error("fail to get dbNames while open bdbje journal. will exit");
System.exit(-1);
}
if (dbNames.size() == 0) {
if (dbNames.isEmpty()) {
/*
* This is the very first time to open. Usually, we will open a new database
* named "1".