[improvement](tablet clone) furthur repair replicas should be check even if they are versions catchup (#25551)
This commit is contained in:
@ -26,7 +26,9 @@
|
||||
|
||||
#include "common/compiler_util.h"
|
||||
#include "common/config.h"
|
||||
#include "fmt/format.h"
|
||||
|
||||
// more usage can see 'util/debug_points_test.cpp'
|
||||
#define DBUG_EXECUTE_IF(debug_point_name, code) \
|
||||
if (UNLIKELY(config::enable_debug_points)) { \
|
||||
auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
|
||||
@ -44,7 +46,7 @@ struct DebugPoint {
|
||||
|
||||
std::map<std::string, std::string> params;
|
||||
|
||||
template <typename T = int>
|
||||
template <typename T>
|
||||
T param(const std::string& key, T default_value = T()) {
|
||||
auto it = params.find(key);
|
||||
if (it == params.end()) {
|
||||
@ -60,25 +62,50 @@ struct DebugPoint {
|
||||
return boost::lexical_cast<T>(it->second);
|
||||
} else if constexpr (std::is_arithmetic_v<T>) {
|
||||
return boost::lexical_cast<T>(it->second);
|
||||
} else if constexpr (std::is_same_v<T, const char*>) {
|
||||
return it->second.c_str();
|
||||
} else {
|
||||
static_assert(std::is_same_v<T, std::string>);
|
||||
return it->second;
|
||||
}
|
||||
}
|
||||
|
||||
std::string param(const std::string& key, const char* default_value) {
|
||||
return param<std::string>(key, std::string(default_value));
|
||||
}
|
||||
};
|
||||
|
||||
class DebugPoints {
|
||||
public:
|
||||
bool is_enable(const std::string& name);
|
||||
std::shared_ptr<DebugPoint> get_debug_point(const std::string& name);
|
||||
void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point);
|
||||
void remove(const std::string& name);
|
||||
void clear();
|
||||
|
||||
// if not enable debug point or its params not contains `key`, then return `default_value`
|
||||
// url: /api/debug_point/add/name?k1=v1&k2=v2&...
|
||||
template <typename T>
|
||||
T get_debug_param_or_default(const std::string& name, const std::string& key,
|
||||
const T& default_value) {
|
||||
auto debug_point = get_debug_point(name);
|
||||
return debug_point ? debug_point->param(key, default_value) : default_value;
|
||||
}
|
||||
|
||||
// url: /api/debug_point/add/name?value=v
|
||||
template <typename T>
|
||||
T get_debug_param_or_default(const std::string& name, const T& default_value) {
|
||||
return get_debug_param_or_default(name, "value", default_value);
|
||||
}
|
||||
|
||||
void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point);
|
||||
|
||||
// more 'add' functions for convenient use
|
||||
void add(const std::string& name) { add(name, std::make_shared<DebugPoint>()); }
|
||||
void add_with_params(const std::string& name,
|
||||
const std::map<std::string, std::string>& params) {
|
||||
add(name, std::shared_ptr<DebugPoint>(new DebugPoint {.params = params}));
|
||||
}
|
||||
template <typename T>
|
||||
void add_with_value(const std::string& name, const T& value) {
|
||||
add_with_params(name, {{"value", fmt::format("{}", value)}});
|
||||
}
|
||||
|
||||
static DebugPoints* instance();
|
||||
|
||||
private:
|
||||
|
||||
@ -60,7 +60,7 @@ TEST(DebugPointsTest, BaseTest) {
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param<int>("v1", 100)));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param<std::string>("v2")));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string())));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", "b")));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b")));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param<double>("v3")));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0)));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false)));
|
||||
@ -68,7 +68,32 @@ TEST(DebugPointsTest, BaseTest) {
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param<int64_t>("v_not_exist")));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L)));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123)));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("abcd", dp->param("v_not_exist", "abcd")));
|
||||
DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist", "abcd")));
|
||||
|
||||
EXPECT_EQ(1.2, DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0));
|
||||
EXPECT_EQ(100,
|
||||
DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k", 100));
|
||||
|
||||
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567");
|
||||
EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug6", 0));
|
||||
}
|
||||
|
||||
TEST(DebugPointsTest, AddTest) {
|
||||
config::enable_debug_points = true;
|
||||
DebugPoints::instance()->clear();
|
||||
|
||||
DebugPoints::instance()->add("dbug1");
|
||||
EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1"));
|
||||
|
||||
DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}});
|
||||
EXPECT_EQ(100, DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0));
|
||||
|
||||
DebugPoints::instance()->add_with_value("dbug3", 567);
|
||||
EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug3", 567));
|
||||
|
||||
DebugPoints::instance()->add_with_value("dbug4", "hello");
|
||||
EXPECT_EQ("hello",
|
||||
DebugPoints::instance()->get_debug_param_or_default<std::string>("dbug4", ""));
|
||||
}
|
||||
|
||||
} // namespace doris
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.catalog.TabletInvertedIndex;
|
||||
import org.apache.doris.clone.BackendLoadStatistic.Classification;
|
||||
import org.apache.doris.clone.BackendLoadStatistic.LoadScore;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.DebugPointUtil;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.resource.Tag;
|
||||
import org.apache.doris.system.Backend;
|
||||
@ -186,35 +187,58 @@ public class LoadStatisticForTag {
|
||||
int lowCounter = 0;
|
||||
int midCounter = 0;
|
||||
int highCounter = 0;
|
||||
|
||||
long debugHighBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
|
||||
if (debugHighBeId > 0) {
|
||||
final long targetBeId = debugHighBeId; // debugHighBeId can not put in lambda cause it's updated later
|
||||
if (!beLoadStatistics.stream().anyMatch(it -> it.getBeId() == targetBeId)) {
|
||||
debugHighBeId = -1L;
|
||||
}
|
||||
}
|
||||
|
||||
for (BackendLoadStatistic beStat : beLoadStatistics) {
|
||||
if (!beStat.hasMedium(medium)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (Config.be_rebalancer_fuzzy_test) {
|
||||
Classification clazz = Classification.MID;
|
||||
if (debugHighBeId > 0) {
|
||||
if (beStat.getBeId() == debugHighBeId) {
|
||||
clazz = Classification.HIGH;
|
||||
} else {
|
||||
clazz = Classification.LOW;
|
||||
}
|
||||
} else if (Config.be_rebalancer_fuzzy_test) {
|
||||
if (beStat.getLoadScore(medium) > avgLoadScore) {
|
||||
beStat.setClazz(medium, Classification.HIGH);
|
||||
highCounter++;
|
||||
clazz = Classification.HIGH;
|
||||
} else if (beStat.getLoadScore(medium) < avgLoadScore) {
|
||||
beStat.setClazz(medium, Classification.LOW);
|
||||
lowCounter++;
|
||||
clazz = Classification.LOW;
|
||||
}
|
||||
} else {
|
||||
if (Math.abs(beStat.getLoadScore(medium) - avgLoadScore) / avgLoadScore
|
||||
> Config.balance_load_score_threshold) {
|
||||
if (beStat.getLoadScore(medium) > avgLoadScore) {
|
||||
beStat.setClazz(medium, Classification.HIGH);
|
||||
highCounter++;
|
||||
clazz = Classification.HIGH;
|
||||
} else if (beStat.getLoadScore(medium) < avgLoadScore) {
|
||||
beStat.setClazz(medium, Classification.LOW);
|
||||
lowCounter++;
|
||||
clazz = Classification.LOW;
|
||||
}
|
||||
} else {
|
||||
beStat.setClazz(medium, Classification.MID);
|
||||
midCounter++;
|
||||
}
|
||||
}
|
||||
|
||||
beStat.setClazz(medium, clazz);
|
||||
switch (clazz) {
|
||||
case HIGH:
|
||||
highCounter++;
|
||||
break;
|
||||
case LOW:
|
||||
lowCounter++;
|
||||
break;
|
||||
case MID:
|
||||
midCounter++;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
LOG.debug("classify backend by load. medium: {} avg load score: {}. low/mid/high: {}/{}/{}",
|
||||
@ -265,6 +289,11 @@ public class LoadStatisticForTag {
|
||||
return false;
|
||||
}
|
||||
|
||||
long debugHighBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
|
||||
if (srcBeStat.getBeId() == debugHighBeId) {
|
||||
return true;
|
||||
}
|
||||
|
||||
currentSrcBeScore = srcBeStat.getLoadScore(medium);
|
||||
currentDestBeScore = destBeStat.getLoadScore(medium);
|
||||
|
||||
|
||||
@ -666,6 +666,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
List<Replica> decommissionCand = Lists.newArrayList();
|
||||
List<Replica> colocateCand = Lists.newArrayList();
|
||||
List<Replica> notColocateCand = Lists.newArrayList();
|
||||
List<Replica> furtherRepairs = Lists.newArrayList();
|
||||
for (Replica replica : tablet.getReplicas()) {
|
||||
if (replica.isBad()) {
|
||||
LOG.debug("replica {} is bad, skip. tablet: {}",
|
||||
@ -688,6 +689,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
|
||||
if (replica.getLastFailedVersion() <= 0
|
||||
&& replica.getVersion() >= visibleVersion) {
|
||||
|
||||
if (tabletStatus == TabletStatus.NEED_FURTHER_REPAIR && replica.needFurtherRepair()) {
|
||||
furtherRepairs.add(replica);
|
||||
}
|
||||
|
||||
// skip healthy replica
|
||||
LOG.debug("replica {} version {} is healthy, visible version {}, replica state {}, skip. tablet: {}",
|
||||
replica.getId(), replica.getVersion(), visibleVersion, replica.getState(), tabletId);
|
||||
@ -709,8 +715,24 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
} else {
|
||||
candidates = decommissionCand;
|
||||
}
|
||||
|
||||
if (candidates.isEmpty()) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica");
|
||||
if (furtherRepairs.isEmpty()) {
|
||||
throw new SchedException(Status.UNRECOVERABLE, "unable to choose dest replica");
|
||||
}
|
||||
|
||||
boolean allCatchup = true;
|
||||
for (Replica replica : furtherRepairs) {
|
||||
if (checkFurthurRepairFinish(replica, visibleVersion)) {
|
||||
replica.setNeedFurtherRepair(false);
|
||||
replica.setFurtherRepairWatermarkTxnTd(-1);
|
||||
} else {
|
||||
allCatchup = false;
|
||||
}
|
||||
}
|
||||
|
||||
throw new SchedException(Status.FINISHED,
|
||||
allCatchup ? "further repair all catchup" : "further repair waiting catchup");
|
||||
}
|
||||
|
||||
Replica chosenReplica = null;
|
||||
@ -782,6 +804,32 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
setDest(chosenReplica.getBackendId(), chosenReplica.getPathHash());
|
||||
}
|
||||
|
||||
private boolean checkFurthurRepairFinish(Replica replica, long version) {
|
||||
if (replica.getVersion() < version || replica.getLastFailedVersion() > 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd();
|
||||
if (furtherRepairWatermarkTxnTd < 0) {
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
|
||||
furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) {
|
||||
LOG.info("replica {} of tablet {} has catchup with further repair watermark id {}",
|
||||
replica, tabletId, furtherRepairWatermarkTxnTd);
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("replica {} of tablet {} check catchup with further repair watermark id {} failed",
|
||||
replica, tabletId, furtherRepairWatermarkTxnTd, e);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void releaseResource(TabletScheduler tabletScheduler) {
|
||||
releaseResource(tabletScheduler, false);
|
||||
}
|
||||
@ -1104,25 +1152,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
|
||||
// change from prepare to committed or visible, this replica will be fall behind and be removed
|
||||
// in REDUNDANT detection.
|
||||
//
|
||||
boolean isCatchup = false;
|
||||
if (replica.getVersion() >= partition.getVisibleVersion() && replica.getLastFailedVersion() < 0) {
|
||||
long furtherRepairWatermarkTxnTd = replica.getFurtherRepairWatermarkTxnTd();
|
||||
if (furtherRepairWatermarkTxnTd < 0) {
|
||||
isCatchup = true;
|
||||
} else {
|
||||
try {
|
||||
if (Env.getCurrentGlobalTransactionMgr().isPreviousTransactionsFinished(
|
||||
furtherRepairWatermarkTxnTd, dbId, tblId, partitionId)) {
|
||||
isCatchup = true;
|
||||
LOG.info("new replica {} of tablet {} has catchup with further repair watermark id {}",
|
||||
replica, tabletId, furtherRepairWatermarkTxnTd);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
isCatchup = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean isCatchup = checkFurthurRepairFinish(replica, partition.getVisibleVersion());
|
||||
replica.incrFurtherRepairCount();
|
||||
if (isCatchup || replica.getLeftFurtherRepairCount() <= 0) {
|
||||
replica.setNeedFurtherRepair(false);
|
||||
|
||||
@ -46,6 +46,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DebugPointUtil;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.persist.ReplicaPersistInfo;
|
||||
import org.apache.doris.resource.Tag;
|
||||
@ -983,6 +984,7 @@ public class TabletScheduler extends MasterDaemon {
|
||||
boolean force, LoadStatisticForTag statistic) throws SchedException {
|
||||
Replica chosenReplica = null;
|
||||
double maxScore = 0;
|
||||
long debugHighBeId = DebugPointUtil.getDebugParamOrDefault("FE.HIGH_LOAD_BE_ID", -1L);
|
||||
for (Replica replica : replicas) {
|
||||
BackendLoadStatistic beStatistic = statistic.getBackendLoadStatistic(replica.getBackendId());
|
||||
if (beStatistic == null) {
|
||||
@ -1007,6 +1009,11 @@ public class TabletScheduler extends MasterDaemon {
|
||||
maxScore = loadScore;
|
||||
chosenReplica = replica;
|
||||
}
|
||||
|
||||
if (debugHighBeId > 0 && replica.getBackendId() == debugHighBeId) {
|
||||
chosenReplica = replica;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (chosenReplica != null) {
|
||||
@ -1535,6 +1542,16 @@ public class TabletScheduler extends MasterDaemon {
|
||||
statusPair.second = tabletCtx.getPriority();
|
||||
}
|
||||
}
|
||||
|
||||
if (statusPair.first == TabletStatus.NEED_FURTHER_REPAIR) {
|
||||
// replica is just waiting for finishing txns before furtherRepairWatermarkTxnTd,
|
||||
// no need to add it immediately
|
||||
Replica replica = tablet.getReplicaByBackendId(tabletCtx.getDestBackendId());
|
||||
if (replica != null && replica.getVersion() >= partition.getVisibleVersion()
|
||||
&& replica.getLastFailedVersion() < 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
tbl.readUnlock();
|
||||
}
|
||||
|
||||
@ -30,6 +30,9 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Use for manage debug points.
|
||||
*
|
||||
* usage example can see DebugPointUtilTest.java
|
||||
*
|
||||
**/
|
||||
public class DebugPointUtil {
|
||||
private static final Logger LOG = LogManager.getLogger(DebugPointUtil.class);
|
||||
@ -109,11 +112,34 @@ public class DebugPointUtil {
|
||||
return debugPoint;
|
||||
}
|
||||
|
||||
// if not enable debug point or its params not contains `key`, then return `defaultValue`
|
||||
// url: /api/debug_point/add/name?k1=v1&k2=v2&...
|
||||
public static <E> E getDebugParamOrDefault(String debugPointName, String key, E defaultValue) {
|
||||
DebugPoint debugPoint = getDebugPoint(debugPointName);
|
||||
|
||||
return debugPoint != null ? debugPoint.param(key, defaultValue) : defaultValue;
|
||||
}
|
||||
|
||||
// url: /api/debug_point/add/name?value=v
|
||||
public static <E> E getDebugParamOrDefault(String debugPointName, E defaultValue) {
|
||||
return getDebugParamOrDefault(debugPointName, "value", defaultValue);
|
||||
}
|
||||
|
||||
public static void addDebugPoint(String name, DebugPoint debugPoint) {
|
||||
debugPoints.put(name, debugPoint);
|
||||
LOG.info("add debug point: name={}, params={}", name, debugPoint.params);
|
||||
}
|
||||
|
||||
public static void addDebugPoint(String name) {
|
||||
addDebugPoint(name, new DebugPoint());
|
||||
}
|
||||
|
||||
public static <E> void addDebugPointWithValue(String name, E value) {
|
||||
DebugPoint debugPoint = new DebugPoint();
|
||||
debugPoint.params.put("value", String.format("%s", value));
|
||||
addDebugPoint(name, debugPoint);
|
||||
}
|
||||
|
||||
public static void removeDebugPoint(String name) {
|
||||
DebugPoint debugPoint = debugPoints.remove(name);
|
||||
LOG.info("remove debug point: name={}, exists={}", name, debugPoint != null);
|
||||
|
||||
@ -66,6 +66,12 @@ public class DebugPointUtilTest extends DorisHttpTestCase {
|
||||
Assert.assertTrue(debugPoint.param("v4", false));
|
||||
Assert.assertFalse(debugPoint.param("v5", false));
|
||||
Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L));
|
||||
|
||||
Assert.assertEquals(1, (int) DebugPointUtil.getDebugParamOrDefault("dbug5", "v1", 0));
|
||||
Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("point_not_exists", "v1", 100));
|
||||
|
||||
sendRequest("/api/debug_point/add/dbug6?value=100");
|
||||
Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("dbug6", 0));
|
||||
}
|
||||
|
||||
private void sendRequest(String uri) throws Exception {
|
||||
|
||||
Reference in New Issue
Block a user