diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h index 47b3aaa9cb..1106a548f8 100644 --- a/be/src/util/debug_points.h +++ b/be/src/util/debug_points.h @@ -26,7 +26,9 @@ #include "common/compiler_util.h" #include "common/config.h" +#include "fmt/format.h" +// more usage can see 'util/debug_points_test.cpp' #define DBUG_EXECUTE_IF(debug_point_name, code) \ if (UNLIKELY(config::enable_debug_points)) { \ auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \ @@ -44,7 +46,7 @@ struct DebugPoint { std::map params; - template + template T param(const std::string& key, T default_value = T()) { auto it = params.find(key); if (it == params.end()) { @@ -60,25 +62,50 @@ struct DebugPoint { return boost::lexical_cast(it->second); } else if constexpr (std::is_arithmetic_v) { return boost::lexical_cast(it->second); + } else if constexpr (std::is_same_v) { + return it->second.c_str(); } else { static_assert(std::is_same_v); return it->second; } } - - std::string param(const std::string& key, const char* default_value) { - return param(key, std::string(default_value)); - } }; class DebugPoints { public: bool is_enable(const std::string& name); std::shared_ptr get_debug_point(const std::string& name); - void add(const std::string& name, std::shared_ptr debug_point); void remove(const std::string& name); void clear(); + // if not enable debug point or its params not contains `key`, then return `default_value` + // url: /api/debug_point/add/name?k1=v1&k2=v2&... + template + T get_debug_param_or_default(const std::string& name, const std::string& key, + const T& default_value) { + auto debug_point = get_debug_point(name); + return debug_point ? debug_point->param(key, default_value) : default_value; + } + + // url: /api/debug_point/add/name?value=v + template + T get_debug_param_or_default(const std::string& name, const T& default_value) { + return get_debug_param_or_default(name, "value", default_value); + } + + void add(const std::string& name, std::shared_ptr debug_point); + + // more 'add' functions for convenient use + void add(const std::string& name) { add(name, std::make_shared()); } + void add_with_params(const std::string& name, + const std::map& params) { + add(name, std::shared_ptr(new DebugPoint {.params = params})); + } + template + void add_with_value(const std::string& name, const T& value) { + add_with_params(name, {{"value", fmt::format("{}", value)}}); + } + static DebugPoints* instance(); private: diff --git a/be/test/util/debug_points_test.cpp b/be/test/util/debug_points_test.cpp index df86cf0d1b..76c4fd0078 100644 --- a/be/test/util/debug_points_test.cpp +++ b/be/test/util/debug_points_test.cpp @@ -60,7 +60,7 @@ TEST(DebugPointsTest, BaseTest) { DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param("v1", 100))); DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2"))); DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string()))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", "b"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b"))); DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3"))); DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0))); DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false))); @@ -68,7 +68,32 @@ TEST(DebugPointsTest, BaseTest) { DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist"))); DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L))); DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123))); - DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("abcd", dp->param("v_not_exist", "abcd"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist", "abcd"))); + + EXPECT_EQ(1.2, DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0)); + EXPECT_EQ(100, + DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k", 100)); + + POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567"); + EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug6", 0)); +} + +TEST(DebugPointsTest, AddTest) { + config::enable_debug_points = true; + DebugPoints::instance()->clear(); + + DebugPoints::instance()->add("dbug1"); + EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1")); + + DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}}); + EXPECT_EQ(100, DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0)); + + DebugPoints::instance()->add_with_value("dbug3", 567); + EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug3", 567)); + + DebugPoints::instance()->add_with_value("dbug4", "hello"); + EXPECT_EQ("hello", + DebugPoints::instance()->get_debug_param_or_default("dbug4", "")); } } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java index 413a3b129f..158f2cde4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/LoadStatisticForTag.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.clone.BackendLoadStatistic.Classification; import org.apache.doris.clone.BackendLoadStatistic.LoadScore; import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.resource.Tag; import org.apache.doris.system.Backend; @@ -186,35 +187,58 @@ public class LoadStatisticForTag { int lowCounter = 0; int midCounter = 0; int highCounter = 0; + + long debugHighBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L); + if (debugHighBeId > 0) { + final long targetBeId = debugHighBeId; // debugHighBeId can not put in lambda cause it's updated later + if (!beLoadStatistics.stream().anyMatch(it -> it.getBeId() == targetBeId)) { + debugHighBeId = -1L; + } + } + for (BackendLoadStatistic beStat : beLoadStatistics) { if (!beStat.hasMedium(medium)) { continue; } - - if (Config.be_rebalancer_fuzzy_test) { + Classification clazz = Classification.MID; + if (debugHighBeId > 0) { + if (beStat.getBeId() == debugHighBeId) { + clazz = Classification.HIGH; + } else { + clazz = Classification.LOW; + } + } else if (Config.be_rebalancer_fuzzy_test) { if (beStat.getLoadScore(medium) > avgLoadScore) { - beStat.setClazz(medium, Classification.HIGH); - highCounter++; + clazz = Classification.HIGH; } else if (beStat.getLoadScore(medium) < avgLoadScore) { - beStat.setClazz(medium, Classification.LOW); - lowCounter++; + clazz = Classification.LOW; } } else { if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / avgLoadScore > Config.balance_load_score_threshold) { if (beStat.getLoadScore(medium) > avgLoadScore) { - beStat.setClazz(medium, Classification.HIGH); - highCounter++; + clazz = Classification.HIGH; } else if (beStat.getLoadScore(medium) < avgLoadScore) { - beStat.setClazz(medium, Classification.LOW); - lowCounter++; + clazz = Classification.LOW; } - } else { - beStat.setClazz(medium, Classification.MID); - midCounter++; } } + + beStat.setClazz(medium, clazz); + switch (clazz) { + case HIGH: + highCounter++; + break; + case LOW: + lowCounter++; + break; + case MID: + midCounter++; + break; + default: + break; + } } LOG.debug("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}", @@ -265,6 +289,11 @@ public class LoadStatisticForTag { return false; } + long debugHighBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L); + if (srcBeStat.getBeId() == debugHighBeId) { + return true; + } + currentSrcBeScore = srcBeStat.getLoadScore(medium); currentDestBeScore = destBeStat.getLoadScore(medium); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index c57bdc7762..11f338f122 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -666,6 +666,7 @@ public class TabletSchedCtx implements Comparable { List decommissionCand = Lists.newArrayList(); List colocateCand = Lists.newArrayList(); List notColocateCand = Lists.newArrayList(); + List furtherRepairs = Lists.newArrayList(); for (Replica replica : tablet.getReplicas()) { if (replica.isBad()) { LOG.debug("replica {} is bad, skip. tablet: {}", @@ -688,6 +689,11 @@ public class TabletSchedCtx implements Comparable { if (replica.getLastFailedVersion() <= 0 && replica.getVersion() >= visibleVersion) { + + if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR && replica.needFurtherRepair()) { + furtherRepairs.add(replica); + } + // skip healthy replica LOG.debug("replica {} version {} is healthy, visible version {}, replica state {}, skip. tablet: {}", replica.getId(), replica.getVersion(), visibleVersion, replica.getState(), tabletId); @@ -709,8 +715,24 @@ public class TabletSchedCtx implements Comparable { } else { candidates = decommissionCand; } + if (candidates.isEmpty()) { - throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica"); + if (furtherRepairs.isEmpty()) { + throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica"); + } + + boolean allCatchup = true; + for (Replica replica : furtherRepairs) { + if (checkFurthurRepairFinish(replica, visibleVersion)) { + replica.setNeedFurtherRepair(false); + replica.setFurtherRepairWatermarkTxnTd(-1); + } else { + allCatchup = false; + } + } + + throw new SchedException(Status.FINISHED, + allCatchup ? "further repair all catchup" : "further repair waiting catchup"); } Replica chosenReplica = null; @@ -782,6 +804,32 @@ public class TabletSchedCtx implements Comparable { setDest(chosenReplica.getBackendId(), chosenReplica.getPathHash()); } + private boolean checkFurthurRepairFinish(Replica replica, long version) { + if (replica.getVersion() < version || replica.getLastFailedVersion() > 0) { + return false; + } + + long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd(); + if (furtherRepairWatermarkTxnTd < 0) { + return true; + } + + try { + if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( + furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) { + LOG.info("replica {} of tablet {} has catchup with further repair watermark id {}", + replica, tabletId, furtherRepairWatermarkTxnTd); + return true; + } else { + return false; + } + } catch (Exception e) { + LOG.warn("replica {} of tablet {} check catchup with further repair watermark id {} failed", + replica, tabletId, furtherRepairWatermarkTxnTd, e); + return true; + } + } + public void releaseResource(TabletScheduler tabletScheduler) { releaseResource(tabletScheduler, false); } @@ -1104,25 +1152,7 @@ public class TabletSchedCtx implements Comparable { // change from prepare to committed or visible, this replica will be fall behind and be removed // in REDUNDANT detection. // - boolean isCatchup = false; - if (replica.getVersion() >= partition.getVisibleVersion() && replica.getLastFailedVersion() < 0) { - long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd(); - if (furtherRepairWatermarkTxnTd < 0) { - isCatchup = true; - } else { - try { - if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished( - furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) { - isCatchup = true; - LOG.info("new replica {} of tablet {} has catchup with further repair watermark id {}", - replica, tabletId, furtherRepairWatermarkTxnTd); - } - } catch (Exception e) { - isCatchup = true; - } - } - } - + boolean isCatchup = checkFurthurRepairFinish(replica, partition.getVisibleVersion()); replica.incrFurtherRepairCount(); if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) { replica.setNeedFurtherRepair(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 572199f941..1d4592501f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -46,6 +46,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.resource.Tag; @@ -983,6 +984,7 @@ public class TabletScheduler extends MasterDaemon { boolean force, LoadStatisticForTag statistic) throws SchedException { Replica chosenReplica = null; double maxScore = 0; + long debugHighBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L); for (Replica replica : replicas) { BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId()); if (beStatistic == null) { @@ -1007,6 +1009,11 @@ public class TabletScheduler extends MasterDaemon { maxScore = loadScore; chosenReplica = replica; } + + if (debugHighBeId > 0 && replica.getBackendId() == debugHighBeId) { + chosenReplica = replica; + break; + } } if (chosenReplica != null) { @@ -1535,6 +1542,16 @@ public class TabletScheduler extends MasterDaemon { statusPair.second = tabletCtx.getPriority(); } } + + if (statusPair.first == TabletStatus.NEED_FURTHER_REPAIR) { + // replica is just waiting for finishing txns before furtherRepairWatermarkTxnTd, + // no need to add it immediately + Replica replica = tablet.getReplicaByBackendId(tabletCtx.getDestBackendId()); + if (replica != null && replica.getVersion() >= partition.getVisibleVersion() + && replica.getLastFailedVersion() < 0) { + return; + } + } } finally { tbl.readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java index d9cedb2d53..da06232f0c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java @@ -30,6 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Use for manage debug points. + * + * usage example can see DebugPointUtilTest.java + * **/ public class DebugPointUtil { private static final Logger LOG = LogManager.getLogger(DebugPointUtil.class); @@ -109,11 +112,34 @@ public class DebugPointUtil { return debugPoint; } + // if not enable debug point or its params not contains `key`, then return `defaultValue` + // url: /api/debug_point/add/name?k1=v1&k2=v2&... + public static E getDebugParamOrDefault(String debugPointName, String key, E defaultValue) { + DebugPoint debugPoint = getDebugPoint(debugPointName); + + return debugPoint != null ? debugPoint.param(key, defaultValue) : defaultValue; + } + + // url: /api/debug_point/add/name?value=v + public static E getDebugParamOrDefault(String debugPointName, E defaultValue) { + return getDebugParamOrDefault(debugPointName, "value", defaultValue); + } + public static void addDebugPoint(String name, DebugPoint debugPoint) { debugPoints.put(name, debugPoint); LOG.info("add debug point: name={}, params={}", name, debugPoint.params); } + public static void addDebugPoint(String name) { + addDebugPoint(name, new DebugPoint()); + } + + public static void addDebugPointWithValue(String name, E value) { + DebugPoint debugPoint = new DebugPoint(); + debugPoint.params.put("value", String.format("%s", value)); + addDebugPoint(name, debugPoint); + } + public static void removeDebugPoint(String name) { DebugPoint debugPoint = debugPoints.remove(name); LOG.info("remove debug point: name={}, exists={}", name, debugPoint != null); diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java index 5c6492b063..0a68885bf2 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java @@ -66,6 +66,12 @@ public class DebugPointUtilTest extends DorisHttpTestCase { Assert.assertTrue(debugPoint.param("v4", false)); Assert.assertFalse(debugPoint.param("v5", false)); Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L)); + + Assert.assertEquals(1, (int) DebugPointUtil.getDebugParamOrDefault("dbug5", "v1", 0)); + Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("point_not_exists", "v1", 100)); + + sendRequest("/api/debug_point/add/dbug6?value=100"); + Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("dbug6", 0)); } private void sendRequest(String uri) throws Exception {