diff --git a/src/share/system_variable/ob_sys_var_class_type.h b/src/share/system_variable/ob_sys_var_class_type.h index 2d5f19d75..32685f6ed 100644 --- a/src/share/system_variable/ob_sys_var_class_type.h +++ b/src/share/system_variable/ob_sys_var_class_type.h @@ -254,6 +254,7 @@ enum ObSysVarClassType SYS_VAR_RUNTIME_FILTER_MAX_IN_NUM = 10148, SYS_VAR_RUNTIME_BLOOM_FILTER_MAX_SIZE = 10149, SYS_VAR_OPTIMIZER_FEATURES_ENABLE = 10150, + SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK = 10151, }; } diff --git a/src/share/system_variable/ob_system_variable_alias.h b/src/share/system_variable/ob_system_variable_alias.h index 0c05d7b52..a019e8bcb 100644 --- a/src/share/system_variable/ob_system_variable_alias.h +++ b/src/share/system_variable/ob_system_variable_alias.h @@ -249,6 +249,7 @@ namespace share static const char* const OB_SV_RUNTIME_FILTER_MAX_IN_NUM = "runtime_filter_max_in_num"; static const char* const OB_SV_RUNTIME_BLOOM_FILTER_MAX_SIZE = "runtime_bloom_filter_max_size"; static const char* const OB_SV_OPTIMIZER_FEATURES_ENABLE = "optimizer_features_enable"; + static const char* const OB_SV__OB_PROXY_WEAKREAD_FEEDBACK = "_ob_proxy_weakread_feedback"; } } diff --git a/src/share/system_variable/ob_system_variable_factory.cpp b/src/share/system_variable/ob_system_variable_factory.cpp index 4fc335db8..4cca45e36 100644 --- a/src/share/system_variable/ob_system_variable_factory.cpp +++ b/src/share/system_variable/ob_system_variable_factory.cpp @@ -138,6 +138,7 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_NAME[] = { "_nlj_batching_enabled", "_ob_ols_policy_session_labels", "_ob_proxy_session_temporary_table_used", + "_ob_proxy_weakread_feedback", "_ob_px_bcast_optimization", "_ob_px_slave_mapping_threshold", "_optimizer_gather_stats_on_load", @@ -374,6 +375,7 @@ const ObSysVarClassType ObSysVarFactory::SYS_VAR_IDS_SORTED_BY_NAME[] = { SYS_VAR__NLJ_BATCHING_ENABLED, SYS_VAR__OB_OLS_POLICY_SESSION_LABELS, SYS_VAR__OB_PROXY_SESSION_TEMPORARY_TABLE_USED, + SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK, SYS_VAR__OB_PX_BCAST_OPTIMIZATION, SYS_VAR__OB_PX_SLAVE_MAPPING_THRESHOLD, SYS_VAR__OPTIMIZER_GATHER_STATS_ON_LOAD, @@ -827,7 +829,8 @@ const char *ObSysVarFactory::SYS_VAR_NAMES_SORTED_BY_ID[] = { "runtime_filter_wait_time_ms", "runtime_filter_max_in_num", "runtime_bloom_filter_max_size", - "optimizer_features_enable" + "optimizer_features_enable", + "_ob_proxy_weakread_feedback" }; bool ObSysVarFactory::sys_var_name_case_cmp(const char *name1, const ObString &name2) @@ -1229,6 +1232,7 @@ int ObSysVarFactory::create_all_sys_vars() + sizeof(ObSysVarRuntimeFilterMaxInNum) + sizeof(ObSysVarRuntimeBloomFilterMaxSize) + sizeof(ObSysVarOptimizerFeaturesEnable) + + sizeof(ObSysVarObProxyWeakreadFeedback) ; void *ptr = NULL; if (OB_ISNULL(ptr = allocator_.alloc(total_mem_size))) { @@ -3334,6 +3338,15 @@ int ObSysVarFactory::create_all_sys_vars() ptr = (void *)((char *)ptr + sizeof(ObSysVarOptimizerFeaturesEnable)); } } + if (OB_SUCC(ret)) { + if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObProxyWeakreadFeedback())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("fail to new ObSysVarObProxyWeakreadFeedback", K(ret)); + } else { + store_buf_[ObSysVarsToIdxMap::get_store_idx(static_cast(SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK))] = sys_var_ptr; + ptr = (void *)((char *)ptr + sizeof(ObSysVarObProxyWeakreadFeedback)); + } + } } return ret; @@ -5907,6 +5920,17 @@ int ObSysVarFactory::create_sys_var(ObIAllocator &allocator_, ObSysVarClassType } break; } + case SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK: { + void *ptr = NULL; + if (OB_ISNULL(ptr = allocator_.alloc(sizeof(ObSysVarObProxyWeakreadFeedback)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("fail to alloc memory", K(ret), K(sizeof(ObSysVarObProxyWeakreadFeedback))); + } else if (OB_ISNULL(sys_var_ptr = new (ptr)ObSysVarObProxyWeakreadFeedback())) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("fail to new ObSysVarObProxyWeakreadFeedback", K(ret)); + } + break; + } default: { ret = OB_ERR_UNEXPECTED; diff --git a/src/share/system_variable/ob_system_variable_factory.h b/src/share/system_variable/ob_system_variable_factory.h index bf871391e..f6f7a4e9d 100644 --- a/src/share/system_variable/ob_system_variable_factory.h +++ b/src/share/system_variable/ob_system_variable_factory.h @@ -1684,6 +1684,13 @@ public: inline virtual ObSysVarClassType get_type() const { return SYS_VAR_OPTIMIZER_FEATURES_ENABLE; } inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(232); } }; +class ObSysVarObProxyWeakreadFeedback : public ObIntSysVar +{ +public: + ObSysVarObProxyWeakreadFeedback() : ObIntSysVar(NULL, NULL, NULL, NULL, NULL) {} + inline virtual ObSysVarClassType get_type() const { return SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK; } + inline virtual const common::ObObj &get_global_default_value() const { return ObSysVariables::get_default_value(233); } +}; class ObSysVarFactory @@ -1704,7 +1711,7 @@ public: static const common::ObString get_sys_var_name_by_id(ObSysVarClassType sys_var_id); const static int64_t MYSQL_SYS_VARS_COUNT = 97; - const static int64_t OB_SYS_VARS_COUNT = 136; + const static int64_t OB_SYS_VARS_COUNT = 137; const static int64_t ALL_SYS_VARS_COUNT = MYSQL_SYS_VARS_COUNT + OB_SYS_VARS_COUNT; const static int64_t INVALID_MAX_READ_STALE_TIME = -1; diff --git a/src/share/system_variable/ob_system_variable_init.cpp b/src/share/system_variable/ob_system_variable_init.cpp index b903140c3..f6b983bb4 100644 --- a/src/share/system_variable/ob_system_variable_init.cpp +++ b/src/share/system_variable/ob_system_variable_init.cpp @@ -3292,13 +3292,26 @@ static struct VarsInit{ ObSysVars[232].alias_ = "OB_SV_OPTIMIZER_FEATURES_ENABLE" ; }(); + [&] (){ + ObSysVars[233].default_value_ = "0" ; + ObSysVars[233].info_ = "In the weak read state, the replica status of the current machine is fed back to the proxy." ; + ObSysVars[233].name_ = "_ob_proxy_weakread_feedback" ; + ObSysVars[233].data_type_ = ObIntType ; + ObSysVars[233].flags_ = ObSysVarFlag::READONLY | ObSysVarFlag::SESSION_SCOPE | ObSysVarFlag::INVISIBLE ; + ObSysVars[233].id_ = SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK ; + cur_max_var_id = MAX(cur_max_var_id, static_cast(SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK)) ; + ObSysVarsIdToArrayIdx[SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK] = 233 ; + ObSysVars[233].base_value_ = "0" ; + ObSysVars[233].alias_ = "OB_SV__OB_PROXY_WEAKREAD_FEEDBACK" ; + }(); + if (cur_max_var_id >= ObSysVarFactory::OB_MAX_SYS_VAR_ID) { HasInvalidSysVar = true; } } }vars_init; -static int64_t var_amount = 233; +static int64_t var_amount = 234; int64_t ObSysVariables::get_all_sys_var_count(){ return ObSysVarFactory::ALL_SYS_VARS_COUNT;} ObSysVarClassType ObSysVariables::get_sys_var_id(int64_t i){ return ObSysVars[i].id_;} diff --git a/src/share/system_variable/ob_system_variable_init.json b/src/share/system_variable/ob_system_variable_init.json index 7bd5a664e..5c00ab757 100644 --- a/src/share/system_variable/ob_system_variable_init.json +++ b/src/share/system_variable/ob_system_variable_init.json @@ -3322,5 +3322,18 @@ "info_cn": "", "background_cn": "", "ref_url": "" + }, + "_ob_proxy_weakread_feedback": { + "id": 10151, + "name": "_ob_proxy_weakread_feedback", + "default_value": "0", + "base_value": "0", + "data_type": "int", + "info": "In the weak read state, the replica status of the current machine is fed back to the proxy.", + "flags": "READONLY | SESSION | INVISIBLE", + "publish_version": "420", + "info_cn": "", + "background_cn": "", + "ref_url": "" } } diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index 3c13c9362..6e5260f76 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -2076,6 +2076,7 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx, ObSQLSessionInfo *session = exec_ctx.get_my_session(); ObTaskExecutorCtx &task_exec_ctx = exec_ctx.get_task_exec_ctx(); bool is_hit_partition = false; + int64_t proxy_stat = 0; ObFollowerFirstFeedbackType follower_first_feedback = FFF_HIT_MIN; int64_t route_policy_type = 0; bool proxy_priority_hit_support = false; @@ -2098,7 +2099,7 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx, tenant_id, max_read_stale_time, phy_tbl_loc_info_list, - is_hit_partition, follower_first_feedback))) { + is_hit_partition, follower_first_feedback, proxy_stat))) { LOG_WARN("fail to weak select intersect replicas", K(ret), K(local_server), K(phy_tbl_loc_info_list.count())); } else { session->partition_hit().try_set_bool(is_hit_partition); @@ -2107,6 +2108,14 @@ int ObLogPlan::select_replicas(ObExecContext &exec_ctx, LOG_WARN("fail to set_follower_first_feedback", K(follower_first_feedback), K(ret)); } } + + if (OB_SUCC(ret) && proxy_stat != 0) { + ObObj val; + val.set_int(proxy_stat); + if (OB_FAIL(session->update_sys_variable(SYS_VAR__OB_PROXY_WEAKREAD_FEEDBACK, val))) { + LOG_WARN("replace user val failed", K(ret), K(val)); + } + } } } else { const bool sess_in_retry = session->get_is_in_retry_for_dup_tbl(); //重试状态下不优化复制表的副本选择 @@ -2183,9 +2192,11 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server, int64_t max_read_stale_time, ObIArray &phy_tbl_loc_info_list, bool &is_hit_partition, - ObFollowerFirstFeedbackType &follower_first_feedback) + ObFollowerFirstFeedbackType &follower_first_feedback, + int64_t &proxy_stat) { int ret = OB_SUCCESS; + proxy_stat = 0; is_hit_partition = true;//当前没有办法来判断是否能选择在一台机器上,所以将该值设置为true ObCandiTableLoc * phy_tbl_loc_info = nullptr; ObArenaAllocator allocator(ObModIds::OB_SQL_OPTIMIZER_SELECT_REPLICA); @@ -2241,7 +2252,9 @@ int ObLogPlan::weak_select_replicas(const ObAddr &local_server, } else { ObArenaAllocator allocator(ObModIds::OB_SQL_OPTIMIZER_SELECT_REPLICA); ObAddrList intersect_servers(allocator); - if (OB_FAIL(calc_hit_partition_for_compat(phy_tbl_loc_info_list, local_server, is_hit_partition, intersect_servers))) { + if (OB_FAIL(calc_rwsplit_partition_feedback(phy_tbl_loc_info_list, local_server, proxy_stat))) { + LOG_WARN("fail to calc proxy partition feedback", K(ret)); + } else if (OB_FAIL(calc_hit_partition_for_compat(phy_tbl_loc_info_list, local_server, is_hit_partition, intersect_servers))) { LOG_WARN("fail to calc hit partition for compat", K(ret)); } else { if (is_hit_partition && route_policy.is_follower_first_route_policy_type(route_policy_ctx)) { @@ -2322,6 +2335,79 @@ int ObLogPlan::calc_follower_first_feedback(const ObIArray &ph return ret; } +int ObLogPlan::calc_rwsplit_partition_feedback(const common::ObIArray &phy_tbl_loc_info_list, + const common::ObAddr &local_server, + int64_t &proxy_stat) +{ + INIT_SUCC(ret); + + bool all_leader = false; + bool all_follower = false; + bool need_break = false; + for (int64_t i = 0; OB_SUCC(ret) && !need_break && (i < phy_tbl_loc_info_list.count()); ++i) { + // table + const ObCandiTableLoc *phy_tbl_loc_info = phy_tbl_loc_info_list.at(i); + if (OB_ISNULL(phy_tbl_loc_info)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("phy_tbl_loc_info is NULL", K(ret), K(i), K(phy_tbl_loc_info_list.count())); + } else { + const ObCandiTabletLocIArray &phy_part_loc_info_list = + phy_tbl_loc_info->get_phy_part_loc_info_list(); + if (phy_part_loc_info_list.empty()) { + // just defense, when partition location list is empty, treat as it's not leader replica + need_break = true; + } + + for (int64_t j = 0; OB_SUCC(ret) && !need_break && (j < phy_part_loc_info_list.count()); ++j) { + // partition + bool found_server = false; + const ObCandiTabletLoc &phy_part_loc_info = phy_part_loc_info_list.at(j); + const ObIArray &replica_loc_list = + phy_part_loc_info.get_partition_location().get_replica_locations(); + LOG_TRACE("weak read list", K(replica_loc_list), K(local_server)); + for (int64_t k = 0; !found_server && (k < replica_loc_list.count()); ++k) { + // replica + const ObRoutePolicy::CandidateReplica &tmp_replica = replica_loc_list.at(k); + if (local_server == tmp_replica.get_server()) { + found_server = true; + if (is_strong_leader(tmp_replica.get_role())) { + all_leader = true; + // part leader, part follower + need_break = all_follower ? true : false; + } else { + all_follower = true; + // part leader, part follower + need_break = all_leader ? true : false; + } + } + } + } + } + } + + LOG_TRACE("get feedback policy", K(all_leader), K(all_follower)); + //Design a kv pair (hidden user variable, __ob_proxy_weakread_feedback(bool)): + //state: + //1. The current machine does not have any replicas (returns true) + //2. Some/all copies involved are distributed on the current machine: + // a. ALL FOLLOWER (do not return) + // b. ALL LEADER (return true) + // c. PART LEADER/PART FOLLOWER (return true) + if (!all_leader && !all_follower) { + // Current machine has no replica (proxy sent incorrectly, refresh location cache). + proxy_stat = 1; + } else if (all_leader && all_follower) { + // part leader, part follower + proxy_stat = 3; + } else if (all_leader) { + // all leader (proxy sent incorrectly, refresh location cache) + proxy_stat = 2; + } else if (all_follower) { + // proxy not need refresh location cache + } + return ret; +} + //该函数是为了兼容老版本proxy的hit策略,当proxy更新后可以去掉该函数 int ObLogPlan::calc_hit_partition_for_compat(const ObIArray &phy_tbl_loc_info_list, const ObAddr &local_server, diff --git a/src/sql/optimizer/ob_log_plan.h b/src/sql/optimizer/ob_log_plan.h index c5920dfff..9525841f1 100644 --- a/src/sql/optimizer/ob_log_plan.h +++ b/src/sql/optimizer/ob_log_plan.h @@ -1705,7 +1705,8 @@ private: // member functions int64_t max_read_stale_time, common::ObIArray &phy_tbl_loc_info_list, bool &is_hit_partition, - share::ObFollowerFirstFeedbackType &follower_first_feedback); + share::ObFollowerFirstFeedbackType &follower_first_feedback, + int64_t &proxy_stat); static int calc_hit_partition_for_compat(const common::ObIArray &phy_tbl_loc_info_list, const common::ObAddr &local_server, bool &is_hit_partition, @@ -1715,6 +1716,10 @@ private: // member functions const ObAddrList &intersect_servers, share::ObFollowerFirstFeedbackType &follower_first_feedback); + static int calc_rwsplit_partition_feedback(const common::ObIArray &phy_tbl_loc_info_list, + const common::ObAddr &local_server, + int64_t &proxy_stat); + int set_connect_by_property(JoinPath *join_path, ObLogJoin &log_join); static int calc_intersect_servers(const ObIArray &phy_tbl_loc_info_list, ObList &candidate_server_list);