From 9d6659b95c7e93ac2929a9ad4ea5cacb59fc5f17 Mon Sep 17 00:00:00 2001 From: zxlzxlzxlzxlzxl Date: Fri, 9 Jun 2023 07:12:38 +0000 Subject: [PATCH] [CdcService] CdcService support multiple standby tenant in same cluster fetch log from the primary tenant. --- src/logservice/cdcservice/ob_cdc_fetcher.cpp | 4 +-- src/logservice/cdcservice/ob_cdc_req.cpp | 18 ++++++++++-- src/logservice/cdcservice/ob_cdc_req.h | 26 ++++++++++++++++- src/logservice/cdcservice/ob_cdc_struct.cpp | 17 ++++++++--- src/logservice/cdcservice/ob_cdc_struct.h | 13 +++++++-- src/logservice/logfetcher/ob_log_fetcher.cpp | 28 +++++++++++-------- src/logservice/logfetcher/ob_log_fetcher.h | 10 ++++--- src/logservice/logfetcher/ob_log_rpc.cpp | 6 ++++ src/logservice/logfetcher/ob_log_rpc.h | 2 ++ .../ob_log_restore_net_driver.cpp | 4 +-- 10 files changed, 98 insertions(+), 30 deletions(-) diff --git a/src/logservice/cdcservice/ob_cdc_fetcher.cpp b/src/logservice/cdcservice/ob_cdc_fetcher.cpp index acd19c4487..5eddd5659e 100644 --- a/src/logservice/cdcservice/ob_cdc_fetcher.cpp +++ b/src/logservice/cdcservice/ob_cdc_fetcher.cpp @@ -96,7 +96,7 @@ int ObCdcFetcher::fetch_log(const ObCdcLSFetchLogReq &req, PalfHandleGuard palf_handle_guard; PalfGroupBufferIterator group_iter; const ObCdcRpcId &rpc_id = req.get_client_id(); - ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), ls_id); + ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), req.get_tenant_id(), ls_id); ClientLSCtxMap &ctx_map = MTL(ObLogService*)->get_cdc_service()->get_ls_ctx_map(); ClientLSCtx *ls_ctx = NULL; int8_t fetch_log_flag = req.get_flag(); @@ -179,7 +179,7 @@ int ObCdcFetcher::fetch_missing_log(const obrpc::ObCdcLSFetchMissLogReq &req, const ObLSID &ls_id = req.get_ls_id(); PalfHandleGuard palf_handle_guard; const ObCdcRpcId &rpc_id = req.get_client_id(); - ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), ls_id); + ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), req.get_tenant_id(), ls_id); ClientLSCtxMap &ctx_map = MTL(ObLogService*)->get_cdc_service()->get_ls_ctx_map(); ClientLSCtx *ls_ctx = NULL; diff --git a/src/logservice/cdcservice/ob_cdc_req.cpp b/src/logservice/cdcservice/ob_cdc_req.cpp index 098ada3eb3..dab867d24d 100644 --- a/src/logservice/cdcservice/ob_cdc_req.cpp +++ b/src/logservice/cdcservice/ob_cdc_req.cpp @@ -249,7 +249,7 @@ OB_DEF_DESERIALIZE(ObCdcReqStartLSNByTsResp) * */ OB_SERIALIZE_MEMBER(ObCdcLSFetchLogReq, rpc_ver_, ls_id_, start_lsn_, - upper_limit_ts_, client_pid_, client_id_, progress_, flag_, compressor_type_); + upper_limit_ts_, client_pid_, client_id_, progress_, flag_, compressor_type_, tenant_id_); OB_SERIALIZE_MEMBER(ObCdcFetchStatus, is_reach_max_lsn_, is_reach_upper_limit_ts_, @@ -339,6 +339,7 @@ void ObCdcLSFetchLogReq::reset() client_id_.reset(); progress_ = OB_INVALID_TIMESTAMP; flag_ = 0; + tenant_id_ = OB_INVALID_TENANT_ID; } ObCdcLSFetchLogReq& ObCdcLSFetchLogReq::operator=(const ObCdcLSFetchLogReq &other) @@ -347,6 +348,11 @@ ObCdcLSFetchLogReq& ObCdcLSFetchLogReq::operator=(const ObCdcLSFetchLogReq &othe ls_id_ = other.ls_id_; start_lsn_ = other.start_lsn_; upper_limit_ts_ = other.upper_limit_ts_; + client_pid_ = other.client_pid_; + client_id_ = other.client_id_; + progress_ = other.progress_; + flag_ = other.flag_; + tenant_id_ = other.tenant_id_; return *this; } @@ -356,7 +362,12 @@ bool ObCdcLSFetchLogReq::operator==(const ObCdcLSFetchLogReq &that) const return rpc_ver_ == that.rpc_ver_ && ls_id_ == that.ls_id_ && start_lsn_ == that.start_lsn_ - && upper_limit_ts_ == that.upper_limit_ts_; + && upper_limit_ts_ == that.upper_limit_ts_ + && client_pid_ == that.client_pid_ + && client_id_ == that.client_id_ + && progress_ == that.progress_ + && flag_ == that.flag_ + && tenant_id_ == that.tenant_id_; } bool ObCdcLSFetchLogReq::operator!=(const ObCdcLSFetchLogReq &that) const @@ -475,7 +486,7 @@ void ObCdcLSFetchLogResp::reset() */ OB_SERIALIZE_MEMBER(ObCdcLSFetchMissLogReq::MissLogParam, miss_lsn_); OB_SERIALIZE_MEMBER(ObCdcLSFetchMissLogReq, rpc_ver_, ls_id_, miss_log_array_, - client_pid_, client_id_, flag_, compressor_type_); + client_pid_, client_id_, flag_, compressor_type_, tenant_id_); void ObCdcLSFetchMissLogReq::reset() { @@ -485,6 +496,7 @@ void ObCdcLSFetchMissLogReq::reset() client_pid_ = 0; client_id_.reset(); flag_ = 0; + tenant_id_ = OB_INVALID_TENANT_ID; } int ObCdcLSFetchMissLogReq::append_miss_log(const MissLogParam ¶m) diff --git a/src/logservice/cdcservice/ob_cdc_req.h b/src/logservice/cdcservice/ob_cdc_req.h index 3807448827..3c1b51b60d 100644 --- a/src/logservice/cdcservice/ob_cdc_req.h +++ b/src/logservice/cdcservice/ob_cdc_req.h @@ -77,6 +77,21 @@ public: client_addr_.reset(); } + bool operator==(const ObCdcRpcId &that) const { + return client_pid_ == that.client_pid_ && + client_addr_ == that.client_addr_; + } + + bool operator!=(const ObCdcRpcId &that) const { + return !(*this == that); + } + + ObCdcRpcId &operator=(const ObCdcRpcId &that) { + client_pid_ = that.client_pid_; + client_addr_ = that.client_addr_; + return *this; + } + void set_addr(ObAddr &addr) { client_addr_ = addr; } const ObAddr& get_addr() const { return client_addr_; } @@ -237,6 +252,9 @@ public: void set_flag(int8_t flag) { flag_ |= flag; } int8_t get_flag() const { return flag_; } + void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; } + uint64_t get_tenant_id() const { return tenant_id_; } + void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; } common::ObCompressorType get_compressor_type() const { return compressor_type_; } @@ -248,7 +266,8 @@ public: K_(client_id), K_(progress), K_(flag), - K_(compressor_type)); + K_(compressor_type), + K_(tenant_id)); OB_UNIS_VERSION(1); @@ -268,6 +287,7 @@ private: int64_t progress_; int8_t flag_; common::ObCompressorType compressor_type_; + uint64_t tenant_id_; }; // Statistics for LS @@ -484,6 +504,9 @@ public: void set_flag(int8_t flag) { flag_ |= flag; } int8_t get_flag() const { return flag_; } + void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; } + uint64_t get_tenant_id() const { return tenant_id_; } + void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; } common::ObCompressorType get_compressor_type() const { return compressor_type_; } @@ -503,6 +526,7 @@ private: ObCdcRpcId client_id_; int8_t flag_; common::ObCompressorType compressor_type_; + uint64_t tenant_id_; }; } // namespace obrpc diff --git a/src/logservice/cdcservice/ob_cdc_struct.cpp b/src/logservice/cdcservice/ob_cdc_struct.cpp index 6427ed23c9..96cd6348dd 100644 --- a/src/logservice/cdcservice/ob_cdc_struct.cpp +++ b/src/logservice/cdcservice/ob_cdc_struct.cpp @@ -22,10 +22,15 @@ namespace cdc { ///////////////////////////////////////////ClientLSKey/////////////////////////////////////////// -ClientLSKey::ClientLSKey(const common::ObAddr &client_addr, const uint64_t client_pid, const share::ObLSID &ls_id) - : client_addr_(client_addr), - client_pid_(client_pid), - ls_id_(ls_id) +ClientLSKey::ClientLSKey( + const common::ObAddr &client_addr, + const uint64_t client_pid, + const uint64_t tenant_id, + const share::ObLSID &ls_id) + : client_addr_(client_addr), + client_pid_(client_pid), + tenant_id_(tenant_id), + ls_id_(ls_id) { } @@ -34,6 +39,7 @@ uint64_t ClientLSKey::hash() const uint64_t hash_val = client_pid_; hash_val = murmurhash(&hash_val , sizeof(hash_val), client_addr_.hash()); hash_val = murmurhash(&hash_val, sizeof(hash_val), ls_id_.hash()); + hash_val = murmurhash(&hash_val, sizeof(hash_val), tenant_id_); return hash_val; } @@ -47,6 +53,7 @@ bool ClientLSKey::operator==(const ClientLSKey &that) const { return client_addr_ == that.client_addr_ && client_pid_ == that.client_pid_ && + tenant_id_ == that.tenant_id_ && ls_id_ == that.ls_id_; } @@ -59,6 +66,7 @@ ClientLSKey &ClientLSKey::operator=(const ClientLSKey &that) { client_addr_ = that.client_addr_; client_pid_ = that.client_pid_; + tenant_id_ = that.tenant_id_; ls_id_ = that.ls_id_; return *this; } @@ -85,6 +93,7 @@ void ClientLSKey::reset() { client_addr_.reset(); client_pid_ = 0; + tenant_id_ = OB_INVALID_TENANT_ID; ls_id_ = ObLSID::INVALID_LS_ID; } ///////////////////////////////////////////ClientLSCtx/////////////////////////////////////////// diff --git a/src/logservice/cdcservice/ob_cdc_struct.h b/src/logservice/cdcservice/ob_cdc_struct.h index 37a10f7b3b..e50d87516d 100644 --- a/src/logservice/cdcservice/ob_cdc_struct.h +++ b/src/logservice/cdcservice/ob_cdc_struct.h @@ -58,8 +58,16 @@ public: class ClientLSKey { public: - ClientLSKey(): client_addr_(), client_pid_(0), ls_id_(share::ObLSID::INVALID_LS_ID) {} - ClientLSKey(const common::ObAddr &client_addr, const uint64_t client_pid, const share::ObLSID &ls_id); + ClientLSKey(): + client_addr_(), + client_pid_(0), + tenant_id_(OB_INVALID_TENANT_ID), + ls_id_(share::ObLSID::INVALID_LS_ID) + { } + ClientLSKey(const common::ObAddr &client_addr, + const uint64_t client_pid, + const uint64_t tenant_id, + const share::ObLSID &ls_id); ~ClientLSKey() { reset(); } uint64_t hash() const; int hash(uint64_t &hash_val) const; @@ -74,6 +82,7 @@ public: private: common::ObAddr client_addr_; uint64_t client_pid_; + uint64_t tenant_id_; share::ObLSID ls_id_; }; diff --git a/src/logservice/logfetcher/ob_log_fetcher.cpp b/src/logservice/logfetcher/ob_log_fetcher.cpp index 4d767726e6..30e32c947d 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.cpp +++ b/src/logservice/logfetcher/ob_log_fetcher.cpp @@ -39,7 +39,8 @@ ObLogFetcher::ObLogFetcher() : is_inited_(false), log_fetcher_user_(LogFetcherUser::UNKNOWN), cluster_id_(OB_INVALID_CLUSTER_ID), - tenant_id_(OB_INVALID_TENANT_ID), + source_tenant_id_(OB_INVALID_TENANT_ID), + self_tenant_id_(OB_INVALID_TENANT_ID), cfg_(nullptr), is_loading_data_dict_baseline_data_(false), fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN), @@ -71,7 +72,8 @@ ObLogFetcher::~ObLogFetcher() int ObLogFetcher::init( const LogFetcherUser &log_fetcher_user, const int64_t cluster_id, - const uint64_t tenant_id, + const uint64_t source_tenant_id, + const uint64_t self_tenant_id, const bool is_loading_data_dict_baseline_data, const ClientFetchingMode fetching_mode, const ObBackupPathString &archive_dest, @@ -127,7 +129,7 @@ int ObLogFetcher::init( LOG_ERROR("init part fetch mgr fail", KR(ret)); } else if (OB_FAIL(init_self_addr_())) { LOG_ERROR("init_self_addr_ fail", KR(ret)); - } else if (OB_FAIL(rpc_.init(cluster_id, cfg.io_thread_num, cfg))) { + } else if (OB_FAIL(rpc_.init(cluster_id, self_tenant_id, cfg.io_thread_num, cfg))) { LOG_ERROR("init rpc handler fail", KR(ret)); } else if (is_cdc(log_fetcher_user) && OB_FAIL(start_lsn_locator_.init( cfg.start_lsn_locator_thread_num, @@ -159,7 +161,7 @@ int ObLogFetcher::init( *err_handler))) { LOG_ERROR("init stream worker fail", KR(ret)); } else if (OB_FAIL(fs_container_mgr_.init( - tenant_id, + source_tenant_id, cfg.svr_stream_cached_count, cfg.fetch_stream_cached_count, cfg.rpc_result_cached_count, @@ -170,7 +172,8 @@ int ObLogFetcher::init( } else { log_fetcher_user_ = log_fetcher_user; cluster_id_ = cluster_id; - tenant_id_ = tenant_id; + source_tenant_id_ = source_tenant_id; + self_tenant_id_ = self_tenant_id; is_loading_data_dict_baseline_data_ = is_loading_data_dict_baseline_data; fetching_mode_ = fetching_mode; archive_dest_ = archive_dest; @@ -181,7 +184,8 @@ int ObLogFetcher::init( stop_flag_ = true; is_inited_ = true; - LOG_INFO("LogFetcher init succ", K_(cluster_id), K_(tenant_id), K_(is_loading_data_dict_baseline_data), + LOG_INFO("LogFetcher init succ", K_(cluster_id), K_(source_tenant_id), + K_(self_tenant_id), K_(is_loading_data_dict_baseline_data), "fetching_mode", print_fetching_mode(fetching_mode_), K(archive_dest_)); } } @@ -336,7 +340,7 @@ int ObLogFetcher::add_ls( const ObLogFetcherStartParameters &start_parameters) { int ret = OB_SUCCESS; - logservice::TenantLSID tls_id(tenant_id_, ls_id); + logservice::TenantLSID tls_id(source_tenant_id_, ls_id); LSFetchCtx *ls_fetch_ctx = NULL; FetchStreamType type = FETCH_STREAM_TYPE_UNKNOWN; const int64_t start_tstamp_ns = start_parameters.get_start_tstamp_ns(); @@ -387,7 +391,7 @@ int ObLogFetcher::add_ls( int ObLogFetcher::recycle_ls(const share::ObLSID &ls_id) { int ret = OB_SUCCESS; - logservice::TenantLSID tls_id(tenant_id_, ls_id); + logservice::TenantLSID tls_id(source_tenant_id_, ls_id); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -410,7 +414,7 @@ int ObLogFetcher::remove_ls(const share::ObLSID &ls_id) { int ret = OB_SUCCESS; // Copy the logservice::TenantLSID to avoid recycle - const logservice::TenantLSID removed_tls_id(tenant_id_, ls_id); + const logservice::TenantLSID removed_tls_id(source_tenant_id_, ls_id); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -444,7 +448,7 @@ int ObLogFetcher::remove_ls_physically(const share::ObLSID &ls_id) { int ret = OB_SUCCESS; // Copy the logservice::TenantLSID to avoid recycle - const logservice::TenantLSID removed_tls_id(tenant_id_, ls_id); + const logservice::TenantLSID removed_tls_id(source_tenant_id_, ls_id); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -480,14 +484,14 @@ int ObLogFetcher::get_all_ls(ObIArray &ls_ids) bool ObLogFetcher::is_ls_exist(const share::ObLSID &ls_id) const { - logservice::TenantLSID tls_id(tenant_id_, ls_id); + logservice::TenantLSID tls_id(source_tenant_id_, ls_id); return ls_fetch_mgr_.is_tls_exist(tls_id); } int ObLogFetcher::get_ls_proposal_id(const share::ObLSID &ls_id, int64_t &proposal_id) { int ret = OB_SUCCESS; - const logservice::TenantLSID tls_id(tenant_id_, ls_id); + const logservice::TenantLSID tls_id(source_tenant_id_, ls_id); if (IS_NOT_INIT) { ret = OB_NOT_INIT; diff --git a/src/logservice/logfetcher/ob_log_fetcher.h b/src/logservice/logfetcher/ob_log_fetcher.h index a2f94b3ed2..030da1e725 100644 --- a/src/logservice/logfetcher/ob_log_fetcher.h +++ b/src/logservice/logfetcher/ob_log_fetcher.h @@ -66,7 +66,7 @@ public: virtual int64_t get_cluster_id() const = 0; // Get Tenant ID - virtual uint64_t get_tenant_id() const = 0; + virtual uint64_t get_source_tenant_id() const = 0; // Add the log stream // @@ -181,7 +181,8 @@ public: int init( const LogFetcherUser &log_fetcher_user, const int64_t cluster_id, - const uint64_t tenant_id, + const uint64_t source_tenant_id, + const uint64_t self_tenant_id, const bool is_loading_data_dict_baseline_data, const ClientFetchingMode fetching_mode, const ObBackupPathString &archive_dest, @@ -203,7 +204,7 @@ public: virtual void configure(const ObLogFetcherConfig &cfg); virtual int64_t get_cluster_id() const { return cluster_id_; } - virtual uint64_t get_tenant_id() const { return tenant_id_; } + virtual uint64_t get_source_tenant_id() const { return source_tenant_id_; } virtual int add_ls( const share::ObLSID &ls_id, @@ -266,7 +267,8 @@ private: bool is_inited_; LogFetcherUser log_fetcher_user_; int64_t cluster_id_; - uint64_t tenant_id_; + uint64_t source_tenant_id_; + uint64_t self_tenant_id_; const ObLogFetcherConfig *cfg_; bool is_loading_data_dict_baseline_data_; diff --git a/src/logservice/logfetcher/ob_log_rpc.cpp b/src/logservice/logfetcher/ob_log_rpc.cpp index aa43dd5db4..c921a60393 100644 --- a/src/logservice/logfetcher/ob_log_rpc.cpp +++ b/src/logservice/logfetcher/ob_log_rpc.cpp @@ -77,6 +77,7 @@ int64_t ObLogRpc::g_rpc_process_handler_time_upper_limit = ObLogRpc::ObLogRpc() : is_inited_(false), cluster_id_(OB_INVALID_CLUSTER_ID), + self_tenant_id_(OB_INVALID_TENANT_ID), net_client_(), last_ssl_info_hash_(UINT64_MAX), ssl_key_expired_time_(0), @@ -117,6 +118,7 @@ int ObLogRpc::async_stream_fetch_log(const uint64_t tenant_id, { int ret = OB_SUCCESS; req.set_client_id(client_id_); + req.set_tenant_id(self_tenant_id_); if (1 == cfg_->test_mode_force_fetch_archive) { req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_FETCH_ARCHIVE); } @@ -137,6 +139,7 @@ int ObLogRpc::async_stream_fetch_missing_log(const uint64_t tenant_id, { int ret = OB_SUCCESS; req.set_client_id(client_id_); + req.set_tenant_id(self_tenant_id_); if (1 == cfg_->test_mode_force_fetch_archive) { req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_FETCH_ARCHIVE); } @@ -148,6 +151,7 @@ int ObLogRpc::async_stream_fetch_missing_log(const uint64_t tenant_id, int ObLogRpc::init( const int64_t cluster_id, + const uint64_t self_tenant_id, const int64_t io_thread_num, const ObLogFetcherConfig &cfg) { @@ -172,6 +176,7 @@ int ObLogRpc::init( LOG_ERROR("reload_ssl_config succ", KR(ret)); } else { cluster_id_ = cluster_id; + self_tenant_id_ = self_tenant_id; is_inited_ = true; LOG_INFO("init ObLogRpc succ", K(cluster_id), K(io_thread_num)); } @@ -183,6 +188,7 @@ void ObLogRpc::destroy() { is_inited_ = false; cluster_id_ = OB_INVALID_CLUSTER_ID; + self_tenant_id_ = OB_INVALID_TENANT_ID; net_client_.destroy(); last_ssl_info_hash_ = UINT64_MAX; ssl_key_expired_time_ = 0; diff --git a/src/logservice/logfetcher/ob_log_rpc.h b/src/logservice/logfetcher/ob_log_rpc.h index fef496a569..e0f2d00079 100644 --- a/src/logservice/logfetcher/ob_log_rpc.h +++ b/src/logservice/logfetcher/ob_log_rpc.h @@ -99,6 +99,7 @@ public: public: int init( const int64_t cluster_id, + const uint64_t self_tenant_id, const int64_t io_thread_num, const ObLogFetcherConfig &cfg); void destroy(); @@ -111,6 +112,7 @@ private: private: bool is_inited_; int64_t cluster_id_; + uint64_t self_tenant_id_; obrpc::ObNetClient net_client_; uint64_t last_ssl_info_hash_; int64_t ssl_key_expired_time_; diff --git a/src/logservice/restoreservice/ob_log_restore_net_driver.cpp b/src/logservice/restoreservice/ob_log_restore_net_driver.cpp index 2789fb4d44..a3854f8c10 100644 --- a/src/logservice/restoreservice/ob_log_restore_net_driver.cpp +++ b/src/logservice/restoreservice/ob_log_restore_net_driver.cpp @@ -401,7 +401,7 @@ int ObLogRestoreNetDriver::init_fetcher_if_needed_(const int64_t cluster_id, con if (OB_FAIL(proxy_.get_sql_proxy(proxy))) { LOG_WARN("get sql proxy failed"); - } else if (OB_FAIL(fetcher_->init(log_fetcher_user, cluster_id, tenant_id, + } else if (OB_FAIL(fetcher_->init(log_fetcher_user, cluster_id, tenant_id, MTL_ID(), is_loading_data_dict_baseline_data, fetching_mode, archive_dest, cfg_, ls_ctx_factory_, ls_ctx_add_info_factory_, restore_function_, proxy, &error_handler_))) { @@ -467,7 +467,7 @@ bool ObLogRestoreNetDriver::is_fetcher_stale_(const int64_t cluster_id, const ui if (NULL == fetcher_) { bret = true; } else { - bret = cluster_id != fetcher_->get_cluster_id() || tenant_id != fetcher_->get_tenant_id(); + bret = cluster_id != fetcher_->get_cluster_id() || tenant_id != fetcher_->get_source_tenant_id(); } return bret; }