diff --git a/fe/src/com/baidu/palo/common/Config.java b/fe/src/com/baidu/palo/common/Config.java index c0c3c05705..3a75bbd23d 100644 --- a/fe/src/com/baidu/palo/common/Config.java +++ b/fe/src/com/baidu/palo/common/Config.java @@ -159,6 +159,13 @@ public class Config extends ConfigBase { */ @ConfField public static boolean ignore_meta_check = false; + /* + * Set the maximum acceptable clock skew between non-master FE to Master FE host. + * This value is checked whenever a non-master FE establishes a connection to master FE via BDBJE. + * The connection is abandoned if the clock skew is larger than this value. + */ + @ConfField public static long max_bdbje_clock_delta_ms = 5000; // 5s + /* * Fe http port * Currently, all FEs' http port must be same. diff --git a/fe/src/com/baidu/palo/deploy/DeployManager.java b/fe/src/com/baidu/palo/deploy/DeployManager.java index a70e877064..e5a54d959e 100644 --- a/fe/src/com/baidu/palo/deploy/DeployManager.java +++ b/fe/src/com/baidu/palo/deploy/DeployManager.java @@ -149,7 +149,7 @@ public class DeployManager extends Daemon { if (!Strings.isNullOrEmpty(brokerServiceGroup)) { LOG.info("Broker service group is found"); - hasObserverService = true; + hasBrokerService = true; } LOG.info("get electableFeServiceGroup: {}, observerFeServiceGroup: {}, backendServiceGroup: {}" diff --git a/fe/src/com/baidu/palo/journal/bdbje/BDBEnvironment.java b/fe/src/com/baidu/palo/journal/bdbje/BDBEnvironment.java index 7d13f2064c..70bbd4b590 100644 --- a/fe/src/com/baidu/palo/journal/bdbje/BDBEnvironment.java +++ b/fe/src/com/baidu/palo/journal/bdbje/BDBEnvironment.java @@ -15,43 +15,44 @@ package com.baidu.palo.journal.bdbje; -import com.baidu.palo.catalog.Catalog; -import com.baidu.palo.common.Config; -import com.baidu.palo.ha.BDBHA; -import com.baidu.palo.ha.BDBStateChangeListener; -import com.baidu.palo.ha.HAProtocol; -import com.sleepycat.je.Database; -import com.sleepycat.je.DatabaseConfig; -import com.sleepycat.je.DatabaseException; -import com.sleepycat.je.DatabaseNotFoundException; -import com.sleepycat.je.Durability; -import com.sleepycat.je.Durability.ReplicaAckPolicy; -import com.sleepycat.je.Durability.SyncPolicy; -import com.sleepycat.je.Environment; -import com.sleepycat.je.EnvironmentConfig; -import com.sleepycat.je.EnvironmentFailureException; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; -import com.sleepycat.je.rep.NoConsistencyRequiredPolicy; -import com.sleepycat.je.rep.NodeType; -import com.sleepycat.je.rep.ReplicatedEnvironment; -import com.sleepycat.je.rep.ReplicationConfig; -import com.sleepycat.je.rep.StateChangeListener; -import com.sleepycat.je.rep.util.DbResetRepGroup; -import com.sleepycat.je.rep.util.ReplicationGroupAdmin; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.File; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; +import com.baidu.palo.catalog.Catalog; +import com.baidu.palo.common.Config; +import com.baidu.palo.ha.BDBHA; +import com.baidu.palo.ha.BDBStateChangeListener; +import com.baidu.palo.ha.HAProtocol; + +import com.sleepycat.je.Database; +import com.sleepycat.je.DatabaseConfig; +import com.sleepycat.je.DatabaseException; +import com.sleepycat.je.DatabaseNotFoundException; +import com.sleepycat.je.Durability; +import com.sleepycat.je.Durability.ReplicaAckPolicy; +import com.sleepycat.je.Durability.SyncPolicy; +import com.sleepycat.je.Environment; +import com.sleepycat.je.EnvironmentConfig; +import com.sleepycat.je.EnvironmentFailureException; +import com.sleepycat.je.rep.InsufficientLogException; +import com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.NoConsistencyRequiredPolicy; +import com.sleepycat.je.rep.NodeType; +import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.ReplicationConfig; +import com.sleepycat.je.rep.StateChangeListener; +import com.sleepycat.je.rep.util.DbResetRepGroup; +import com.sleepycat.je.rep.util.ReplicationGroupAdmin; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; /* this class contains the reference to bdb environment. @@ -101,7 +102,9 @@ public class BDBEnvironment { replicationConfig.setNodeHostPort(selfNodeHostPort); replicationConfig.setHelperHosts(helperHostPort); replicationConfig.setGroupName(PALO_JOURNAL_GROUP); - replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10"); + replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10"); + replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS); + if (isElectable) { replicationConfig.setConsistencyPolicy(new NoConsistencyRequiredPolicy()); replicationConfig.setReplicaAckTimeout(2, TimeUnit.SECONDS);