From 16fa6a1615406890356e1adae69eed6e0909e492 Mon Sep 17 00:00:00 2001 From: Lei Zhang <27994433+SWJTU-ZhangLei@users.noreply.github.com> Date: Sun, 3 Dec 2023 23:10:07 +0800 Subject: [PATCH] [refact](bdbje) Refact `BDBEnvironment` and `BDBJEJournal` (#27778) * Add more ut about "org.apache.doris.journal.bdbje" * Make tiny refactor about "org.apache.doris.journal.bdbje" --- .../main/java/org/apache/doris/DorisFE.java | 2 +- .../doris/ha/BDBStateChangeListener.java | 6 +- .../doris/journal/bdbje/BDBDebugger.java | 41 +- .../doris/journal/bdbje/BDBEnvironment.java | 47 +-- .../doris/journal/bdbje/BDBJEJournal.java | 23 +- .../doris/journal/bdbje/BDBDebuggerTest.java | 180 ++++++++ .../journal/bdbje/BDBEnvironmentTest.java | 396 +++++++++++++++++- .../doris/journal/bdbje/BDBJEJournalTest.java | 16 +- .../journal/bdbje/BDBJournalCursorTest.java | 118 ++++++ .../bdbje}/BDBToolOptionsTest.java | 4 +- .../{bdb => journal/bdbje}/BDBToolTest.java | 4 +- .../doris/journal/bdbje/TimestampTest.java | 90 ++++ 12 files changed, 836 insertions(+), 91 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBDebuggerTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJournalCursorTest.java rename fe/fe-core/src/test/java/org/apache/doris/{bdb => journal/bdbje}/BDBToolOptionsTest.java (93%) rename fe/fe-core/src/test/java/org/apache/doris/{bdb => journal/bdbje}/BDBToolTest.java (97%) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/TimestampTest.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index 224d82c401..dfadde6626 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -152,7 +152,7 @@ public class DorisFE { if (Config.enable_bdbje_debug_mode) { // Start in BDB Debug mode - BDBDebugger.get().startDebugMode(dorisHomeDir); + BDBDebugger.get().startDebugMode(Config.meta_dir + "/bdb"); return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java index d643646fc6..2ff66a5aad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/ha/BDBStateChangeListener.java @@ -28,8 +28,10 @@ import org.apache.logging.log4j.Logger; public class BDBStateChangeListener implements StateChangeListener { public static final Logger LOG = LogManager.getLogger(BDBStateChangeListener.class); + private final boolean isElectable; - public BDBStateChangeListener() { + public BDBStateChangeListener(boolean isElectable) { + this.isElectable = isElectable; } @Override @@ -41,7 +43,7 @@ public class BDBStateChangeListener implements StateChangeListener { break; } case REPLICA: { - if (Env.getCurrentEnv().isElectable()) { + if (isElectable) { newType = FrontendNodeType.FOLLOWER; } else { newType = FrontendNodeType.OBSERVER; diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java index ae48526515..97009edff1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBDebugger.java @@ -58,6 +58,10 @@ public class BDBDebugger { private static final Logger LOG = LogManager.getLogger(BDBDebugger.class); private BDBDebugEnv debugEnv; + private static class SingletonHolder { + private static final BDBDebugger INSTANCE = new BDBDebugger(); + } + public static BDBDebugger get() { return SingletonHolder.INSTANCE; } @@ -65,10 +69,10 @@ public class BDBDebugger { /** * Start in BDB Debug mode. */ - public void startDebugMode(String dorisHomeDir) { + public void startDebugMode(String bdbHome) { try { - initDebugEnv(); - startService(dorisHomeDir); + initDebugEnv(bdbHome); + startService(); while (true) { Thread.sleep(2000); } @@ -78,8 +82,13 @@ public class BDBDebugger { } } + private void initDebugEnv(String bdbHome) throws BDBDebugException { + debugEnv = new BDBDebugEnv(bdbHome); + debugEnv.init(); + } + // Only start MySQL and HttpServer - private void startService(String dorisHomeDir) throws Exception { + private void startService() throws Exception { // HTTP server HttpServer httpServer = new HttpServer(); @@ -94,19 +103,10 @@ public class BDBDebugger { ThreadPoolManager.registerAllThreadPoolMetric(); } - private void initDebugEnv() throws BDBDebugException { - debugEnv = new BDBDebugEnv(Config.meta_dir + "/bdb/"); - debugEnv.init(); - } - public BDBDebugEnv getEnv() { return debugEnv; } - private static class SingletonHolder { - private static final BDBDebugger INSTANCE = new BDBDebugger(); - } - /** * A wrapper class of the BDBJE environment, used to obtain information in bdbje. */ @@ -144,7 +144,9 @@ public class BDBDebugger { dbConfig.setAllowCreate(false); dbConfig.setReadOnly(true); Database db = env.openDatabase(null, dbName, dbConfig); - return db.count(); + long journalNumber = db.count(); + db.close(); + return journalNumber; } /** @@ -172,6 +174,8 @@ public class BDBDebugger { Long id = idBinding.entryToObject(key); journalIds.add(id); } + cursor.close(); + db.close(); } catch (Exception e) { LOG.warn("failed to get journal ids of {}", dbName, e); throw new BDBDebugException("failed to get journal ids of database " + dbName, e); @@ -205,6 +209,7 @@ public class BDBDebugger { // get the journal OperationStatus status = db.get(null, key, value, LockMode.READ_COMMITTED); + db.close(); if (status == OperationStatus.SUCCESS) { byte[] retData = value.getData(); DataInputStream in = new DataInputStream(new ByteArrayInputStream(retData)); @@ -223,6 +228,14 @@ public class BDBDebugger { MetaContext.remove(); return entityWrapper; } + + public void close() { + try { + env.close(); + } catch (Exception e) { + LOG.warn("exception:", e); + } + } } public static class JournalEntityWrapper { diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java index f2d1c1825c..7674cf6f59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java @@ -19,7 +19,6 @@ package org.apache.doris.journal.bdbje; import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; -import org.apache.doris.common.FeConstants; import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.BDBStateChangeListener; import org.apache.doris.ha.FrontendNodeType; @@ -72,10 +71,8 @@ public class BDBEnvironment { private static final int MEMORY_CACHE_PERCENT = 20; private static final List BDBJE_LOG_LEVEL = ImmutableList.of("OFF", "SEVERE", "WARNING", "INFO", "CONFIG", "FINE", "FINER", "FINEST", "ALL"); - public static final String PALO_JOURNAL_GROUP = "PALO_JOURNAL_GROUP"; - private ReplicatedEnvironment replicatedEnvironment; private EnvironmentConfig environmentConfig; private ReplicationConfig replicationConfig; @@ -84,15 +81,19 @@ public class BDBEnvironment { private ReentrantReadWriteLock lock; private List openedDatabases; - public BDBEnvironment() { + private final boolean isElectable; + private final boolean metadataFailureRecovery; + + public BDBEnvironment(boolean isElectable, boolean metadataFailureRecovery) { openedDatabases = new ArrayList(); this.lock = new ReentrantReadWriteLock(true); + this.isElectable = isElectable; + this.metadataFailureRecovery = metadataFailureRecovery; } // The setup() method opens the environment and database public void setup(File envHome, String selfNodeName, String selfNodeHostPort, - String helperHostPort, boolean isElectable) { - boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY); + String helperHostPort) { // Almost never used, just in case the master can not restart if (metadataFailureRecovery) { if (!isElectable) { @@ -112,7 +113,7 @@ public class BDBEnvironment { replicationConfig.setNodeHostPort(selfNodeHostPort); replicationConfig.setHelperHosts(helperHostPort); replicationConfig.setGroupName(PALO_JOURNAL_GROUP); - replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10"); + replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10 s"); replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS); replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT, String.valueOf(Config.txn_rollback_limit)); @@ -170,6 +171,9 @@ public class BDBEnvironment { // open environment and epochDB for (int i = 0; i < RETRY_TIME; i++) { try { + if (replicatedEnvironment != null) { + this.close(); + } // open the environment replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig); @@ -178,12 +182,13 @@ public class BDBEnvironment { Env.getCurrentEnv().setHaProtocol(protocol); // start state change listener - StateChangeListener listener = new BDBStateChangeListener(); + StateChangeListener listener = new BDBStateChangeListener(isElectable); replicatedEnvironment.setStateChangeListener(listener); // open epochDB. the first parameter null means auto-commit epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig); break; } catch (InsufficientLogException insufficientLogEx) { + LOG.info("i:{} insufficientLogEx:", i, insufficientLogEx); NetworkRestore restore = new NetworkRestore(); NetworkRestoreConfig config = new NetworkRestoreConfig(); config.setRetainLogFiles(false); // delete obsolete log files. @@ -193,6 +198,7 @@ public class BDBEnvironment { // default selection of providers is not suitable. restore.execute(insufficientLogEx, config); } catch (DatabaseException e) { + LOG.info("i:{} exception:", i, e); if (i < RETRY_TIME - 1) { try { Thread.sleep(5 * 1000); @@ -381,6 +387,7 @@ public class BDBEnvironment { if (epochDB != null) { try { epochDB.close(); + epochDB = null; } catch (DatabaseException exception) { LOG.error("Error closing db {} will exit", epochDB.getDatabaseName(), exception); } @@ -390,19 +397,7 @@ public class BDBEnvironment { try { // Finally, close the store and environment. replicatedEnvironment.close(); - } catch (DatabaseException exception) { - LOG.error("Error closing replicatedEnvironment", exception); - } - } - } - - // Close environment - public void closeReplicatedEnvironment() { - if (replicatedEnvironment != null) { - try { - openedDatabases.clear(); - // Finally, close the store and environment. - replicatedEnvironment.close(); + replicatedEnvironment = null; } catch (DatabaseException exception) { LOG.error("Error closing replicatedEnvironment", exception); } @@ -413,18 +408,22 @@ public class BDBEnvironment { public void openReplicatedEnvironment(File envHome) { for (int i = 0; i < RETRY_TIME; i++) { try { + if (replicatedEnvironment != null) { + this.close(); + } // open the environment replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig); // start state change listener - StateChangeListener listener = new BDBStateChangeListener(); + StateChangeListener listener = new BDBStateChangeListener(isElectable); replicatedEnvironment.setStateChangeListener(listener); // open epochDB. the first parameter null means auto-commit epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig); break; } catch (DatabaseException e) { + LOG.info("i:{} exception:", i, e); if (i < RETRY_TIME - 1) { try { Thread.sleep(5 * 1000); @@ -439,7 +438,7 @@ public class BDBEnvironment { } } - private SyncPolicy getSyncPolicy(String policy) { + private static SyncPolicy getSyncPolicy(String policy) { if (policy.equalsIgnoreCase("SYNC")) { return Durability.SyncPolicy.SYNC; } @@ -450,7 +449,7 @@ public class BDBEnvironment { return Durability.SyncPolicy.WRITE_NO_SYNC; } - private ReplicaAckPolicy getAckPolicy(String policy) { + private static ReplicaAckPolicy getAckPolicy(String policy) { if (policy.equalsIgnoreCase("ALL")) { return Durability.ReplicaAckPolicy.ALL; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index 395f5b0467..01db7f53ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -18,6 +18,7 @@ package org.apache.doris.journal.bdbje; import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.DataOutputBuffer; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.NetUtils; @@ -75,14 +76,6 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B private AtomicLong nextJournalId = new AtomicLong(1); public BDBJEJournal(String nodeName) { - initBDBEnv(nodeName); - } - - /* - * Initialize bdb environment. - * node name is ip_port (the port is edit_log_port) - */ - private void initBDBEnv(String nodeName) { environmentPath = Env.getServingEnv().getBdbDir(); HostInfo selfNode = Env.getServingEnv().getSelfNode(); selfNodeName = nodeName; @@ -327,13 +320,14 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B public synchronized void open() { if (bdbEnvironment == null) { File dbEnv = new File(environmentPath); - bdbEnvironment = new BDBEnvironment(); + + boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY); + bdbEnvironment = new BDBEnvironment(Env.getServingEnv().isElectable(), metadataFailureRecovery); HostInfo helperNode = Env.getServingEnv().getHelperNode(); String helperHostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()); try { - bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort, - Env.getServingEnv().isElectable()); + bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort); } catch (Exception e) { if (e instanceof DatabaseNotFoundException) { LOG.error("It is not allowed to set metadata_failure_recovery" @@ -380,7 +374,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B reSetupBdbEnvironment(insufficientLogEx); } catch (RollbackException rollbackEx) { LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx); - bdbEnvironment.closeReplicatedEnvironment(); + bdbEnvironment.close(); bdbEnvironment.openReplicatedEnvironment(new File(environmentPath)); } } @@ -414,8 +408,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B bdbEnvironment.close(); bdbEnvironment.setup(new File(environmentPath), selfNodeName, selfNodeHostPort, - NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()), - Env.getServingEnv().isElectable()); + NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort())); } @Override @@ -506,7 +499,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B } catch (RollbackException rollbackEx) { if (!Env.isCheckpointThread()) { LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx); - bdbEnvironment.closeReplicatedEnvironment(); + bdbEnvironment.close(); bdbEnvironment.openReplicatedEnvironment(new File(environmentPath)); } else { throw rollbackEx; diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBDebuggerTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBDebuggerTest.java new file mode 100644 index 0000000000..27c7efc430 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBDebuggerTest.java @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.journal.bdbje; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.journal.JournalEntity; +import org.apache.doris.persist.OperationType; +import org.apache.doris.system.SystemInfoService.HostInfo; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import mockit.Mock; +import mockit.MockUp; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; + +import java.io.DataOutput; +import java.io.File; +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.net.SocketException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +public class BDBDebuggerTest { + private static final Logger LOG = LogManager.getLogger(BDBDebuggerTest.class); + private static List tmpDirs = new ArrayList<>(); + + public static File createTmpDir() throws Exception { + String dorisHome = System.getenv("DORIS_HOME"); + if (Strings.isNullOrEmpty(dorisHome)) { + dorisHome = Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString(); + } + Path mockDir = Paths.get(dorisHome, "fe", "mocked"); + if (!Files.exists(mockDir)) { + Files.createDirectories(mockDir); + } + Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome)); + File dir = Files.createTempDirectory(Paths.get(dorisHome, "fe", "mocked"), "BDBJEJournalTest").toFile(); + LOG.debug("createTmpDir path {}", dir.getAbsolutePath()); + tmpDirs.add(dir); + return dir; + } + + @AfterAll + public static void cleanUp() throws Exception { + for (File dir : tmpDirs) { + LOG.debug("deleteTmpDir path {}", dir.getAbsolutePath()); + FileUtils.deleteDirectory(dir); + } + } + + private int findValidPort() { + int port = 0; + for (int i = 0; i < 65535; i++) { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + port = socket.getLocalPort(); + try (DatagramSocket datagramSocket = new DatagramSocket(port)) { + datagramSocket.setReuseAddress(true); + break; + } catch (SocketException e) { + LOG.info("The port {} is invalid and try another port", port); + } + } catch (IOException e) { + throw new IllegalStateException("Could not find a free TCP/IP port"); + } + } + return port; + } + + @RepeatedTest(1) + public void testNormal() throws Exception { + int port = findValidPort(); + Preconditions.checkArgument(((port > 0) && (port < 65535))); + String nodeName = Env.genFeNodeName("127.0.0.1", port, false); + long replayedJournalId = 0; + File tmpDir = createTmpDir(); + new MockUp() { + HostInfo selfNode = new HostInfo("127.0.0.1", port); + @Mock + public String getBdbDir() { + return tmpDir.getAbsolutePath(); + } + + @Mock + public HostInfo getSelfNode() { + return this.selfNode; + } + + @Mock + public HostInfo getHelperNode() { + return this.selfNode; + } + + @Mock + public boolean isElectable() { + return true; + } + + @Mock + public long getReplayedJournalId() { + return replayedJournalId; + } + }; + + LOG.info("BdbDir:{}, selfNode:{}, nodeName:{}", Env.getServingEnv().getBdbDir(), + Env.getServingEnv().getBdbDir(), nodeName); + Assertions.assertEquals(tmpDir.getAbsolutePath(), Env.getServingEnv().getBdbDir()); + BDBJEJournal journal = new BDBJEJournal(nodeName); + journal.open(); + // BDBEnvrinment need several seconds election from unknown to master + for (int i = 0; i < 10; i++) { + if (journal.getBDBEnvironment().getReplicatedEnvironment().getState() + .equals(ReplicatedEnvironment.State.MASTER)) { + break; + } + Thread.sleep(1000); + } + Assertions.assertEquals(ReplicatedEnvironment.State.MASTER, + journal.getBDBEnvironment().getReplicatedEnvironment().getState()); + + journal.rollJournal(); + for (int i = 0; i < 10; i++) { + String data = "OperationType.OP_TIMESTAMP"; + Writable writable = new Writable() { + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, data); + } + }; + journal.write(OperationType.OP_TIMESTAMP, writable); + } + JournalEntity journalEntity = journal.read(1); + Assertions.assertEquals(OperationType.OP_TIMESTAMP, journalEntity.getOpCode()); + journal.close(); + + Deencapsulation.invoke(BDBDebugger.get(), "initDebugEnv", tmpDir.getAbsolutePath()); + // BDBDebugger.BDBDebugEnv bdbDebugEnv = new BDBDebugger.BDBDebugEnv(tmpDir.getAbsolutePath()); + // bdbDebugEnv.init(); + BDBDebugger.BDBDebugEnv bdbDebugEnv = BDBDebugger.get().getEnv(); + + LOG.info("{}|{}|{}", bdbDebugEnv.listDbNames(), bdbDebugEnv.getJournalIds("1"), + bdbDebugEnv.getJournalNumber("1")); + Assertions.assertEquals(2, bdbDebugEnv.listDbNames().size()); + Assertions.assertEquals(10, bdbDebugEnv.getJournalIds("1").size()); + Assertions.assertEquals(10, bdbDebugEnv.getJournalNumber("1")); + BDBDebugger.JournalEntityWrapper entityWrapper = bdbDebugEnv.getJournalEntity("1", 5L); + Assertions.assertEquals(5, entityWrapper.journalId); + Assertions.assertEquals(OperationType.OP_TIMESTAMP, entityWrapper.entity.getOpCode()); + bdbDebugEnv.close(); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java index 98f1fc57f9..09b10b604b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java @@ -18,19 +18,31 @@ package org.apache.doris.journal.bdbje; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.ha.FrontendNodeType; +import org.apache.doris.system.Frontend; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseEntry; +import com.sleepycat.je.Durability; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; +import mockit.Mock; +import mockit.MockUp; import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.RepeatedTest; +// import org.junit.jupiter.api.Test; import java.io.File; import java.io.IOException; @@ -38,29 +50,44 @@ import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.Paths; import java.security.SecureRandom; import java.util.ArrayList; import java.util.List; +import java.util.UUID; public class BDBEnvironmentTest { private static final Logger LOG = LogManager.getLogger(BDBEnvironmentTest.class); - private static List tmpDirs = new ArrayList<>(); + private static List tmpDirs = new ArrayList<>(); - public static File createTmpDir() throws Exception { + public static String createTmpDir() throws Exception { String dorisHome = System.getenv("DORIS_HOME"); + if (Strings.isNullOrEmpty(dorisHome)) { + dorisHome = Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString(); + } Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome)); - File dir = Files.createTempDirectory(Paths.get(dorisHome, "fe", "mocked"), "BDBEnvironmentTest").toFile(); + Path mockDir = Paths.get(dorisHome, "fe", "mocked"); + if (!Files.exists(mockDir)) { + Files.createDirectories(mockDir); + } + UUID uuid = UUID.randomUUID(); + File dir = Files.createDirectories(Paths.get(dorisHome, "fe", "mocked", "BDBEnvironmentTest-" + uuid.toString())).toFile(); LOG.debug("createTmpDir path {}", dir.getAbsolutePath()); - tmpDirs.add(dir); - return dir; + tmpDirs.add(dir.getAbsolutePath()); + return dir.getAbsolutePath(); + } + + @BeforeAll + public static void startUp() throws Exception { + Config.bdbje_file_logging_level = "ALL"; } @AfterAll public static void cleanUp() throws Exception { - for (File dir : tmpDirs) { - LOG.info("deleteTmpDir path {}", dir.getAbsolutePath()); - FileUtils.deleteDirectory(dir); + for (String dir : tmpDirs) { + LOG.debug("deleteTmpDir path {}", dir); + FileUtils.deleteDirectory(new File(dir)); } } @@ -90,15 +117,16 @@ public class BDBEnvironmentTest { return byteArray; } - @Test + // @Test + @RepeatedTest(1) public void testSetup() throws Exception { int port = findValidPort(); String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false); String selfNodeHostPort = "127.0.0.1:" + port; LOG.debug("selfNodeName:{}, selfNodeHostPort:{}", selfNodeName, selfNodeHostPort); - BDBEnvironment bdbEnvironment = new BDBEnvironment(); - bdbEnvironment.setup(createTmpDir(), selfNodeName, selfNodeHostPort, selfNodeHostPort, true); + BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false); + bdbEnvironment.setup(new File(createTmpDir()), selfNodeName, selfNodeHostPort, selfNodeHostPort); String dbName = "testEnvironment"; Database db = bdbEnvironment.openDatabase(dbName); @@ -113,6 +141,8 @@ public class BDBEnvironmentTest { // Remove database bdbEnvironment.removeDatabase(dbName); + // Rmove database twice will get DatabaseNotFoundException + bdbEnvironment.removeDatabase(dbName); Exception exception = Assertions.assertThrows(IllegalStateException.class, () -> { db.put(null, key, value); }); @@ -122,6 +152,34 @@ public class BDBEnvironmentTest { LOG.debug("exception:", exception); Assertions.assertTrue(actualMessage.contains(expectedMessage)); + Database epochDb = bdbEnvironment.getEpochDB(); + Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.put(null, key, value)); + DatabaseEntry readValue2 = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.get(null, key, readValue2, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue2.getData())); + + new MockUp() { + int i = 0; + @Mock + public List getFrontends(FrontendNodeType nodeType) { + ArrayList frontends = new ArrayList(); + if (i == 0) { + i++; + return frontends; + } + Frontend frontend = new Frontend(FrontendNodeType.FOLLOWER, selfNodeName, + "127.0.0.1", port); + frontend.setIsAlive(true); + frontends.add(frontend); + return frontends; + } + }; + + ReplicationGroupAdmin replicationGroupAdmin = bdbEnvironment.getReplicationGroupAdmin(); + Assertions.assertNull(replicationGroupAdmin); + replicationGroupAdmin = bdbEnvironment.getReplicationGroupAdmin(); + Assertions.assertNotNull(replicationGroupAdmin); + bdbEnvironment.close(); exception = Assertions.assertThrows(IllegalStateException.class, () -> { db.put(null, key, value); @@ -132,20 +190,101 @@ public class BDBEnvironmentTest { Assertions.assertTrue(actualMessage.contains(expectedMessage)); } + // @Test + @RepeatedTest(1) + public void testSetupTwice() throws Exception { + int port = findValidPort(); + String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false); + String selfNodeHostPort = "127.0.0.1:" + port; + File homeFile = new File(createTmpDir()); + BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false); + bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort, selfNodeHostPort); + + bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort, selfNodeHostPort); + bdbEnvironment.close(); + } + + // @Test + @RepeatedTest(1) + public void testMetadataRecovery() throws Exception { + int port = findValidPort(); + String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false); + String selfNodeHostPort = "127.0.0.1:" + port; + + File recoveryFile = new File(createTmpDir()); + BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false); + bdbEnvironment.setup(recoveryFile, selfNodeName, selfNodeHostPort, selfNodeHostPort); + + String dbName = "testMetadataRecovery"; + Database db = bdbEnvironment.openDatabase(dbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + + Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key, value)); + + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, readValue, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + bdbEnvironment.close(); + + // recovery mode + BDBEnvironment bdbEnvironment2 = new BDBEnvironment(true, true); + bdbEnvironment2.setup(recoveryFile, selfNodeName, selfNodeHostPort, selfNodeHostPort); + Database db2 = bdbEnvironment2.openDatabase(dbName); + + DatabaseEntry readValue2 = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key, readValue2, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue2.getData())); + bdbEnvironment2.close(); + } + + // @Test + @RepeatedTest(1) + public void testOpenReplicatedEnvironmentTwice() throws Exception { + int port = findValidPort(); + String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false); + String selfNodeHostPort = "127.0.0.1:" + port; + + File homeFile = new File(createTmpDir()); + BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false); + bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort, selfNodeHostPort); + + String dbName = "testMetadataRecovery"; + Database db = bdbEnvironment.openDatabase(dbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + + Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key, value)); + + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, readValue, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + bdbEnvironment.close(); + + bdbEnvironment.openReplicatedEnvironment(homeFile); + bdbEnvironment.openReplicatedEnvironment(homeFile); + Database db2 = bdbEnvironment.openDatabase(dbName); + DatabaseEntry readValue2 = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key, readValue2, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue2.getData())); + bdbEnvironment.close(); + } + /** * Test build a BDBEnvironment cluster (1 master + 2 follower + 1 observer) * @throws Exception */ - @Test + // @Test + @RepeatedTest(1) public void testCluster() throws Exception { int masterPort = findValidPort(); String masterNodeName = Env.genFeNodeName("127.0.0.1", masterPort, false); String masterNodeHostPort = "127.0.0.1:" + masterPort; LOG.debug("masterNodeName:{}, masterNodeHostPort:{}", masterNodeName, masterNodeHostPort); - BDBEnvironment masterEnvironment = new BDBEnvironment(); - File masterDir = createTmpDir(); - masterEnvironment.setup(masterDir, masterNodeName, masterNodeHostPort, masterNodeHostPort, true); + BDBEnvironment masterEnvironment = new BDBEnvironment(true, false); + File masterDir = new File(createTmpDir()); + masterEnvironment.setup(masterDir, masterNodeName, masterNodeHostPort, masterNodeHostPort); List followerEnvironments = new ArrayList<>(); List followerDirs = new ArrayList<>(); @@ -155,10 +294,10 @@ public class BDBEnvironmentTest { String followerNodeHostPort = "127.0.0.1:" + followerPort; LOG.debug("followerNodeName{}:{}, followerNodeHostPort{}:{}", i, i, followerNodeName, followerNodeHostPort); - BDBEnvironment followerEnvironment = new BDBEnvironment(); - File followerDir = createTmpDir(); + BDBEnvironment followerEnvironment = new BDBEnvironment(true, false); + File followerDir = new File(createTmpDir()); followerDirs.add(followerDir); - followerEnvironment.setup(followerDir, followerNodeName, followerNodeHostPort, masterNodeHostPort, true); + followerEnvironment.setup(followerDir, followerNodeName, followerNodeHostPort, masterNodeHostPort); followerEnvironments.add(followerEnvironment); } @@ -167,9 +306,9 @@ public class BDBEnvironmentTest { String observerNodeHostPort = "127.0.0.1:" + observerPort; LOG.debug("observerNodeName:{}, observerNodeHostPort:{}", observerNodeName, observerNodeHostPort); - BDBEnvironment observerEnvironment = new BDBEnvironment(); - File observerDir = createTmpDir(); - observerEnvironment.setup(observerDir, observerNodeName, observerNodeHostPort, masterNodeHostPort, false); + BDBEnvironment observerEnvironment = new BDBEnvironment(false, false); + File observerDir = new File(createTmpDir()); + observerEnvironment.setup(observerDir, observerNodeName, observerNodeHostPort, masterNodeHostPort); String dbName = "1234"; Database masterDb = masterEnvironment.openDatabase(dbName); @@ -207,4 +346,217 @@ public class BDBEnvironmentTest { environment.close(); }); masterEnvironment.close(); } + + class NodeInfo { + public String name; + public String hostPort; + public String dir; + + NodeInfo(String name, String hostPort, String dir) { + this.name = name; + this.hostPort = hostPort; + this.dir = dir; + } + } + + private Pair findMaster(List> followersInfo) + throws Exception { + NodeInfo masterNode = null; + BDBEnvironment masterEnvironment = null; + boolean electionSuccess = true; + for (int i = 0; i < 10; i++) { + electionSuccess = true; + for (Pair entryPair : followersInfo) { + if (entryPair.first.getReplicatedEnvironment().getState() + .equals(ReplicatedEnvironment.State.MASTER)) { + masterEnvironment = entryPair.first; + masterNode = entryPair.second; + } + if (!entryPair.first.getReplicatedEnvironment().getState() + .equals(ReplicatedEnvironment.State.MASTER) + && !entryPair.first.getReplicatedEnvironment().getState() + .equals(ReplicatedEnvironment.State.REPLICA)) { + electionSuccess = false; + } + } + if (!electionSuccess) { + Thread.sleep(1000); + } + } + Assertions.assertTrue(electionSuccess); + Assertions.assertNotNull(masterNode); + Assertions.assertNotNull(masterEnvironment); + return Pair.of(masterEnvironment, masterNode); + } + + // @Test + @RepeatedTest(1) + public void testRollbackException() throws Exception { + LOG.info("start"); + List> followersInfo = new ArrayList<>(); + + int masterPort = findValidPort(); + String masterNodeName = "fe1"; + String masterNodeHostPort = "127.0.0.1:" + masterPort; + + BDBEnvironment masterEnvironment = new BDBEnvironment(true, false); + String masterDir = createTmpDir(); + masterEnvironment.setup(new File(masterDir), masterNodeName, masterNodeHostPort, masterNodeHostPort); + followersInfo.add(Pair.of(masterEnvironment, new NodeInfo(masterNodeName, masterNodeHostPort, masterDir))); + + for (int i = 2; i <= 3; i++) { + int nodePort = findValidPort(); + String nodeName = "fe" + i; + String nodeHostPort = "127.0.0.1:" + nodePort; + + BDBEnvironment followerEnvironment = new BDBEnvironment(true, false); + String nodeDir = createTmpDir(); + followerEnvironment.setup(new File(nodeDir), nodeName, nodeHostPort, masterNodeHostPort); + followersInfo.add(Pair.of(followerEnvironment, new NodeInfo(nodeName, nodeHostPort, nodeDir))); + } + + Pair masterPair = findMaster(followersInfo); + String beginDbName = String.valueOf(0L); + Database masterDb = masterPair.first.openDatabase(beginDbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value)); + Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size()); + LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir); + + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip {}", entryPair.second.name); + return; + } + + Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size()); + Database followerDb = entryPair.first.openDatabase(beginDbName); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + followerDb.close(); + } + + masterDb.close(); + masterEnvironment.getEpochDB().close(); + + followersInfo.stream().forEach(entryPair -> { + entryPair.first.close(); + LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); + }); + + // all follower closed + for (Pair entryPair : followersInfo) { + String followerCopyDir = entryPair.second.dir + "_copy"; + LOG.info("Copy from {} to {}", entryPair.second.dir, followerCopyDir); + FileUtils.copyDirectory(new File(entryPair.second.dir), new File(followerCopyDir)); + } + + followersInfo.stream().forEach(entryPair -> { + entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir)); + LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir); + }); + + masterPair = findMaster(followersInfo); + + masterDb = masterPair.first.openDatabase(String.valueOf(1L)); + for (int i = 0; i < 2 * Config.txn_rollback_limit + 10; i++) { + // for (int i = 0; i < 10; i++) { + OperationStatus status = masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes())); + Assertions.assertEquals(OperationStatus.SUCCESS, status); + } + Assertions.assertEquals(2, masterPair.first.getDatabaseNames().size()); + Assertions.assertEquals(0, masterPair.first.getDatabaseNames().get(0)); + Assertions.assertEquals(1, masterPair.first.getDatabaseNames().get(1)); + + followersInfo.stream().forEach(entryPair -> { + entryPair.first.close(); + LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); + }); + + // Restore follower's (not new master) bdbje dir + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterDir)) { + String masterCopyDir = entryPair.second.dir + "_copy"; + FileUtils.deleteDirectory(new File(masterCopyDir)); + continue; + } + LOG.info("Delete followerDir {} ", entryPair.second.dir); + FileUtils.deleteDirectory(new File(entryPair.second.dir)); + // FileUtils.moveDirectory(new File(entryPair.second.dir), new File(entryPair.second.dir + "_copy2")); + String followerCopyDir = entryPair.second.dir + "_copy"; + LOG.info("Move {} to {}", followerCopyDir, entryPair.second.dir); + FileUtils.moveDirectory(new File(followerCopyDir), new File(entryPair.second.dir)); + } + + Thread.sleep(1000); + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip open {} | {}", entryPair.second.name, entryPair.second.dir); + continue; + } + entryPair.first.openReplicatedEnvironment(new File(entryPair.second.dir)); + LOG.info("open {} | {}", entryPair.second.name, entryPair.second.dir); + } + + BDBEnvironment newMasterEnvironment = null; + boolean found = false; + for (int i = 0; i < 300; i++) { + for (Pair entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + continue; + } + + LOG.info("name:{} state:{} dir:{}", entryPair.first.getReplicatedEnvironment().getNodeName(), + entryPair.first.getReplicatedEnvironment().getState(), + entryPair.second.dir); + if (entryPair.first.getReplicatedEnvironment().getState().equals(ReplicatedEnvironment.State.MASTER)) { + newMasterEnvironment = entryPair.first; + found = true; + break; + } + } + if (found) { + break; + } + Thread.sleep(1000); + } + Assertions.assertNotNull(newMasterEnvironment); + + masterDb = newMasterEnvironment.openDatabase(beginDbName); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, new DatabaseEntry(randomBytes()), new DatabaseEntry(randomBytes()))); + Assertions.assertEquals(1, newMasterEnvironment.getDatabaseNames().size()); + // // old master + masterEnvironment.openReplicatedEnvironment(new File(masterDir)); + followersInfo.stream().forEach(entryPair -> { + entryPair.first.close(); + LOG.info("close {} | {}", entryPair.second.name, entryPair.second.dir); + }); + LOG.info("end"); + } + + @RepeatedTest(1) + public void testGetSyncPolicy() throws Exception { + Assertions.assertEquals(Durability.SyncPolicy.NO_SYNC, + Deencapsulation.invoke(BDBEnvironment.class, "getSyncPolicy", "NO_SYNC")); + + Assertions.assertEquals(Durability.SyncPolicy.SYNC, + Deencapsulation.invoke(BDBEnvironment.class, "getSyncPolicy", "SYNC")); + + Assertions.assertEquals(Durability.SyncPolicy.WRITE_NO_SYNC, + Deencapsulation.invoke(BDBEnvironment.class, "getSyncPolicy", "default")); + } + + @RepeatedTest(1) + public void testGetAckPolicy() throws Exception { + Assertions.assertEquals(Durability.ReplicaAckPolicy.ALL, + Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy", "ALL")); + + Assertions.assertEquals(Durability.ReplicaAckPolicy.NONE, + Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy", "NONE")); + + Assertions.assertEquals(Durability.ReplicaAckPolicy.SIMPLE_MAJORITY, + Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy", "default")); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java index 93522a5308..ba81d6697b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java @@ -36,8 +36,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; -// import org.junit.jupiter.api.RepeatedTest; only for debug +import org.junit.jupiter.api.RepeatedTest; import java.io.DataOutput; import java.io.File; @@ -74,7 +73,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS LINE: BDBJE should use @AfterAll public static void cleanUp() throws Exception { for (File dir : tmpDirs) { - LOG.info("deleteTmpDir path {}", dir.getAbsolutePath()); + LOG.debug("deleteTmpDir path {}", dir.getAbsolutePath()); FileUtils.deleteDirectory(dir); } } @@ -98,8 +97,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS LINE: BDBJE should use return port; } - // @RepeatedTest(100) only for debug - @Test + @RepeatedTest(1) public void testNormal() throws Exception { int port = findValidPort(); Preconditions.checkArgument(((port > 0) && (port < 65535))); @@ -197,15 +195,17 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS LINE: BDBJE should use Assertions.assertEquals(5, journal.getDatabaseNames().size()); Assertions.assertEquals(41, journal.getDatabaseNames().get(4)); - JournalCursor cursor = journal.read(1, 50); + JournalCursor cursor = journal.read(1, 51); Assertions.assertNotNull(cursor); - for (int i = 1; i < 50; i++) { + for (int i = 0; i < 50; i++) { Pair kv = cursor.next(); Assertions.assertNotNull(kv); JournalEntity entity = kv.second; Assertions.assertEquals(OperationType.OP_TIMESTAMP, entity.getOpCode()); } + Assertions.assertEquals(null, cursor.next()); + journal.close(); Assertions.assertEquals(null, journal.getBDBEnvironment()); @@ -223,7 +223,7 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS LINE: BDBJE should use Assertions.assertEquals(ReplicatedEnvironment.State.MASTER, journal.getBDBEnvironment().getReplicatedEnvironment().getState()); journal.deleteJournals(21); - LOG.info("journal.getDatabaseNames(): {}", journal.getDatabaseNames()); + LOG.debug("journal.getDatabaseNames(): {}", journal.getDatabaseNames()); Assertions.assertEquals(3, journal.getDatabaseNames().size()); Assertions.assertEquals(21, journal.getDatabaseNames().get(0)); journal.close(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJournalCursorTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJournalCursorTest.java new file mode 100644 index 0000000000..d68d616574 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJournalCursorTest.java @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.journal.bdbje; + +import org.apache.doris.catalog.Env; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.sleepycat.je.Database; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; + +import java.io.File; +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.net.SocketException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class BDBJournalCursorTest { + private static final Logger LOG = LogManager.getLogger(BDBEnvironmentTest.class); + private static List tmpDirs = new ArrayList<>(); + + public static String createTmpDir() throws Exception { + String dorisHome = System.getenv("DORIS_HOME"); + if (Strings.isNullOrEmpty(dorisHome)) { + dorisHome = Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString(); + } + Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome)); + Path mockDir = Paths.get(dorisHome, "fe", "mocked"); + if (!Files.exists(mockDir)) { + Files.createDirectories(mockDir); + } + UUID uuid = UUID.randomUUID(); + File dir = Files.createDirectories(Paths.get(dorisHome, "fe", "mocked", "BDBEnvironmentTest-" + uuid.toString())).toFile(); + LOG.debug("createTmpDir path {}", dir.getAbsolutePath()); + tmpDirs.add(dir.getAbsolutePath()); + return dir.getAbsolutePath(); + } + + @AfterAll + public static void cleanUp() throws Exception { + for (String dir : tmpDirs) { + LOG.debug("deleteTmpDir path {}", dir); + FileUtils.deleteDirectory(new File(dir)); + } + } + + private int findValidPort() { + int port = 0; + for (int i = 0; i < 65535; i++) { + try (ServerSocket socket = new ServerSocket(0)) { + socket.setReuseAddress(true); + port = socket.getLocalPort(); + try (DatagramSocket datagramSocket = new DatagramSocket(port)) { + datagramSocket.setReuseAddress(true); + break; + } catch (SocketException e) { + LOG.info("The port {} is invalid and try another port", port); + } + } catch (IOException e) { + throw new IllegalStateException("Could not find a free TCP/IP port"); + } + } + Preconditions.checkArgument(((port > 0) && (port < 65536))); + return port; + } + + @RepeatedTest(1) + public void testNormal() throws Exception { + Assertions.assertTrue(BDBJournalCursor.getJournalCursor(null, -1, 20) == null); + Assertions.assertTrue(BDBJournalCursor.getJournalCursor(null, 21, 20) == null); + + int port = findValidPort(); + String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false); + String selfNodeHostPort = "127.0.0.1:" + port; + LOG.debug("selfNodeName:{}, selfNodeHostPort:{}", selfNodeName, selfNodeHostPort); + + BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false); + bdbEnvironment.setup(new File(createTmpDir()), selfNodeName, selfNodeHostPort, selfNodeHostPort); + + Database db = bdbEnvironment.openDatabase("1"); + db.close(); + + BDBJournalCursor bdbJournalCursor = BDBJournalCursor.getJournalCursor(bdbEnvironment, 1, 10); + Assertions.assertTrue(bdbJournalCursor != null); + Assertions.assertTrue(bdbJournalCursor.next() == null); + + bdbEnvironment.close(); + + bdbJournalCursor = BDBJournalCursor.getJournalCursor(bdbEnvironment, 1, 10); + Assertions.assertTrue(bdbJournalCursor == null); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolOptionsTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolOptionsTest.java similarity index 93% rename from fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolOptionsTest.java rename to fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolOptionsTest.java index d6d93fa133..426f5e3e72 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolOptionsTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolOptionsTest.java @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.bdb; +package org.apache.doris.journal.bdbje; import org.apache.doris.common.FeConstants; -import org.apache.doris.journal.bdbje.BDBToolOptions; import org.junit.Assert; import org.junit.Test; @@ -36,6 +35,7 @@ public class BDBToolOptionsTest { Assert.assertTrue(options.hasFromKey()); Assert.assertTrue(options.hasEndKey()); Assert.assertNotSame(FeConstants.meta_version, options.getMetaVersion()); + Assert.assertTrue(options.toString().contains("12345")); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolTest.java similarity index 97% rename from fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolTest.java rename to fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolTest.java index f9693fcd70..5940935a31 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/bdb/BDBToolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBToolTest.java @@ -15,12 +15,10 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.bdb; +package org.apache.doris.journal.bdbje; import org.apache.doris.common.io.DataOutputBuffer; import org.apache.doris.journal.JournalEntity; -import org.apache.doris.journal.bdbje.BDBTool; -import org.apache.doris.journal.bdbje.BDBToolOptions; import org.apache.doris.persist.OperationType; import org.apache.doris.persist.ReplicaPersistInfo; diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/TimestampTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/TimestampTest.java new file mode 100644 index 0000000000..36c4bcacb5 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/TimestampTest.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.journal.bdbje; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.RepeatedTest; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class TimestampTest { + private static final Logger LOG = LogManager.getLogger(TimestampTest.class); + private static List testFiles = new ArrayList<>(); + + public static String createTestFile() throws Exception { + String dorisHome = System.getenv("DORIS_HOME"); + if (Strings.isNullOrEmpty(dorisHome)) { + dorisHome = Files.createTempDirectory("DORIS_HOME").toAbsolutePath().toString(); + } + Preconditions.checkArgument(!Strings.isNullOrEmpty(dorisHome)); + Path mockDir = Paths.get(dorisHome, "fe", "mocked"); + if (!Files.exists(mockDir)) { + Files.createDirectories(mockDir); + } + UUID uuid = UUID.randomUUID(); + File testFile = Files.createFile(Paths.get(dorisHome, "fe", "mocked", "TimestampTest-" + uuid.toString())).toFile(); + LOG.debug("createTmpFile path {}", testFile.getAbsolutePath()); + testFiles.add(testFile.getAbsolutePath()); + return testFile.getAbsolutePath(); + } + + @AfterAll + public static void cleanUp() throws Exception { + for (String testFile : testFiles) { + LOG.info("delete testFile path {}", testFile); + Files.deleteIfExists(Paths.get(testFile)); + } + } + + // @Test + @RepeatedTest(1) + public void testSerialization() throws Exception { + Timestamp timestamp = new Timestamp(); + long ts = timestamp.getTimestamp(); + Assertions.assertTrue(ts > 0); + + File testFile = new File(createTestFile()); + DataOutputStream out = new DataOutputStream(new FileOutputStream(testFile)); + timestamp.write(out); + out.flush(); + out.close(); + + DataInputStream in = new DataInputStream(new FileInputStream(testFile)); + Thread.sleep(1000); + Timestamp timestamp2 = new Timestamp(); + timestamp2.readFields(in); + + Assertions.assertEquals(ts, timestamp2.getTimestamp()); + Assertions.assertEquals("" + ts, timestamp2.toString()); + } +}