From f6664ffdf86094d37afdf7a3418dc19beba8b127 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 15 Jun 2023 12:42:16 +0000 Subject: [PATCH] [Physical Standby] Support for region-specific to fetch log --- src/logservice/CMakeLists.txt | 1 + .../libobcdc/src/ob_log_fetcher.cpp | 7 +- src/logservice/logfetcher/ob_log_config.h | 2 +- src/logservice/logfetcher/ob_log_fetcher.cpp | 63 +++-- src/logservice/logfetcher/ob_log_fetcher.h | 9 + .../logfetcher/ob_log_fetcher_switch_info.h | 4 - .../logfetcher/ob_log_ls_fetch_ctx.cpp | 9 +- .../logfetcher/ob_log_ls_fetch_stream.cpp | 14 +- .../logrouteservice/ob_all_units_info.cpp | 70 ++++++ .../logrouteservice/ob_all_units_info.h | 79 +++++++ .../logrouteservice/ob_all_zone_info.h | 3 +- .../logrouteservice/ob_log_all_svr_cache.cpp | 222 ++++++++++++++---- .../logrouteservice/ob_log_all_svr_cache.h | 48 +++- .../logrouteservice/ob_log_route_service.cpp | 27 ++- .../logrouteservice/ob_log_route_service.h | 12 +- .../ob_log_systable_queryer.cpp | 79 ++++++- .../logrouteservice/ob_log_systable_queryer.h | 12 + .../logrouteservice/ob_ls_server_list.cpp | 16 +- .../logrouteservice/ob_ls_server_list.h | 4 +- .../ob_log_restore_net_driver.cpp | 14 ++ .../ob_log_restore_net_driver.h | 4 + src/share/parameter/ob_parameter_seed.ipp | 7 + .../all_virtual_sys_parameter_stat.result | 1 + 23 files changed, 602 insertions(+), 105 deletions(-) create mode 100644 src/logservice/logrouteservice/ob_all_units_info.cpp create mode 100644 src/logservice/logrouteservice/ob_all_units_info.h diff --git a/src/logservice/CMakeLists.txt b/src/logservice/CMakeLists.txt index 1c150193f..598d40c90 100644 --- a/src/logservice/CMakeLists.txt +++ b/src/logservice/CMakeLists.txt @@ -75,6 +75,7 @@ ob_set_subtarget(ob_logservice leader_coordinator ob_set_subtarget(ob_logservice logrouteservice logrouteservice/ob_all_server_info.cpp logrouteservice/ob_all_zone_info.cpp + logrouteservice/ob_all_units_info.cpp logrouteservice/ob_external_server_blacklist.cpp logrouteservice/ob_log_all_svr_cache.cpp logrouteservice/ob_log_route_key.cpp diff --git a/src/logservice/libobcdc/src/ob_log_fetcher.cpp b/src/logservice/libobcdc/src/ob_log_fetcher.cpp index a45ce9103..09e15aa81 100644 --- a/src/logservice/libobcdc/src/ob_log_fetcher.cpp +++ b/src/logservice/libobcdc/src/ob_log_fetcher.cpp @@ -127,7 +127,8 @@ int ObLogFetcher::init( cfg.blacklist_survival_time_penalty_period_min, cfg.blacklist_history_overdue_time_min, cfg.blacklist_history_clear_interval_min, - is_tenant_mode))) { + is_tenant_mode, + OB_INVALID_TENANT_ID))) { LOG_ERROR("ObLogRouterService init failer", KR(ret), K(prefer_region), K(cluster_id)); } else if (OB_FAIL(progress_controller_.init(cfg.ls_count_upper_limit))) { LOG_ERROR("init progress controller fail", KR(ret)); @@ -546,8 +547,8 @@ void ObLogFetcher::configure(const ObLogConfig &cfg) K(blacklist_survival_time_penalty_period_min), K(blacklist_history_overdue_time_min), K(blacklist_history_clear_interval_min)); - } else if (OB_FAIL(log_route_service_.update_assign_region(cfg.region.str()))) { - LOG_ERROR("update_assign_region failed", KR(ret), "region", cfg.region); + } else if (OB_FAIL(log_route_service_.update_preferred_upstream_log_region(cfg.region.str()))) { + LOG_ERROR("update_preferred_upsteam_log_region failed", KR(ret), "region", cfg.region); } else if (OB_FAIL(log_route_service_.update_cache_update_interval( all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) { diff --git a/src/logservice/logfetcher/ob_log_config.h b/src/logservice/logfetcher/ob_log_config.h index 2f1adb77e..a7a9cfd21 100644 --- a/src/logservice/logfetcher/ob_log_config.h +++ b/src/logservice/logfetcher/ob_log_config.h @@ -195,7 +195,7 @@ public: T_DEF_INT_INFT(blacklist_history_clear_interval_min, OB_CLUSTER_PARAMETER, 20, 10, "blacklist history clear interval in minute"); // Check the need for active cut-off cycles, in minutes - T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 30, 1, "check switch server interval in minute"); + T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 10, 1, "check switch server interval in minute"); // Print the number of LSs with the slowest progress of the Fetcher module T_DEF_INT_INFT(print_fetcher_slowest_ls_num, OB_CLUSTER_PARAMETER, 10, 1, "print fetcher slowest ls num"); diff --git a/src/logservice/logfetcher/ob_log_fetcher.cpp b/src/logservice/logfetcher/ob_log_fetcher.cpp index d7b6aca2e..d0eb8987c 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher.cpp @@ -98,11 +98,11 @@ int ObLogFetcher::init( cfg_ = &cfg; // Before the LogFetcher module is initialized, the following configuration items need to be loaded configure(cfg); - const common::ObRegion prefer_region(cfg.region.str()); + const common::ObRegion region(cfg.region.str()); if (is_integrated_fetching_mode(fetching_mode) && OB_FAIL(log_route_service_.init( proxy, - prefer_region, + region, cluster_id, false/*is_across_cluster*/, err_handler, @@ -115,8 +115,9 @@ int ObLogFetcher::init( cfg.blacklist_survival_time_penalty_period_min, cfg.blacklist_history_overdue_time_min, cfg.blacklist_history_clear_interval_min, - true/*is_tenant_mode*/))) { - LOG_ERROR("ObLogRouterService init failer", KR(ret), K(prefer_region), K(cluster_id)); + true/*is_tenant_mode*/, + source_tenant_id))) { + LOG_ERROR("ObLogRouterService init failer", KR(ret), K(region), K(cluster_id), K(source_tenant_id)); } else if (OB_FAIL(progress_controller_.init(cfg.ls_count_upper_limit))) { LOG_ERROR("init progress controller fail", KR(ret)); } else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) { @@ -239,7 +240,7 @@ int ObLogFetcher::start() if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else if (OB_UNLIKELY(! stop_flag_)) { LOG_ERROR("fetcher has been started", K(stop_flag_)); ret = OB_INIT_TWICE; @@ -335,6 +336,34 @@ void ObLogFetcher::mark_stop_flag() } } +int ObLogFetcher::update_preferred_upstream_log_region(const common::ObRegion ®ion) +{ + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); + } else if (OB_FAIL(log_route_service_.update_preferred_upstream_log_region(region))) { + LOG_WARN("ObLogRouteService update_preferred_upstream_log_region failed", KR(ret), K(region)); + } + + return ret; +} + +int ObLogFetcher::get_preferred_upstream_log_region(common::ObRegion ®ion) +{ + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); + } else if (OB_FAIL(log_route_service_.get_preferred_upstream_log_region(region))) { + LOG_WARN("ObLogRouteService get_preferred_upstream_log_region failed", KR(ret), K(region)); + } + + return ret; +} + int ObLogFetcher::add_ls( const share::ObLSID &ls_id, const ObLogFetcherStartParameters &start_parameters) @@ -354,7 +383,7 @@ int ObLogFetcher::add_ls( if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } // Requires a valid start-up timestamp else if (OB_UNLIKELY(start_tstamp_ns <= -1)) { @@ -395,7 +424,7 @@ int ObLogFetcher::recycle_ls(const share::ObLSID &ls_id) if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("Fetcher not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else if (OB_FAIL(ls_fetch_mgr_.recycle_ls(tls_id))) { if (OB_ENTRY_NOT_EXIST == ret) { LOG_INFO("ls has been recycled in fetcher", K(tls_id)); @@ -418,7 +447,7 @@ int ObLogFetcher::remove_ls(const share::ObLSID &ls_id) if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("Fetcher not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else if (OB_FAIL(recycle_ls(ls_id))) { LOG_ERROR("recycle_ls failed", KR(ret), K(ls_id)); } else { @@ -452,7 +481,7 @@ int ObLogFetcher::remove_ls_physically(const share::ObLSID &ls_id) if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("LogFetcher is not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else if (OB_FAIL(fs_container_mgr_.remove_fsc(removed_tls_id))) { LOG_ERROR("fs_container_mgr_ remove_fsc failed", KR(ret), K(removed_tls_id)); } else if (OB_FAIL(ls_fetch_mgr_.remove_ls(removed_tls_id))) { @@ -470,7 +499,7 @@ int ObLogFetcher::get_all_ls(ObIArray &ls_ids) if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("LogFetcher is not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else { FetchCtxMapLSGetter ls_getter(ls_ids); @@ -495,7 +524,7 @@ int ObLogFetcher::get_ls_proposal_id(const share::ObLSID &ls_id, int64_t &propos if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("LogFetcher is not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else if (OB_UNLIKELY(!ls_id.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ls_id)); @@ -515,7 +544,7 @@ int ObLogFetcher::update_fetching_log_upper_limit(const share::SCN &upper_limit_ if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("LogFetcher is not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else if (OB_UNLIKELY(!upper_limit_scn.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(upper_limit_scn)); @@ -533,7 +562,7 @@ int ObLogFetcher::update_compressor_type(const common::ObCompressorType &compres if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("LogFetcher is not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else if (OB_FAIL(rpc_.update_compressor_type(compressor_type))) { LOG_WARN("ObLogRpc update_compressor_type failed", K(compressor_type)); } @@ -547,7 +576,7 @@ int ObLogFetcher::get_progress_info(ProgressInfo &progress_info) if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("LogFetcher is not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else { if (OB_FAIL(ls_fetch_mgr_.for_each_ls(progress_info))) { LOG_ERROR("for each part fetch ctx fail", KR(ret)); @@ -563,7 +592,7 @@ int ObLogFetcher::wait_for_all_ls_to_be_removed(const int64_t timeout) if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_ERROR("Fetcher not inited", KR(ret)); + LOG_ERROR("ObLogFetcher has not been inited", KR(ret)); } else { const int64_t start_time = get_timestamp(); @@ -619,8 +648,8 @@ void ObLogFetcher::configure(const ObLogFetcherConfig &cfg) K(blacklist_survival_time_penalty_period_min), K(blacklist_history_overdue_time_min), K(blacklist_history_clear_interval_min)); - } else if (OB_FAIL(log_route_service_.update_assign_region(cfg.region.str()))) { - LOG_ERROR("update_assign_region failed", KR(ret), "region", cfg.region); + } else if (OB_FAIL(log_route_service_.update_preferred_upstream_log_region(cfg.region.str()))) { + LOG_ERROR("update_preferred_upstream_log_region failed", KR(ret), "region", cfg.region); } else if (OB_FAIL(log_route_service_.update_cache_update_interval( all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) { diff --git a/src/logservice/logfetcher/ob_log_fetcher.h b/src/logservice/logfetcher/ob_log_fetcher.h index 030da1e72..7379c29ab 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.h +++ b/src/logservice/logfetcher/ob_log_fetcher.h @@ -68,6 +68,12 @@ public: // Get Tenant ID virtual uint64_t get_source_tenant_id() const = 0; + // Update assign region to fetch log + virtual int update_preferred_upstream_log_region(const common::ObRegion ®ion) = 0; + + // Get assign region + virtual int get_preferred_upstream_log_region(common::ObRegion ®ion) = 0; + // Add the log stream // // @param [in] ls_id LS ID @@ -206,6 +212,9 @@ public: virtual int64_t get_cluster_id() const { return cluster_id_; } virtual uint64_t get_source_tenant_id() const { return source_tenant_id_; } + virtual int update_preferred_upstream_log_region(const common::ObRegion ®ion); + virtual int get_preferred_upstream_log_region(common::ObRegion ®ion); + virtual int add_ls( const share::ObLSID &ls_id, const ObLogFetcherStartParameters &start_parameters); diff --git a/src/logservice/logfetcher/ob_log_fetcher_switch_info.h b/src/logservice/logfetcher/ob_log_fetcher_switch_info.h index c8f10cba7..109a5d5ac 100644 --- a/src/logservice/logfetcher/ob_log_fetcher_switch_info.h +++ b/src/logservice/logfetcher/ob_log_fetcher_switch_info.h @@ -51,10 +51,6 @@ struct KickOutInfo KickOutReason kick_out_reason_; KickOutInfo() : tls_id_(), kick_out_reason_(NONE) {} - explicit KickOutInfo(const logservice::TenantLSID &tls_id) : - tls_id_(tls_id), - kick_out_reason_(NONE) - {} KickOutInfo(const logservice::TenantLSID &tls_id, KickOutReason kick_out_reason) : tls_id_(tls_id), kick_out_reason_(kick_out_reason) diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp index 960747bcb..9c370a7d0 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_ctx.cpp @@ -956,11 +956,10 @@ bool LSFetchCtx::need_switch_server(const common::ObAddr &cur_svr) if (OB_FAIL(get_log_route_service_(log_route_service))) { LOG_ERROR("get_log_route_service_ failed", KR(ret)); - } else if (OB_FAIL(log_route_service->need_switch_server(tls_id_.get_tenant_id(), tls_id_.get_ls_id(), - next_lsn, cur_svr))) { - LOG_ERROR("ObLogRouteService need_switch_server failed", KR(ret), K(tls_id_), K(next_lsn), - K(cur_svr)); - } else {} + } else { + bool_ret = log_route_service->need_switch_server(tls_id_.get_tenant_id(), tls_id_.get_ls_id(), + next_lsn, cur_svr); + } return bool_ret; } diff --git a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp index f1d2d18ce..d200bfed1 100644 --- a/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp +++ b/src/logservice/logfetcher/ob_log_ls_fetch_stream.cpp @@ -730,7 +730,7 @@ int FetchStream::handle_fetch_log_task_(volatile bool &stop_flag) bool rpc_is_flying = false; bool is_stream_valid = true; FetchLogARpcResult *result = NULL; - KickOutInfo kickout_info(ls_fetch_ctx_->get_tls_id()); + KickOutInfo kickout_info; // Whether the log stream is taken over by RPC, default is false bool stream_been_taken_over_by_rpc = false; @@ -1241,7 +1241,7 @@ int FetchStream::handle_fetch_log_error_( ls_fetch_ctx_->get_next_lsn(), rcode.rcode_, "%s"); - LOG_ERROR("fetch log fail on rpc, need_switch_server", K(svr_), K(rcode), "fetch_stream", this); + LOG_ERROR("fetch log fail on rpc, need_switch_server", K(svr_), K(rcode), "fetch_stream", this); } } // server return error @@ -1260,7 +1260,6 @@ int FetchStream::handle_fetch_log_error_( "svr_err", resp.get_err(), "svr_debug_err", resp.get_debug_err(), K(rcode), K(resp)); } - } else { need_kick_out = false; } @@ -1500,7 +1499,7 @@ int FetchStream::update_fetch_task_state_(KickOutInfo &kick_out_info, LOG_ERROR("ls_fetch_ctx_ is NULL", KR(ret), K(ls_fetch_ctx_)); } else { LSFetchCtx *task = ls_fetch_ctx_; - bool need_check_switch_server = check_need_switch_server_(); + const bool need_check_switch_server = check_need_switch_server_(); // If the task is deleted, it is kicked out directly if (OB_UNLIKELY(task->is_discarded())) { @@ -1522,7 +1521,7 @@ int FetchStream::update_fetch_task_state_(KickOutInfo &kick_out_info, task->update_touch_tstamp_if_progress_beyond_upper_limit(upper_limit_); } - // Update each partition's progress to the global + // Update each LS's progress to the global if (OB_SUCCESS == ret && OB_FAIL(publish_progress_(*task))) { LOG_ERROR("update progress fail", KR(ret), K(task), KPC(task)); } @@ -1637,10 +1636,13 @@ int FetchStream::check_fetch_timeout_(LSFetchCtx &task, KickOutInfo &kick_out_in int FetchStream::check_switch_server_(LSFetchCtx &task, KickOutInfo &kick_out_info) { int ret = OB_SUCCESS; + const char *branch_str = nullptr; if (exist_(kick_out_info, task.get_tls_id())) { // Do not check for LS already located in kick_out_info + branch_str = "exist_kick_out_info"; } else if (task.need_switch_server(svr_)) { + branch_str = "need_switch_server"; LOG_TRACE("exist higher priority server, need switch server", KR(ret), "key", task.get_tls_id(), K_(svr)); // If need to switch the stream, add it to the kick out collection @@ -1653,8 +1655,10 @@ int FetchStream::check_switch_server_(LSFetchCtx &task, KickOutInfo &kick_out_in } } else { // do nothing + branch_str = "no_need_switch_server"; } + LOG_TRACE("check_switch_server", "ls_id", ls_fetch_ctx_->get_tls_id(), K(branch_str)); return ret; } diff --git a/src/logservice/logrouteservice/ob_all_units_info.cpp b/src/logservice/logrouteservice/ob_all_units_info.cpp new file mode 100644 index 000000000..d5b68fa69 --- /dev/null +++ b/src/logservice/logrouteservice/ob_all_units_info.cpp @@ -0,0 +1,70 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#define USING_LOG_PREFIX OBLOG +#include "share/ob_errno.h" // KR +#include "ob_all_units_info.h" + +using namespace oceanbase::common; + +namespace oceanbase +{ +namespace logservice +{ +int ObUnitsRecord::init( + const common::ObAddr &server, + ObString &zone, + common::ObZoneType &zone_type, + ObString ®ion) +{ + int ret = OB_SUCCESS; + server_ = server; + zone_type_ = zone_type; + + if (OB_FAIL(zone_.assign(zone))) { + LOG_ERROR("zone assign fail", KR(ret), K(zone)); + } else if (OB_FAIL(region_.assign(region))) { + LOG_ERROR("zone assign fail", KR(ret), K(region)); + } else {} + + return ret; +} + +void ObUnitsRecordInfo::reset() +{ + cluster_id_ = 0; + units_record_array_.reset(); +} + +int ObUnitsRecordInfo::init(const int64_t cluster_id) +{ + int ret = OB_SUCCESS; + + cluster_id_ = cluster_id; + units_record_array_.reset(); + + return ret; +} + +int ObUnitsRecordInfo::add(ObUnitsRecord &record) +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(units_record_array_.push_back(record))) { + LOG_ERROR("units_record_array_ push_back failed", KR(ret), K(record)); + } + + return ret; +} + +} // namespace logservice +} // namespace oceanbase diff --git a/src/logservice/logrouteservice/ob_all_units_info.h b/src/logservice/logrouteservice/ob_all_units_info.h new file mode 100644 index 000000000..caed46467 --- /dev/null +++ b/src/logservice/logrouteservice/ob_all_units_info.h @@ -0,0 +1,79 @@ +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_OB_UNITS_INFO_H_ +#define OCEANBASE_OB_UNITS_INFO_H_ + +#include "lib/container/ob_se_array.h" // ObSEArray +#include "common/ob_zone.h" // ObZone +#include "common/ob_region.h" // ObRegin +#include "common/ob_zone_type.h" // ObZoneType + +namespace oceanbase +{ +namespace logservice +{ +// Records in table GV$OB_UNITS +struct ObUnitsRecord +{ + common::ObAddr server_; + common::ObZone zone_; + common::ObZoneType zone_type_; + common::ObRegion region_; + + ObUnitsRecord() { reset(); } + + void reset() + { + server_.reset(); + zone_.reset(); + zone_type_ = common::ZONE_TYPE_INVALID; + region_.reset(); + } + + int init( + const common::ObAddr &server, + ObString &zone, + common::ObZoneType &zone_type, + ObString ®ion); + + TO_STRING_KV(K_(server), K_(zone), K_(zone_type), K_(region)); +}; + +class ObUnitsRecordInfo +{ +public: + static const int64_t ALL_SERVER_DEFAULT_RECORDS_NUM = 26; + typedef common::ObSEArray ObUnitsRecordArray; + + ObUnitsRecordInfo() { reset(); } + virtual ~ObUnitsRecordInfo() { reset(); } + + int init(const int64_t cluster_id); + void reset(); + inline int64_t get_cluster_id() { return cluster_id_; } + inline ObUnitsRecordArray &get_units_record_array() { return units_record_array_; } + int add(ObUnitsRecord &record); + + TO_STRING_KV(K_(cluster_id), K_(units_record_array)); + +private: + int64_t cluster_id_; + ObUnitsRecordArray units_record_array_; + + DISALLOW_COPY_AND_ASSIGN(ObUnitsRecordInfo); +}; + +} // namespace logservice +} // namespace oceanbase + +#endif diff --git a/src/logservice/logrouteservice/ob_all_zone_info.h b/src/logservice/logrouteservice/ob_all_zone_info.h index 50e81bb74..f9877ef80 100644 --- a/src/logservice/logrouteservice/ob_all_zone_info.h +++ b/src/logservice/logrouteservice/ob_all_zone_info.h @@ -37,7 +37,8 @@ struct AllZoneRecord storage_type_ = share::ObZoneInfo::StorageType::STORAGE_TYPE_LOCAL; } - int init(ObString &zone, + int init( + ObString &zone, ObString ®ion); void set_storage_type_by_str(common::ObString &storage_type_str) { diff --git a/src/logservice/logrouteservice/ob_log_all_svr_cache.cpp b/src/logservice/logrouteservice/ob_log_all_svr_cache.cpp index 9f3114672..51a891efa 100644 --- a/src/logservice/logrouteservice/ob_log_all_svr_cache.cpp +++ b/src/logservice/logrouteservice/ob_log_all_svr_cache.cpp @@ -18,7 +18,7 @@ #include "lib/allocator/ob_mod_define.h" // ObModIds #include "lib/utility/ob_macro_utils.h" // OB_ISNULL, ... #include "lib/oblog/ob_log_module.h" // LOG_* -#include "logservice/common_util/ob_log_time_utils.h" // +#include "logservice/common_util/ob_log_time_utils.h" using namespace oceanbase::common; using namespace oceanbase::share; @@ -28,6 +28,8 @@ namespace oceanbase namespace logservice { ObLogAllSvrCache::ObLogAllSvrCache() : + is_tenant_mode_(false), + tenant_id_(OB_INVALID_TENANT_ID), systable_queryer_(NULL), all_server_cache_update_interval_(0), all_zone_cache_update_interval_(0), @@ -38,7 +40,8 @@ ObLogAllSvrCache::ObLogAllSvrCache() : zone_need_update_(false), zone_last_update_tstamp_(OB_INVALID_TIMESTAMP), svr_map_(), - zone_map_() + zone_map_(), + units_map_() { } @@ -64,28 +67,45 @@ bool ObLogAllSvrCache::is_svr_avail( bool bool_ret = false; region_priority = REGION_PRIORITY_UNKNOWN; - SvrItem svr_item; - ZoneItem zone_item; - bool find_agent = false; - if (!svr.is_valid()) { LOG_WARN("svr is not valid!", K(svr)); bool_ret = false; - } else if (OB_FAIL(get_svr_item_(svr, svr_item))) { - LOG_WARN("is_svr_avail: failed to get svr item", KR(ret), K(svr), K(svr_item)); - bool_ret = false; - } else if (OB_FAIL(get_zone_item_(svr_item.zone_, zone_item))) { - LOG_ERROR("failed to get zone item", KR(ret), K(svr_item), K(zone_item)); - bool_ret = false; - } else if (is_svr_serve_(svr_item, zone_item)) { - bool_ret = true; - // get region priority of server if server is available - region_priority = svr_item.region_priority_; - LOG_DEBUG("is svr avail", K(svr), K(region_priority), K(zone_item)); + } else if (! is_tenant_mode_) { + SvrItem svr_item; + ZoneItem zone_item; + + if (OB_FAIL(get_svr_item_(svr, svr_item))) { + LOG_WARN("is_svr_avail: failed to get svr item", KR(ret), K(svr), K(svr_item)); + bool_ret = false; + } else if (OB_FAIL(get_zone_item_(svr_item.zone_, zone_item))) { + LOG_ERROR("failed to get zone item", KR(ret), K(svr_item), K(zone_item)); + bool_ret = false; + } else if (is_svr_serve_(svr_item, zone_item)) { + bool_ret = true; + // get region priority of server if server is available + region_priority = svr_item.region_priority_; + LOG_DEBUG("is svr avail", K(svr), K(region_priority), K(zone_item)); + } else { + bool_ret = false; + region_priority = REGION_PRIORITY_UNKNOWN; + LOG_DEBUG("svr not avail", K(svr), K(zone_item), K(svr_item)); + } } else { - bool_ret = false; - region_priority = REGION_PRIORITY_UNKNOWN; - LOG_DEBUG("svr not avail", K(svr), K(zone_item), K(svr_item)); + UnitsRecordItem units_record_item; + + if (OB_FAIL(get_units_record_item_(svr, units_record_item))) { + if (OB_ENTRY_NOT_EXIST != ret) { + bool_ret = false; + LOG_WARN("get_units_record_item_ failed", KR(ret), K(svr), K(units_record_item)); + } else { + bool_ret = true; + } + } else if (is_svr_serve_(units_record_item)) { + bool_ret = true; + // get the region priority of the server if server is available + region_priority = units_record_item.region_priority_; + LOG_TRACE("is svr avail", K(svr), K(region_priority), K(units_record_item)); + } else {} } return bool_ret; @@ -204,10 +224,20 @@ bool ObLogAllSvrCache::is_svr_serve_(const SvrItem &svr_item, const ZoneItem &zo return bool_ret; } -bool ObLogAllSvrCache::is_assign_region_(const common::ObRegion ®ion) const +bool ObLogAllSvrCache::is_svr_serve_(const UnitsRecordItem &units_record_item) const { bool bool_ret = false; + bool_ret = ObZoneType::ZONE_TYPE_ENCRYPTION != units_record_item.get_zone_type(); + + return bool_ret; +} + +bool ObLogAllSvrCache::is_assign_region_(const common::ObRegion ®ion) +{ + bool bool_ret = false; + ObByteLockGuard guard(region_lock_); + // ignore case bool_ret = (0 == strncasecmp(assign_region_.ptr(), region.ptr(), @@ -218,19 +248,32 @@ bool ObLogAllSvrCache::is_assign_region_(const common::ObRegion ®ion) const int ObLogAllSvrCache::init( ObLogSysTableQueryer &systable_queryer, + const bool is_tenant_mode, + const uint64_t tenant_id, const common::ObRegion &prefer_region, const int64_t all_server_cache_update_interval_sec, const int64_t all_zone_cache_update_interval_sec) { int ret = OB_SUCCESS; - if (OB_FAIL(svr_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) { - LOG_ERROR("init svr map fail", KR(ret)); - } else if (OB_FAIL(zone_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) { - LOG_ERROR("init zone map fail", KR(ret)); + if (! is_tenant_mode) { + if (OB_FAIL(svr_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) { + LOG_ERROR("init svr map fail", KR(ret)); + } else if (OB_FAIL(zone_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) { + LOG_ERROR("init zone map fail", KR(ret)); + } + } else { + if (OB_FAIL(units_map_.init("UnitCache"))) { + LOG_ERROR("init units map fail", KR(ret)); + } + } + + if (OB_FAIL(ret)) { } else if (OB_FAIL(assign_region_.assign(prefer_region))) { LOG_ERROR("assign_region assign fail", KR(ret), K(assign_region_), K(prefer_region)); } else { + is_tenant_mode_ = is_tenant_mode; + tenant_id_ = tenant_id; systable_queryer_ = &systable_queryer; all_server_cache_update_interval_ = all_server_cache_update_interval_sec * _SEC_; all_zone_cache_update_interval_ = all_zone_cache_update_interval_sec * _SEC_; @@ -239,7 +282,7 @@ int ObLogAllSvrCache::init( zone_need_update_ = false; zone_last_update_tstamp_ = OB_INVALID_TIMESTAMP; - LOG_INFO("ObLogAllSvrCache init succ", K(prefer_region)); + LOG_INFO("ObLogAllSvrCache init succ", K(prefer_region), K(is_tenant_mode)); } return ret; @@ -247,6 +290,7 @@ int ObLogAllSvrCache::init( void ObLogAllSvrCache::destroy() { + LOG_INFO("destroy all svr cache begin"); systable_queryer_ = NULL; all_server_cache_update_interval_ = 0; all_zone_cache_update_interval_ = 0; @@ -254,37 +298,55 @@ void ObLogAllSvrCache::destroy() cur_zone_version_ = 0; zone_need_update_ = false; zone_last_update_tstamp_ = OB_INVALID_TIMESTAMP; - (void)svr_map_.destroy(); - (void)zone_map_.destroy(); + if (! is_tenant_mode_) { + (void)svr_map_.destroy(); + (void)zone_map_.destroy(); + } else { + (void)units_map_.destroy(); + } + is_tenant_mode_ = false; + tenant_id_ = OB_INVALID_TENANT_ID; - LOG_INFO("destroy all svr cache succ"); + LOG_INFO("destroy all svr cache success"); } void ObLogAllSvrCache::query_and_update() { int ret = OB_SUCCESS; - if (need_update_zone_()) { - if (OB_FAIL(update_zone_cache_())) { - LOG_ERROR("update zone cache error", KR(ret)); - } else if (OB_FAIL(purge_stale_zone_records_())) { - LOG_ERROR("purge stale records fail", KR(ret)); - } else { - // do nothing - } - } - - if (OB_SUCC(ret)) { - int64_t all_svr_cache_update_interval = ATOMIC_LOAD(&all_server_cache_update_interval_); - if (REACH_TIME_INTERVAL_THREAD_LOCAL(all_svr_cache_update_interval)) { - if (OB_FAIL(update_server_cache_())) { - LOG_ERROR("update server cache error", KR(ret)); - } else if (OB_FAIL(purge_stale_records_())) { + if (! is_tenant_mode_) { + if (need_update_zone_()) { + if (OB_FAIL(update_zone_cache_())) { + LOG_ERROR("update zone cache error", KR(ret)); + } else if (OB_FAIL(purge_stale_zone_records_())) { LOG_ERROR("purge stale records fail", KR(ret)); } else { // do nothing } } + + if (OB_SUCC(ret)) { + const int64_t all_svr_cache_update_interval = ATOMIC_LOAD(&all_server_cache_update_interval_); + + if (REACH_TIME_INTERVAL_THREAD_LOCAL(all_svr_cache_update_interval)) { + if (OB_FAIL(update_server_cache_())) { + LOG_ERROR("update server cache error", KR(ret)); + } else if (OB_FAIL(purge_stale_records_())) { + LOG_ERROR("purge stale records fail", KR(ret)); + } else { + // do nothing + } + } + } + } else { + // is_tenant_mode_ = true + const int64_t all_svr_cache_update_interval = ATOMIC_LOAD(&all_server_cache_update_interval_); + + if (REACH_TIME_INTERVAL_THREAD_LOCAL(all_svr_cache_update_interval)) { + if (OB_FAIL(update_unit_info_cache_())) { + LOG_WARN("update_unit_info_cache_ failed", KR(ret)); + } + } } } @@ -452,6 +514,76 @@ int ObLogAllSvrCache::update_server_cache_() return ret; } +int ObLogAllSvrCache::update_unit_info_cache_() +{ + int ret = OB_SUCCESS; + ObUnitsRecordInfo units_record_info; + + if (OB_ISNULL(systable_queryer_)) { + ret = OB_INVALID_ARGUMENT; + LOG_ERROR("invalid systable queryer", KR(ret), K(systable_queryer_)); + } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid tenant_id", KR(ret), K(tenant_id_)); + } else if (OB_FAIL(systable_queryer_->get_all_units_info(tenant_id_, units_record_info))) { + if (OB_NEED_RETRY == ret) { + LOG_WARN("query the GV$OB_UNITS failed, need retry", KR(ret)); + ret = OB_SUCCESS; + } else { + LOG_ERROR("query the GV$OB_UNITS failed", KR(ret)); + } + } else { + int64_t next_version = cur_version_ + 1; + ObUnitsRecordInfo::ObUnitsRecordArray &units_record_array = units_record_info.get_units_record_array(); + + ARRAY_FOREACH_N(units_record_array, idx, count) { + ObUnitsRecord &record = units_record_array.at(idx); + ObAddr &svr = record.server_; + RegionPriority region_priority = REGION_PRIORITY_UNKNOWN; + + if (OB_FAIL(get_region_priority_(record.region_, region_priority))) { + LOG_ERROR("get priority based region fail", KR(ret), K(svr), + "region", record.region_, + "region_priority", print_region_priority(region_priority)); + } else { + UnitsRecordItem units_record_item; + units_record_item.reset(next_version, record.zone_, record.zone_type_, region_priority); + + if (OB_FAIL(units_map_.insert_or_update(svr, units_record_item))) { + LOG_ERROR("units_map_ insert_or_update fail", KR(ret), K(svr), K(units_record_item)); + } + + _LOG_INFO("[STAT] [ALL_SERVER_LIST] INDEX=%ld/%ld SERVER=%s " + "ZONE=%s REGION=%s(%s) VERSION=%lu", + idx, count, to_cstring(svr), + to_cstring(record.zone_), to_cstring(record.region_), + print_region_priority(region_priority), next_version); + } + } + + ATOMIC_INC(&cur_version_); + _LOG_INFO("[STAT] [ALL_SERVER_LIST] COUNT=%ld VERSION=%lu", units_record_array.count(), cur_version_); + } + + return ret; +} + +int ObLogAllSvrCache::get_units_record_item_( + const common::ObAddr &svr, + UnitsRecordItem &item) +{ + int ret = OB_SUCCESS; + item.reset(); + + if (OB_FAIL(units_map_.get(svr, item))) { + if (OB_ENTRY_NOT_EXIST != ret) { + LOG_ERROR("get UnitsRecordItem from map fail", KR(ret), K(svr)); + } + } + + return ret; +} + int ObLogAllSvrCache::get_zone_item_(const common::ObZone &zone, ZoneItem &zone_item) { diff --git a/src/logservice/logrouteservice/ob_log_all_svr_cache.h b/src/logservice/logrouteservice/ob_log_all_svr_cache.h index 49bc01979..286960fc0 100644 --- a/src/logservice/logrouteservice/ob_log_all_svr_cache.h +++ b/src/logservice/logrouteservice/ob_log_all_svr_cache.h @@ -63,6 +63,8 @@ public: public: int init(ObLogSysTableQueryer &systable_queryer, + const bool is_tenant_mode, + const uint64_t tenant_id, const common::ObRegion &prefer_region, const int64_t all_server_cache_update_interval_sec, const int64_t all_zone_cache_update_interval_sec); @@ -81,18 +83,27 @@ private: // 2. other region or empty region(no retion info for lower version of observer) // region_priority = REGION_PRIORITY_LOW int get_region_priority_(const common::ObRegion ®ion, RegionPriority &priority); - bool is_assign_region_(const common::ObRegion ®ion) const; + bool is_assign_region_(const common::ObRegion ®ion); + + struct UnitsRecordItem; + int get_units_record_item_(const common::ObAddr &svr, UnitsRecordItem &item); bool need_update_zone_(); int update_zone_cache_(); int update_server_cache_(); int purge_stale_records_(); int purge_stale_zone_records_(); + int update_unit_info_cache_(); + // NOTE: server serve in such cases: // 1. server status is ACTIVE or DELETING // 2. server not in ENCRYPTION zone bool is_svr_serve_(const SvrItem &svr_item, const ZoneItem &zone_item) const; + // NOTE: server serve in such cases: + // 1. server not in ENCRYPTION zone + bool is_svr_serve_(const UnitsRecordItem &units_record_item) const; + private: typedef share::ObServerStatus::DisplayStatus StatusType; typedef share::ObZoneInfo::StorageType ZoneStorageType; @@ -187,10 +198,44 @@ private: bool operator()(const common::ObZone &zone, const ZoneItem &zone_item); }; + struct UnitsRecordItem + { + uint64_t version_; + common::ObZone zone_; + common::ObZoneType zone_type_; + RegionPriority region_priority_; + + void reset() + { + version_ = -1; + zone_.reset(); + zone_type_ = common::ZONE_TYPE_INVALID; + region_priority_ = REGION_PRIORITY_UNKNOWN; + } + + void reset( + const uint64_t version, + const common::ObZone &zone, + const common::ObZoneType &zone_type, + const RegionPriority region_priority) + { + version_ = version; + zone_ = zone; + zone_type_ = zone_type; + region_priority_ = region_priority; + } + const common::ObZoneType& get_zone_type() const { return zone_type_; } + + TO_STRING_KV(K_(zone), K_(zone_type), K_(region_priority)); + }; + typedef common::ObLinearHashMap UnitsMap; + // set all_server_cache_update_interval for unitest void set_update_interval_(const int64_t time); private: + bool is_tenant_mode_; + uint64_t tenant_id_; ObLogSysTableQueryer *systable_queryer_; int64_t all_server_cache_update_interval_; int64_t all_zone_cache_update_interval_; @@ -205,6 +250,7 @@ private: SvrMap svr_map_; ZoneMap zone_map_; + UnitsMap units_map_; private: DISALLOW_COPY_AND_ASSIGN(ObLogAllSvrCache); diff --git a/src/logservice/logrouteservice/ob_log_route_service.cpp b/src/logservice/logrouteservice/ob_log_route_service.cpp index d287e6975..4760ea020 100644 --- a/src/logservice/logrouteservice/ob_log_route_service.cpp +++ b/src/logservice/logrouteservice/ob_log_route_service.cpp @@ -27,9 +27,10 @@ namespace logservice { ObLogRouteService::ObLogRouteService() : is_inited_(false), - is_tenant_mode_(false), - is_stopped_(true), cluster_id_(OB_INVALID_CLUSTER_ID), + is_tenant_mode_(false), + tenant_id_(OB_INVALID_TENANT_ID), + is_stopped_(true), ls_route_key_set_(), ls_router_map_(), log_router_allocator_(), @@ -70,7 +71,8 @@ int ObLogRouteService::init(ObISQLClient *proxy, const int64_t blacklist_survival_time_penalty_period_min, const int64_t blacklist_history_overdue_time_min, const int64_t blacklist_history_clear_interval_min, - const bool is_tenant_mode) + const bool is_tenant_mode, + const uint64_t tenant_id) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -100,7 +102,7 @@ int ObLogRouteService::init(ObISQLClient *proxy, K(external_server_blacklist)); } else if (OB_FAIL(systable_queryer_.init(cluster_id, is_across_cluster, *proxy, err_handler))) { LOG_WARN("systable_queryer_ init failed", KR(ret), K(cluster_id), K(is_across_cluster)); - } else if (OB_FAIL(all_svr_cache_.init(systable_queryer_, prefer_region, + } else if (OB_FAIL(all_svr_cache_.init(systable_queryer_, is_tenant_mode, tenant_id, prefer_region, all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) { LOG_WARN("all_svr_cache_ init failed", KR(ret), K(is_tenant_mode), K(prefer_region), K(all_server_cache_update_interval_sec), K(all_zone_cache_update_interval_sec)); @@ -112,6 +114,7 @@ int ObLogRouteService::init(ObISQLClient *proxy, LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_)); } else { cluster_id_ = cluster_id; + tenant_id_ = tenant_id; log_router_allocator_.set_nway(NWAY); asyn_task_allocator_.set_nway(NWAY); timer_id_ = lib::TGDefIDs::LogRouterTimer; @@ -201,6 +204,7 @@ void ObLogRouteService::destroy() err_handler_ = NULL; cluster_id_ = OB_INVALID_CLUSTER_ID; + tenant_id_ = OB_INVALID_TENANT_ID; background_refresh_time_sec_ = 0; blacklist_survival_time_sec_ = 0; blacklist_survival_time_upper_limit_min_ = 0; @@ -314,7 +318,7 @@ int ObLogRouteService::get_background_refresh_time(int64_t &background_refresh_t return ret; } -int ObLogRouteService::update_assign_region(const common::ObRegion &prefer_region) +int ObLogRouteService::update_preferred_upstream_log_region(const common::ObRegion &prefer_region) { int ret = OB_SUCCESS; @@ -328,7 +332,7 @@ int ObLogRouteService::update_assign_region(const common::ObRegion &prefer_regio return ret; } -int ObLogRouteService::get_assign_region(common::ObRegion &prefer_region) +int ObLogRouteService::get_preferred_upstream_log_region(common::ObRegion &prefer_region) { int ret = OB_SUCCESS; @@ -924,10 +928,9 @@ int ObLogRouteService::update_server_list_( const ObAddr &server = record.server_; RegionPriority region_priority = REGION_PRIORITY_UNKNOWN; - // TODO support wait GV$UNIT - // if (! all_svr_cache_.is_svr_avail(server, region_priority)) { - // ignore server not in __all_server table - if (OB_FAIL(ls_svr_list.add_server_or_update(server, + if (! all_svr_cache_.is_svr_avail(server, region_priority)) { + // ignore the server which is not available + } else if (OB_FAIL(ls_svr_list.add_server_or_update(server, record.begin_lsn_, record.end_lsn_, region_priority, (LEADER == record.role_)))) { LOG_WARN("ObLogRouteService add_server_or_update failed", KR(ret), K(router_key), K(router_value)); @@ -976,9 +979,7 @@ int ObLogRouteService::update_all_server_and_zone_cache_() ret = OB_NOT_INIT; LOG_ERROR("ObLogRouteService has not been inited", KR(ret)); } else { - if (! is_tenant_mode_) { - all_svr_cache_.query_and_update(); - } + all_svr_cache_.query_and_update(); } return ret; diff --git a/src/logservice/logrouteservice/ob_log_route_service.h b/src/logservice/logrouteservice/ob_log_route_service.h index 9e6d1bdeb..57fa51017 100644 --- a/src/logservice/logrouteservice/ob_log_route_service.h +++ b/src/logservice/logrouteservice/ob_log_route_service.h @@ -85,7 +85,8 @@ public: const int64_t blacklist_survival_time_penalty_period_min = 1, const int64_t blacklist_history_overdue_time_min = 30, const int64_t blacklist_history_clear_interval_min = 20, - const bool is_tenant_mode = false); + const bool is_tenant_mode = false, + const uint64_t tenant_id = OB_INVALID_TENANT_ID); int start(); void stop(); void wait(); @@ -99,8 +100,8 @@ public: int get_background_refresh_time(int64_t &background_refresh_time_sec); // Region - int update_assign_region(const common::ObRegion &prefer_region); - int get_assign_region(common::ObRegion &prefer_region); + int update_preferred_upstream_log_region(const common::ObRegion &prefer_region); + int get_preferred_upstream_log_region(common::ObRegion &prefer_region); // Cache interval int update_cache_update_interval(const int64_t all_server_cache_update_interval_sec, @@ -350,9 +351,10 @@ private: private: bool is_inited_; - bool is_tenant_mode_; - volatile bool is_stopped_ CACHE_ALIGNED; int64_t cluster_id_; + bool is_tenant_mode_; + uint64_t tenant_id_; + volatile bool is_stopped_ CACHE_ALIGNED; LSRouteKeySet ls_route_key_set_; LSRouterMap ls_router_map_; ObSliceAlloc log_router_allocator_; diff --git a/src/logservice/logrouteservice/ob_log_systable_queryer.cpp b/src/logservice/logrouteservice/ob_log_systable_queryer.cpp index f509a3a91..352a9e7fe 100644 --- a/src/logservice/logrouteservice/ob_log_systable_queryer.cpp +++ b/src/logservice/logrouteservice/ob_log_systable_queryer.cpp @@ -100,7 +100,7 @@ int ObLogSysTableQueryer::get_ls_log_info( tenant_id, ls_id.id()))) { LOG_WARN("assign sql string failed", KR(ret), K(tenant_id), K(ls_id)); - // Use OB_SYS_TENANT_ID to query GV$OB_LOG_STAT + // Use OB_SYS_TENANT_ID to query the GV$OB_LOG_STAT } else if (OB_FAIL(do_query_(OB_SYS_TENANT_ID, sql, result))) { LOG_WARN("do_query_ failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr()); } else if (OB_ISNULL(result.get_result())) { @@ -119,6 +119,46 @@ int ObLogSysTableQueryer::get_ls_log_info( return ret; } +////////////////////////////////////// QueryAllUnitsInfo ///////////////////////////////// +int ObLogSysTableQueryer::get_all_units_info( + const uint64_t tenant_id, + ObUnitsRecordInfo &units_record_info) +{ + int ret = OB_SUCCESS; + const char *select_fields = "SVR_IP, SVR_PORT, ZONE, ZONE_TYPE, REGION"; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("ObLogSysTableQueryer not init", KR(ret)); + } else if (OB_FAIL(units_record_info.init(cluster_id_))) { + LOG_WARN("fail to init units_record_info", KR(ret), K(cluster_id_)); + } else { + ObSqlString sql; + int64_t record_count; + + SMART_VAR(ObISQLClient::ReadResult, result) { + if (OB_FAIL(sql.assign_fmt( + "SELECT %s FROM %s" + " WHERE tenant_id = %lu", + select_fields, OB_GV_OB_UNITS_TNAME, tenant_id))) { + LOG_WARN("assign sql string failed", KR(ret), K(cluster_id_), K(tenant_id)); + // Use OB_SYS_TENANT_ID to query the GV$OB_UNITS + } else if (OB_FAIL(do_query_(OB_SYS_TENANT_ID, sql, result))) { + LOG_WARN("do_query_ failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr()); + } else if (OB_ISNULL(result.get_result())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get mysql result failed", KR(ret)); + } else if (OB_FAIL(get_records_template_(*result.get_result(), units_record_info, + "ObUnitsRecordInfo", record_count))) { + LOG_WARN("construct units record info failed", KR(ret), K(units_record_info)); + } + } + } + + return ret; +} + + ////////////////////////////////////// QueryAllServerInfo ///////////////////////////////// int ObLogSysTableQueryer::get_all_server_info( const uint64_t tenant_id, @@ -308,6 +348,43 @@ int ObLogSysTableQueryer::get_records_template_(common::sqlclient::ObMySQLResult return ret; } +////////////////////////////////////// QueryAllUnitsInfo - parse ///////////////////////////////// +int ObLogSysTableQueryer::parse_record_from_row_(common::sqlclient::ObMySQLResult &res, + ObUnitsRecordInfo &units_record_info) +{ + int ret = OB_SUCCESS; + ObUnitsRecord units_record; + ObString ip; + int64_t port = 0; + common::ObAddr server; + ObString zone; + ObString region; + ObString zone_type_str; + + (void)GET_COL_IGNORE_NULL(res.get_varchar, "SVR_IP", ip); + (void)GET_COL_IGNORE_NULL(res.get_int, "SVR_PORT", port); + (void)GET_COL_IGNORE_NULL(res.get_varchar, "ZONE", zone); + (void)GET_COL_IGNORE_NULL(res.get_varchar, "ZONE_TYPE", zone_type_str); + (void)GET_COL_IGNORE_NULL(res.get_varchar, "REGION", region); + + if (false == server.set_ip_addr(ip, static_cast(port))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid server address", K(ip), K(port)); + } else { + common::ObZoneType zone_type = str_to_zone_type(zone_type_str.ptr()); + + if (OB_FAIL(units_record.init(server, zone, zone_type, region))) { + LOG_ERROR("units_record init failed", KR(ret), K(server), K(zone), K(zone_type), K(region)); + } else if (OB_FAIL(units_record_info.add(units_record))) { + LOG_WARN("units_record_info add failed", KR(ret), K(units_record)); + } else { + LOG_TRACE("units_record_info add success", K(units_record)); + } + } + + return ret; +} + int ObLogSysTableQueryer::parse_record_from_row_(common::sqlclient::ObMySQLResult &res, ObLSLogInfo &ls_log_info) { diff --git a/src/logservice/logrouteservice/ob_log_systable_queryer.h b/src/logservice/logrouteservice/ob_log_systable_queryer.h index 3adabe6c6..6f978936f 100644 --- a/src/logservice/logrouteservice/ob_log_systable_queryer.h +++ b/src/logservice/logrouteservice/ob_log_systable_queryer.h @@ -17,6 +17,7 @@ #include "ob_all_server_info.h" // ObAllServerInfo #include "ob_all_zone_info.h" // ObAllZoneInfo, ObAllZoneTypeInfo #include "src/logservice/logfetcher/ob_log_fetcher_err_handler.h" +#include "ob_all_units_info.h" // ObUnitsRecordInfo namespace oceanbase { @@ -49,6 +50,11 @@ public: const share::ObLSID &ls_id, ObLSLogInfo &ls_log_info); + // SELECT SVR_IP, SVR_PORT, ZONE, ZONE_TYPE, REGION from GV$OB_UNITS; + int get_all_units_info( + const uint64_t tenant_id, + ObUnitsRecordInfo &units_record_info); + int get_all_server_info( const uint64_t tenant_id, ObAllServerInfo &all_server_info); @@ -72,6 +78,12 @@ private: const char *event, int64_t &record_count); + // ObUnitsRecordInfo + // @param [in] res, result read from the GV$OB_UNITS table + // @param [out] units_record_info, items in the GV$OB_UNITS table + int parse_record_from_row_(common::sqlclient::ObMySQLResult &res, + ObUnitsRecordInfo &units_record_info); + // ObLSLogInfo // @param [in] res, result read from __all_virtual_log_stat table // @param [out] ls_log_info, meta/user tenant's LS Palf Info diff --git a/src/logservice/logrouteservice/ob_ls_server_list.cpp b/src/logservice/logrouteservice/ob_ls_server_list.cpp index 69141b1cb..325f79f28 100644 --- a/src/logservice/logrouteservice/ob_ls_server_list.cpp +++ b/src/logservice/logrouteservice/ob_ls_server_list.cpp @@ -233,7 +233,10 @@ bool LSSvrList::need_switch_server(const ObLSRouterKey &key, bool is_svr_invalid = false; SvrItem &svr_item = svr_items_.at(svr_idx); - svr_item.check_and_update_serve_info(next_lsn, is_log_served, is_svr_invalid); + // Switch the Server scenario and consider that the Server is always serving + svr_item.check_and_update_serve_info(true/*is_always_serving*/, next_lsn, is_log_served, is_svr_invalid); + + LOG_INFO("need_switch_server", K(key), K(next_lsn), K(cur_svr), K(svr_item), K(is_log_served), K(is_svr_invalid)); if (is_log_served && !is_svr_invalid && !blacklist.exist(svr_item.svr_)) { if (cur_svr == svr_item.svr_) { @@ -326,7 +329,7 @@ int LSSvrList::get_next_server_based_on_blacklist_(const palf::LSN &next_lsn, // Automatically modify next_svr_index_ to move to the next server int64_t svr_idx = next_svr_index_++ % svr_items_.count(); SvrItem &svr_item = svr_items_.at(svr_idx); - svr_item.check_and_update_serve_info(next_lsn, is_log_served, is_svr_invalid); + svr_item.check_and_update_serve_info(false/*is_always_serving*/,next_lsn, is_log_served, is_svr_invalid); LOG_DEBUG("next_server-debug 2", K(next_lsn), K(svr_items_), K(is_log_served)); @@ -414,7 +417,9 @@ void LSSvrList::SvrItem::update(const palf::LSN &start_lsn, is_leader_ = is_leader; } -void LSSvrList::SvrItem::check_and_update_serve_info(const palf::LSN &lsn, +void LSSvrList::SvrItem::check_and_update_serve_info( + const bool is_always_serving, + const palf::LSN &lsn, bool &is_log_served, bool &is_server_invalid) { @@ -426,6 +431,11 @@ void LSSvrList::SvrItem::check_and_update_serve_info(const palf::LSN &lsn, if (! is_log_served) { is_server_invalid = true; } + + if (is_always_serving) { + is_log_served = true; + is_server_invalid = false; + } } bool LSSvrList::SvrItem::is_priority_equal(const SvrItem &svr_item) const diff --git a/src/logservice/logrouteservice/ob_ls_server_list.h b/src/logservice/logrouteservice/ob_ls_server_list.h index da26e04a1..0b5814a26 100644 --- a/src/logservice/logrouteservice/ob_ls_server_list.h +++ b/src/logservice/logrouteservice/ob_ls_server_list.h @@ -186,7 +186,9 @@ private: // @param [in] lsn target lsn // @param [out] is_log_served whether serve target log id // @param [out] is_server_invalid whether the server is no longer valid (no more valid ranges) - void check_and_update_serve_info(const palf::LSN &lsn, + void check_and_update_serve_info( + const bool is_always_serving, + const palf::LSN &lsn, bool &is_log_served, bool &is_server_invalid); diff --git a/src/logservice/restoreservice/ob_log_restore_net_driver.cpp b/src/logservice/restoreservice/ob_log_restore_net_driver.cpp index a3854f8c1..98bb17dab 100644 --- a/src/logservice/restoreservice/ob_log_restore_net_driver.cpp +++ b/src/logservice/restoreservice/ob_log_restore_net_driver.cpp @@ -220,6 +220,7 @@ int ObLogRestoreNetDriver::scan_ls(const share::ObLogRestoreSourceType &type) } delete_fetcher_if_needed_with_lock_(); update_config_(); + update_standby_preferred_upstream_log_region_(); return ret; } @@ -461,6 +462,19 @@ int64_t ObLogRestoreNetDriver::get_rpc_timeout_sec_() return rpc_timeout; } +void ObLogRestoreNetDriver::update_standby_preferred_upstream_log_region_() +{ + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_)); + + if (! tenant_config.is_valid()) { + } else { + if (nullptr != fetcher_) { + const char *region_string = tenant_config->standby_db_preferred_upstream_log_region; + fetcher_->update_preferred_upstream_log_region(common::ObRegion(region_string)); + } + } +} + bool ObLogRestoreNetDriver::is_fetcher_stale_(const int64_t cluster_id, const uint64_t tenant_id) { bool bret = false; diff --git a/src/logservice/restoreservice/ob_log_restore_net_driver.h b/src/logservice/restoreservice/ob_log_restore_net_driver.h index 4fbf7b06d..1497a49ed 100644 --- a/src/logservice/restoreservice/ob_log_restore_net_driver.h +++ b/src/logservice/restoreservice/ob_log_restore_net_driver.h @@ -17,6 +17,7 @@ #include "lib/ob_errno.h" #include "lib/utility/ob_macro_utils.h" #include "lib/compress/ob_compress_util.h" // ObCompressorType +#include "common/ob_region.h" // ObRegion #include "logservice/logfetcher/ob_log_fetcher_ls_ctx_additional_info_factory.h" #include "logservice/logfetcher/ob_log_fetcher_err_handler.h" #include "logservice/logfetcher/ob_log_fetcher_ls_ctx_default_factory.h" @@ -94,6 +95,9 @@ private: void delete_fetcher_if_needed_with_lock_(); void update_config_(); int64_t get_rpc_timeout_sec_(); + // update standby_fetch_log_specified_region + void update_standby_preferred_upstream_log_region_(); + int refresh_proxy_(const share::ObRestoreSourceServiceAttr &source); diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 596f39b5f..aed103824 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -604,6 +604,13 @@ DEF_INT(_log_writer_parallelism, OB_TENANT_PARAMETER, "3", "the number of parallel log writer threads that can be used to write redo log entries to disk. ", ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE)); +DEF_STR(standby_db_preferred_upstream_log_region, OB_TENANT_PARAMETER, "", + "The preferred upstream log region for Standby db. " + "The Standby db will give priority to the preferred upstream log region to fetch log. " + "For high availability,the Standby db will also switch to the other region " + "when the preferred upstream log region can not fetch log because of exception etc.", + ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); + // ========================= LogService Config End ===================== DEF_INT(resource_hard_limit, OB_CLUSTER_PARAMETER, "100", "[100, 10000]", "system utilization should not be large than resource_hard_limit", diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index ef1984f6c..c4a62a636 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -204,6 +204,7 @@ ssl_client_authentication ssl_external_kms_info stack_size standby_db_fetch_log_rpc_timeout +standby_db_preferred_upstream_log_region standby_fetch_log_bandwidth_limit syslog_io_bandwidth_limit syslog_level