weak read proxy feedback

This commit is contained in:
obdev 2023-09-14 10:10:22 +00:00 committed by ob-robot
parent 3ec63a457a
commit 3a1422f11e
8 changed files with 157 additions and 7 deletions

View File

@ -254,6 +254,7 @@ enum ObSysVarClassType
SYS_VAR_RUNTIME_FILTER_MAX_IN_NUM = 10148,
SYS_VAR_RUNTIME_BLOOM_FILTER_MAX_SIZE = 10149,
SYS_VAR_OPTIMIZER_FEATURES_ENABLE = 10150,
SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK = 10151,
};
}

View File

@ -249,6 +249,7 @@ namespace share
static const char* const OB_SV_RUNTIME_FILTER_MAX_IN_NUM = "runtime_filter_max_in_num";
static const char* const OB_SV_RUNTIME_BLOOM_FILTER_MAX_SIZE = "runtime_bloom_filter_max_size";
static const char* const OB_SV_OPTIMIZER_FEATURES_ENABLE = "optimizer_features_enable";
static const char* const OB_SV__OB_PROXY_WEAKREAD_FEEDBACK = "_ob_proxy_weakread_feedback";
}
}

View File

@ -138,6 +138,7 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_NAME[] = {
"_nlj_batching_enabled",
"_ob_ols_policy_session_labels",
"_ob_proxy_session_temporary_table_used",
"_ob_proxy_weakread_feedback",
"_ob_px_bcast_optimization",
"_ob_px_slave_mapping_threshold",
"_optimizer_gather_stats_on_load",
@ -374,6 +375,7 @@ const ObSysVarClassType ObSysVarFactory::SYS_VAR_IDS_SORTED_BY_NAME[] = {
SYS_VAR__NLJ_BATCHING_ENABLED,
SYS_VAR__OB_OLS_POLICY_SESSION_LABELS,
SYS_VAR__OB_PROXY_SESSION_TEMPORARY_TABLE_USED,
SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK,
SYS_VAR__OB_PX_BCAST_OPTIMIZATION,
SYS_VAR__OB_PX_SLAVE_MAPPING_THRESHOLD,
SYS_VAR__OPTIMIZER_GATHER_STATS_ON_LOAD,
@ -827,7 +829,8 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_ID[] = {
"runtime_filter_wait_time_ms",
"runtime_filter_max_in_num",
"runtime_bloom_filter_max_size",
"optimizer_features_enable"
"optimizer_features_enable",
"_ob_proxy_weakread_feedback"
};
bool ObSysVarFactory::sys_var_name_case_cmp(const char *name1, const ObString &name2)
@ -1229,6 +1232,7 @@ int ObSysVarFactory::create_all_sys_vars()
+ sizeof(ObSysVarRuntimeFilterMaxInNum)
+ sizeof(ObSysVarRuntimeBloomFilterMaxSize)
+ sizeof(ObSysVarOptimizerFeaturesEnable)
+ sizeof(ObSysVarObProxyWeakreadFeedback)
;
void *ptr = NULL;
if (OB_ISNULL(ptr = allocator_.alloc(total_mem_size))) {
@ -3334,6 +3338,15 @@ int ObSysVarFactory::create_all_sys_vars()
ptr = (void *)((char *)ptr + sizeof(ObSysVarOptimizerFeaturesEnable));
}
}
if (OB_SUCC(ret)) {
if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObProxyWeakreadFeedback())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("fail to new ObSysVarObProxyWeakreadFeedback", K(ret));
} else {
store_buf_[ObSysVarsToIdxMap::get_store_idx(static_cast<int64_t>(SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK))] = sys_var_ptr;
ptr = (void *)((char *)ptr + sizeof(ObSysVarObProxyWeakreadFeedback));
}
}
}
return ret;
@ -5907,6 +5920,17 @@ int ObSysVarFactory::create_sys_var(ObIAllocator &allocator_, ObSysVarClassType
}
break;
}
case SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK: {
void *ptr = NULL;
if (OB_ISNULL(ptr = allocator_.alloc(sizeof(ObSysVarObProxyWeakreadFeedback)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("fail to alloc memory", K(ret), K(sizeof(ObSysVarObProxyWeakreadFeedback)));
} else if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObProxyWeakreadFeedback())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("fail to new ObSysVarObProxyWeakreadFeedback", K(ret));
}
break;
}
default: {
ret = OB_ERR_UNEXPECTED;

View File

@ -1684,6 +1684,13 @@ public:
inline virtual ObSysVarClassType get_type() const { return SYS_VAR_OPTIMIZER_FEATURES_ENABLE; }
inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(232); }
};
// System-variable class for `_ob_proxy_weakread_feedback`
// (id SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK). It derives from ObIntSysVar, so the
// variable is handled as an integer.
class ObSysVarObProxyWeakreadFeedback : public ObIntSysVar
{
public:
// All five ObIntSysVar constructor arguments are NULL.
// NOTE(review): presumably these are optional hook/callback pointers - confirm against ObIntSysVar.
ObSysVarObProxyWeakreadFeedback() : ObIntSysVar(NULL, NULL, NULL, NULL, NULL) {}
// Returns the enum id that identifies this variable.
inline virtual ObSysVarClassType get_type() const { return SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK; }
// 233 is this variable's slot in the ObSysVars table (ObSysVarsIdToArrayIdx maps
// the id to 233); the index here must stay in sync with that table.
inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(233); }
};
class ObSysVarFactory
@ -1704,7 +1711,7 @@ public:
static const common::ObString get_sys_var_name_by_id(ObSysVarClassType sys_var_id);
const static int64_t MYSQL_SYS_VARS_COUNT = 97;
const static int64_t OB_SYS_VARS_COUNT = 136;
const static int64_t OB_SYS_VARS_COUNT = 137;
const static int64_t ALL_SYS_VARS_COUNT = MYSQL_SYS_VARS_COUNT + OB_SYS_VARS_COUNT;
const static int64_t INVALID_MAX_READ_STALE_TIME = -1;

View File

@ -3292,13 +3292,26 @@ static struct VarsInit{
ObSysVars[232].alias_ = "OB_SV_OPTIMIZER_FEATURES_ENABLE" ;
}();
[&] (){
ObSysVars[233].default_value_ = "0" ;
ObSysVars[233].info_ = "In the weak read state, the replica status of the current machine is fed back to the proxy." ;
ObSysVars[233].name_ = "_ob_proxy_weakread_feedback" ;
ObSysVars[233].data_type_ = ObIntType ;
ObSysVars[233].flags_ = ObSysVarFlag::READONLY | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::INVISIBLE ;
ObSysVars[233].id_ = SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK ;
cur_max_var_id = MAX(cur_max_var_id, static_cast<int64_t>(SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK)) ;
ObSysVarsIdToArrayIdx[SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK] = 233 ;
ObSysVars[233].base_value_ = "0" ;
ObSysVars[233].alias_ = "OB_SV__OB_PROXY_WEAKREAD_FEEDBACK" ;
}();
if (cur_max_var_id >= ObSysVarFactory::OB_MAX_SYS_VAR_ID) {
HasInvalidSysVar = true;
}
}
}vars_init;
static int64_t var_amount = 233;
static int64_t var_amount = 234;
int64_t ObSysVariables::get_all_sys_var_count(){ return ObSysVarFactory::ALL_SYS_VARS_COUNT;}
ObSysVarClassType ObSysVariables::get_sys_var_id(int64_t i){ return ObSysVars[i].id_;}

View File

@ -3322,5 +3322,18 @@
"info_cn": "",
"background_cn": "",
"ref_url": ""
},
"_ob_proxy_weakread_feedback": {
"id": 10151,
"name": "_ob_proxy_weakread_feedback",
"default_value": "0",
"base_value": "0",
"data_type": "int",
"info": "In the weak read state, the replica status of the current machine is fed back to the proxy.",
"flags": "READONLY | SESSION | INVISIBLE",
"publish_version": "420",
"info_cn": "",
"background_cn": "",
"ref_url": ""
}
}

View File

@ -2076,6 +2076,7 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx,
ObSQLSessionInfo *session = exec_ctx.get_my_session();
ObTaskExecutorCtx &task_exec_ctx = exec_ctx.get_task_exec_ctx();
bool is_hit_partition = false;
int64_t proxy_stat = 0;
ObFollowerFirstFeedbackType follower_first_feedback = FFF_HIT_MIN;
int64_t route_policy_type = 0;
bool proxy_priority_hit_support = false;
@ -2098,7 +2099,7 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx,
tenant_id,
max_read_stale_time,
phy_tbl_loc_info_list,
is_hit_partition, follower_first_feedback))) {
is_hit_partition, follower_first_feedback, proxy_stat))) {
LOG_WARN("fail to weak select intersect replicas", K(ret), K(local_server), K(phy_tbl_loc_info_list.count()));
} else {
session->partition_hit().try_set_bool(is_hit_partition);
@ -2107,6 +2108,14 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx,
LOG_WARN("fail to set_follower_first_feedback", K(follower_first_feedback), K(ret));
}
}
if (OB_SUCC(ret) && proxy_stat != 0) {
ObObj val;
val.set_int(proxy_stat);
if (OB_FAIL(session->update_sys_variable(SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK, val))) {
LOG_WARN("replace user val failed", K(ret), K(val));
}
}
}
} else {
const bool sess_in_retry = session->get_is_in_retry_for_dup_tbl(); //重试状态下不优化复制表的副本选择
@ -2183,9 +2192,11 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server,
int64_t max_read_stale_time,
ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
bool &is_hit_partition,
ObFollowerFirstFeedbackType &follower_first_feedback)
ObFollowerFirstFeedbackType &follower_first_feedback,
int64_t &proxy_stat)
{
int ret = OB_SUCCESS;
proxy_stat = 0;
is_hit_partition = true;//当前没有办法来判断是否能选择在一台机器上,所以将该值设置为true
ObCandiTableLoc * phy_tbl_loc_info = nullptr;
ObArenaAllocator allocator(ObModIds::OB_SQL_OPTIMIZER_SELECT_REPLICA);
@ -2241,7 +2252,9 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server,
} else {
ObArenaAllocator allocator(ObModIds::OB_SQL_OPTIMIZER_SELECT_REPLICA);
ObAddrList intersect_servers(allocator);
if (OB_FAIL(calc_hit_partition_for_compat(phy_tbl_loc_info_list, local_server, is_hit_partition, intersect_servers))) {
if (OB_FAIL(calc_rwsplit_partition_feedback(phy_tbl_loc_info_list, local_server, proxy_stat))) {
LOG_WARN("fail to calc proxy partition feedback", K(ret));
} else if (OB_FAIL(calc_hit_partition_for_compat(phy_tbl_loc_info_list, local_server, is_hit_partition, intersect_servers))) {
LOG_WARN("fail to calc hit partition for compat", K(ret));
} else {
if (is_hit_partition && route_policy.is_follower_first_route_policy_type(route_policy_ctx)) {
@ -2322,6 +2335,79 @@ int ObLogPlan::calc_follower_first_feedback(const ObIArray<ObCandiTableLoc*> &ph
return ret;
}
// Computes the weak-read feedback code describing which roles the replicas
// hosted on `local_server` play for the tablets touched by the statement.
// The caller (select_replicas) stores a non-zero code in the session system
// variable `_ob_proxy_weakread_feedback`.
//
// @param phy_tbl_loc_info_list  candidate locations, one entry per table
// @param local_server           address of the server running this code
// @param proxy_stat [out]       feedback code:
//                                 0 - every local replica found is a follower (nothing to report)
//                                 1 - no replica was found on local_server
//                                 2 - every local replica found is a leader
//                                 3 - both leader and follower replicas were found locally
// @return OB_SUCCESS, or OB_ERR_UNEXPECTED when a table entry is NULL. On failure
//         proxy_stat still reflects the flags gathered before the error, so callers
//         must check the return code before using it.
int ObLogPlan::calc_rwsplit_partition_feedback(const common::ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
                                               const common::ObAddr &local_server,
                                               int64_t &proxy_stat)
{
  INIT_SUCC(ret);
  // Despite the names, these flags mean "at least one local replica seen so far
  // is a leader / a follower". The names are kept so the trace-log keys do not change.
  bool all_leader = false;
  bool all_follower = false;
  bool need_break = false;
  // Give the out-parameter a defined value on every path; previously the
  // follower-only case left it untouched and relied on the caller zeroing it.
  proxy_stat = 0;
  for (int64_t i = 0; OB_SUCC(ret) && !need_break && (i < phy_tbl_loc_info_list.count()); ++i) {
    // table
    const ObCandiTableLoc *phy_tbl_loc_info = phy_tbl_loc_info_list.at(i);
    if (OB_ISNULL(phy_tbl_loc_info)) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("phy_tbl_loc_info is NULL", K(ret), K(i), K(phy_tbl_loc_info_list.count()));
    } else {
      const ObCandiTabletLocIArray &phy_part_loc_info_list =
          phy_tbl_loc_info->get_phy_part_loc_info_list();
      if (phy_part_loc_info_list.empty()) {
        // Defensive: a table without any partition location stops the scan; the
        // result is then decided by the flags collected so far.
        need_break = true;
      }
      for (int64_t j = 0; OB_SUCC(ret) && !need_break && (j < phy_part_loc_info_list.count()); ++j) {
        // partition
        bool found_server = false;
        const ObCandiTabletLoc &phy_part_loc_info = phy_part_loc_info_list.at(j);
        const ObIArray<ObRoutePolicy::CandidateReplica> &replica_loc_list =
            phy_part_loc_info.get_partition_location().get_replica_locations();
        LOG_TRACE("weak read list", K(replica_loc_list), K(local_server));
        for (int64_t k = 0; !found_server && (k < replica_loc_list.count()); ++k) {
          // replica
          const ObRoutePolicy::CandidateReplica &tmp_replica = replica_loc_list.at(k);
          if (local_server == tmp_replica.get_server()) {
            found_server = true;
            if (is_strong_leader(tmp_replica.get_role())) {
              all_leader = true;
              // Both roles seen: the answer is already "mixed", stop scanning.
              need_break = all_follower;
            } else {
              all_follower = true;
              // Both roles seen: the answer is already "mixed", stop scanning.
              need_break = all_leader;
            }
          }
        }
        // A partition with no replica on local_server contributes nothing to the flags.
      }
    }
  }
  LOG_TRACE("get feedback policy", K(all_leader), K(all_follower));
  // Mapping from the collected flags to the feedback code (int, not bool):
  //   no local replica at all        -> 1
  //   local leader(s) only           -> 2
  //   local leader(s) and follower(s) -> 3
  //   local follower(s) only         -> 0 (not reported by the caller)
  if (!all_leader && !all_follower) {
    // Current machine has no replica (proxy sent incorrectly, refresh location cache).
    proxy_stat = 1;
  } else if (all_leader && all_follower) {
    // part leader, part follower
    proxy_stat = 3;
  } else if (all_leader) {
    // all leader (proxy sent incorrectly, refresh location cache)
    proxy_stat = 2;
  } else {
    // Follower only: the proxy does not need to refresh its location cache,
    // so proxy_stat keeps the 0 assigned above.
  }
  return ret;
}
// This function exists for compatibility with the hit policy of older proxy versions; it can be removed once the proxy has been upgraded.
int ObLogPlan::calc_hit_partition_for_compat(const ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
const ObAddr &local_server,

View File

@ -1705,7 +1705,8 @@ private: // member functions
int64_t max_read_stale_time,
common::ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
bool &is_hit_partition,
share::ObFollowerFirstFeedbackType &follower_first_feedback);
share::ObFollowerFirstFeedbackType &follower_first_feedback,
int64_t &proxy_stat);
static int calc_hit_partition_for_compat(const common::ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
const common::ObAddr &local_server,
bool &is_hit_partition,
@ -1715,6 +1716,10 @@ private: // member functions
const ObAddrList &intersect_servers,
share::ObFollowerFirstFeedbackType &follower_first_feedback);
// Scans the candidate replica locations of every table/partition for replicas on
// local_server and reports the result through proxy_stat:
//   1 = no replica found on local_server,
//   2 = only leader replicas found locally,
//   3 = both leader and follower replicas found locally.
// In the follower-only case no non-zero code is assigned (the visible caller,
// weak_select_replicas, zeroes proxy_stat beforehand).
// Returns OB_ERR_UNEXPECTED if a table entry in the list is NULL.
static int calc_rwsplit_partition_feedback(const common::ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
const common::ObAddr &local_server,
int64_t &proxy_stat);
int set_connect_by_property(JoinPath *join_path, ObLogJoin &log_join);
static int calc_intersect_servers(const ObIArray<ObCandiTableLoc*> &phy_tbl_loc_info_list,
ObList<ObAddr, ObArenaAllocator> &candidate_server_list);