[CdcService] CdcService support multiple standby tenant in same cluster fetch log from the primary tenant.

This commit is contained in:
zxlzxlzxlzxlzxl
2023-06-09 07:12:38 +00:00
committed by ob-robot
parent a11f594a34
commit 9d6659b95c
10 changed files with 98 additions and 30 deletions

View File

@ -96,7 +96,7 @@ int ObCdcFetcher::fetch_log(const ObCdcLSFetchLogReq &req,
PalfHandleGuard palf_handle_guard;
PalfGroupBufferIterator group_iter;
const ObCdcRpcId &rpc_id = req.get_client_id();
ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), ls_id);
ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), req.get_tenant_id(), ls_id);
ClientLSCtxMap &ctx_map = MTL(ObLogService*)->get_cdc_service()->get_ls_ctx_map();
ClientLSCtx *ls_ctx = NULL;
int8_t fetch_log_flag = req.get_flag();
@ -179,7 +179,7 @@ int ObCdcFetcher::fetch_missing_log(const obrpc::ObCdcLSFetchMissLogReq &req,
const ObLSID &ls_id = req.get_ls_id();
PalfHandleGuard palf_handle_guard;
const ObCdcRpcId &rpc_id = req.get_client_id();
ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), ls_id);
ClientLSKey ls_key(rpc_id.get_addr(), rpc_id.get_pid(), req.get_tenant_id(), ls_id);
ClientLSCtxMap &ctx_map = MTL(ObLogService*)->get_cdc_service()->get_ls_ctx_map();
ClientLSCtx *ls_ctx = NULL;

View File

@ -249,7 +249,7 @@ OB_DEF_DESERIALIZE(ObCdcReqStartLSNByTsResp)
*
*/
OB_SERIALIZE_MEMBER(ObCdcLSFetchLogReq, rpc_ver_, ls_id_, start_lsn_,
upper_limit_ts_, client_pid_, client_id_, progress_, flag_, compressor_type_);
upper_limit_ts_, client_pid_, client_id_, progress_, flag_, compressor_type_, tenant_id_);
OB_SERIALIZE_MEMBER(ObCdcFetchStatus,
is_reach_max_lsn_,
is_reach_upper_limit_ts_,
@ -339,6 +339,7 @@ void ObCdcLSFetchLogReq::reset()
client_id_.reset();
progress_ = OB_INVALID_TIMESTAMP;
flag_ = 0;
tenant_id_ = OB_INVALID_TENANT_ID;
}
ObCdcLSFetchLogReq& ObCdcLSFetchLogReq::operator=(const ObCdcLSFetchLogReq &other)
@ -347,6 +348,11 @@ ObCdcLSFetchLogReq& ObCdcLSFetchLogReq::operator=(const ObCdcLSFetchLogReq &othe
ls_id_ = other.ls_id_;
start_lsn_ = other.start_lsn_;
upper_limit_ts_ = other.upper_limit_ts_;
client_pid_ = other.client_pid_;
client_id_ = other.client_id_;
progress_ = other.progress_;
flag_ = other.flag_;
tenant_id_ = other.tenant_id_;
return *this;
}
@ -356,7 +362,12 @@ bool ObCdcLSFetchLogReq::operator==(const ObCdcLSFetchLogReq &that) const
return rpc_ver_ == that.rpc_ver_
&& ls_id_ == that.ls_id_
&& start_lsn_ == that.start_lsn_
&& upper_limit_ts_ == that.upper_limit_ts_;
&& upper_limit_ts_ == that.upper_limit_ts_
&& client_pid_ == that.client_pid_
&& client_id_ == that.client_id_
&& progress_ == that.progress_
&& flag_ == that.flag_
&& tenant_id_ == that.tenant_id_;
}
bool ObCdcLSFetchLogReq::operator!=(const ObCdcLSFetchLogReq &that) const
@ -475,7 +486,7 @@ void ObCdcLSFetchLogResp::reset()
*/
OB_SERIALIZE_MEMBER(ObCdcLSFetchMissLogReq::MissLogParam, miss_lsn_);
OB_SERIALIZE_MEMBER(ObCdcLSFetchMissLogReq, rpc_ver_, ls_id_, miss_log_array_,
client_pid_, client_id_, flag_, compressor_type_);
client_pid_, client_id_, flag_, compressor_type_, tenant_id_);
void ObCdcLSFetchMissLogReq::reset()
{
@ -485,6 +496,7 @@ void ObCdcLSFetchMissLogReq::reset()
client_pid_ = 0;
client_id_.reset();
flag_ = 0;
tenant_id_ = OB_INVALID_TENANT_ID;
}
int ObCdcLSFetchMissLogReq::append_miss_log(const MissLogParam &param)

View File

@ -77,6 +77,21 @@ public:
client_addr_.reset();
}
bool operator==(const ObCdcRpcId &that) const {
return client_pid_ == that.client_pid_ &&
client_addr_ == that.client_addr_;
}
bool operator!=(const ObCdcRpcId &that) const {
return !(*this == that);
}
ObCdcRpcId &operator=(const ObCdcRpcId &that) {
client_pid_ = that.client_pid_;
client_addr_ = that.client_addr_;
return *this;
}
void set_addr(ObAddr &addr) { client_addr_ = addr; }
const ObAddr& get_addr() const { return client_addr_; }
@ -237,6 +252,9 @@ public:
void set_flag(int8_t flag) { flag_ |= flag; }
int8_t get_flag() const { return flag_; }
void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; }
uint64_t get_tenant_id() const { return tenant_id_; }
void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; }
common::ObCompressorType get_compressor_type() const { return compressor_type_; }
@ -248,7 +266,8 @@ public:
K_(client_id),
K_(progress),
K_(flag),
K_(compressor_type));
K_(compressor_type),
K_(tenant_id));
OB_UNIS_VERSION(1);
@ -268,6 +287,7 @@ private:
int64_t progress_;
int8_t flag_;
common::ObCompressorType compressor_type_;
uint64_t tenant_id_;
};
// Statistics for LS
@ -484,6 +504,9 @@ public:
void set_flag(int8_t flag) { flag_ |= flag; }
int8_t get_flag() const { return flag_; }
void set_tenant_id(const uint64_t tenant_id) { tenant_id_ = tenant_id; }
uint64_t get_tenant_id() const { return tenant_id_; }
void set_compressor_type(const common::ObCompressorType &compressor_type) { compressor_type_ = compressor_type; }
common::ObCompressorType get_compressor_type() const { return compressor_type_; }
@ -503,6 +526,7 @@ private:
ObCdcRpcId client_id_;
int8_t flag_;
common::ObCompressorType compressor_type_;
uint64_t tenant_id_;
};
} // namespace obrpc

View File

@ -22,10 +22,15 @@ namespace cdc
{
///////////////////////////////////////////ClientLSKey///////////////////////////////////////////
ClientLSKey::ClientLSKey(const common::ObAddr &client_addr, const uint64_t client_pid, const share::ObLSID &ls_id)
: client_addr_(client_addr),
client_pid_(client_pid),
ls_id_(ls_id)
ClientLSKey::ClientLSKey(
const common::ObAddr &client_addr,
const uint64_t client_pid,
const uint64_t tenant_id,
const share::ObLSID &ls_id)
: client_addr_(client_addr),
client_pid_(client_pid),
tenant_id_(tenant_id),
ls_id_(ls_id)
{
}
@ -34,6 +39,7 @@ uint64_t ClientLSKey::hash() const
uint64_t hash_val = client_pid_;
hash_val = murmurhash(&hash_val , sizeof(hash_val), client_addr_.hash());
hash_val = murmurhash(&hash_val, sizeof(hash_val), ls_id_.hash());
hash_val = murmurhash(&hash_val, sizeof(hash_val), tenant_id_);
return hash_val;
}
@ -47,6 +53,7 @@ bool ClientLSKey::operator==(const ClientLSKey &that) const
{
return client_addr_ == that.client_addr_ &&
client_pid_ == that.client_pid_ &&
tenant_id_ == that.tenant_id_ &&
ls_id_ == that.ls_id_;
}
@ -59,6 +66,7 @@ ClientLSKey &ClientLSKey::operator=(const ClientLSKey &that)
{
client_addr_ = that.client_addr_;
client_pid_ = that.client_pid_;
tenant_id_ = that.tenant_id_;
ls_id_ = that.ls_id_;
return *this;
}
@ -85,6 +93,7 @@ void ClientLSKey::reset()
{
client_addr_.reset();
client_pid_ = 0;
tenant_id_ = OB_INVALID_TENANT_ID;
ls_id_ = ObLSID::INVALID_LS_ID;
}
///////////////////////////////////////////ClientLSCtx///////////////////////////////////////////

View File

@ -58,8 +58,16 @@ public:
class ClientLSKey
{
public:
ClientLSKey(): client_addr_(), client_pid_(0), ls_id_(share::ObLSID::INVALID_LS_ID) {}
ClientLSKey(const common::ObAddr &client_addr, const uint64_t client_pid, const share::ObLSID &ls_id);
ClientLSKey():
client_addr_(),
client_pid_(0),
tenant_id_(OB_INVALID_TENANT_ID),
ls_id_(share::ObLSID::INVALID_LS_ID)
{ }
ClientLSKey(const common::ObAddr &client_addr,
const uint64_t client_pid,
const uint64_t tenant_id,
const share::ObLSID &ls_id);
~ClientLSKey() { reset(); }
uint64_t hash() const;
int hash(uint64_t &hash_val) const;
@ -74,6 +82,7 @@ public:
private:
common::ObAddr client_addr_;
uint64_t client_pid_;
uint64_t tenant_id_;
share::ObLSID ls_id_;
};

View File

@ -39,7 +39,8 @@ ObLogFetcher::ObLogFetcher() :
is_inited_(false),
log_fetcher_user_(LogFetcherUser::UNKNOWN),
cluster_id_(OB_INVALID_CLUSTER_ID),
tenant_id_(OB_INVALID_TENANT_ID),
source_tenant_id_(OB_INVALID_TENANT_ID),
self_tenant_id_(OB_INVALID_TENANT_ID),
cfg_(nullptr),
is_loading_data_dict_baseline_data_(false),
fetching_mode_(ClientFetchingMode::FETCHING_MODE_UNKNOWN),
@ -71,7 +72,8 @@ ObLogFetcher::~ObLogFetcher()
int ObLogFetcher::init(
const LogFetcherUser &log_fetcher_user,
const int64_t cluster_id,
const uint64_t tenant_id,
const uint64_t source_tenant_id,
const uint64_t self_tenant_id,
const bool is_loading_data_dict_baseline_data,
const ClientFetchingMode fetching_mode,
const ObBackupPathString &archive_dest,
@ -127,7 +129,7 @@ int ObLogFetcher::init(
LOG_ERROR("init part fetch mgr fail", KR(ret));
} else if (OB_FAIL(init_self_addr_())) {
LOG_ERROR("init_self_addr_ fail", KR(ret));
} else if (OB_FAIL(rpc_.init(cluster_id, cfg.io_thread_num, cfg))) {
} else if (OB_FAIL(rpc_.init(cluster_id, self_tenant_id, cfg.io_thread_num, cfg))) {
LOG_ERROR("init rpc handler fail", KR(ret));
} else if (is_cdc(log_fetcher_user) && OB_FAIL(start_lsn_locator_.init(
cfg.start_lsn_locator_thread_num,
@ -159,7 +161,7 @@ int ObLogFetcher::init(
*err_handler))) {
LOG_ERROR("init stream worker fail", KR(ret));
} else if (OB_FAIL(fs_container_mgr_.init(
tenant_id,
source_tenant_id,
cfg.svr_stream_cached_count,
cfg.fetch_stream_cached_count,
cfg.rpc_result_cached_count,
@ -170,7 +172,8 @@ int ObLogFetcher::init(
} else {
log_fetcher_user_ = log_fetcher_user;
cluster_id_ = cluster_id;
tenant_id_ = tenant_id;
source_tenant_id_ = source_tenant_id;
self_tenant_id_ = self_tenant_id;
is_loading_data_dict_baseline_data_ = is_loading_data_dict_baseline_data;
fetching_mode_ = fetching_mode;
archive_dest_ = archive_dest;
@ -181,7 +184,8 @@ int ObLogFetcher::init(
stop_flag_ = true;
is_inited_ = true;
LOG_INFO("LogFetcher init succ", K_(cluster_id), K_(tenant_id), K_(is_loading_data_dict_baseline_data),
LOG_INFO("LogFetcher init succ", K_(cluster_id), K_(source_tenant_id),
K_(self_tenant_id), K_(is_loading_data_dict_baseline_data),
"fetching_mode", print_fetching_mode(fetching_mode_), K(archive_dest_));
}
}
@ -336,7 +340,7 @@ int ObLogFetcher::add_ls(
const ObLogFetcherStartParameters &start_parameters)
{
int ret = OB_SUCCESS;
logservice::TenantLSID tls_id(tenant_id_, ls_id);
logservice::TenantLSID tls_id(source_tenant_id_, ls_id);
LSFetchCtx *ls_fetch_ctx = NULL;
FetchStreamType type = FETCH_STREAM_TYPE_UNKNOWN;
const int64_t start_tstamp_ns = start_parameters.get_start_tstamp_ns();
@ -387,7 +391,7 @@ int ObLogFetcher::add_ls(
int ObLogFetcher::recycle_ls(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
logservice::TenantLSID tls_id(tenant_id_, ls_id);
logservice::TenantLSID tls_id(source_tenant_id_, ls_id);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -410,7 +414,7 @@ int ObLogFetcher::remove_ls(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
// Copy the logservice::TenantLSID to avoid recycle
const logservice::TenantLSID removed_tls_id(tenant_id_, ls_id);
const logservice::TenantLSID removed_tls_id(source_tenant_id_, ls_id);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -444,7 +448,7 @@ int ObLogFetcher::remove_ls_physically(const share::ObLSID &ls_id)
{
int ret = OB_SUCCESS;
// Copy the logservice::TenantLSID to avoid recycle
const logservice::TenantLSID removed_tls_id(tenant_id_, ls_id);
const logservice::TenantLSID removed_tls_id(source_tenant_id_, ls_id);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -480,14 +484,14 @@ int ObLogFetcher::get_all_ls(ObIArray<share::ObLSID> &ls_ids)
bool ObLogFetcher::is_ls_exist(const share::ObLSID &ls_id) const
{
logservice::TenantLSID tls_id(tenant_id_, ls_id);
logservice::TenantLSID tls_id(source_tenant_id_, ls_id);
return ls_fetch_mgr_.is_tls_exist(tls_id);
}
int ObLogFetcher::get_ls_proposal_id(const share::ObLSID &ls_id, int64_t &proposal_id)
{
int ret = OB_SUCCESS;
const logservice::TenantLSID tls_id(tenant_id_, ls_id);
const logservice::TenantLSID tls_id(source_tenant_id_, ls_id);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;

View File

@ -66,7 +66,7 @@ public:
virtual int64_t get_cluster_id() const = 0;
// Get Tenant ID
virtual uint64_t get_tenant_id() const = 0;
virtual uint64_t get_source_tenant_id() const = 0;
// Add the log stream
//
@ -181,7 +181,8 @@ public:
int init(
const LogFetcherUser &log_fetcher_user,
const int64_t cluster_id,
const uint64_t tenant_id,
const uint64_t source_tenant_id,
const uint64_t self_tenant_id,
const bool is_loading_data_dict_baseline_data,
const ClientFetchingMode fetching_mode,
const ObBackupPathString &archive_dest,
@ -203,7 +204,7 @@ public:
virtual void configure(const ObLogFetcherConfig &cfg);
virtual int64_t get_cluster_id() const { return cluster_id_; }
virtual uint64_t get_tenant_id() const { return tenant_id_; }
virtual uint64_t get_source_tenant_id() const { return source_tenant_id_; }
virtual int add_ls(
const share::ObLSID &ls_id,
@ -266,7 +267,8 @@ private:
bool is_inited_;
LogFetcherUser log_fetcher_user_;
int64_t cluster_id_;
uint64_t tenant_id_;
uint64_t source_tenant_id_;
uint64_t self_tenant_id_;
const ObLogFetcherConfig *cfg_;
bool is_loading_data_dict_baseline_data_;

View File

@ -77,6 +77,7 @@ int64_t ObLogRpc::g_rpc_process_handler_time_upper_limit =
ObLogRpc::ObLogRpc() :
is_inited_(false),
cluster_id_(OB_INVALID_CLUSTER_ID),
self_tenant_id_(OB_INVALID_TENANT_ID),
net_client_(),
last_ssl_info_hash_(UINT64_MAX),
ssl_key_expired_time_(0),
@ -117,6 +118,7 @@ int ObLogRpc::async_stream_fetch_log(const uint64_t tenant_id,
{
int ret = OB_SUCCESS;
req.set_client_id(client_id_);
req.set_tenant_id(self_tenant_id_);
if (1 == cfg_->test_mode_force_fetch_archive) {
req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_FETCH_ARCHIVE);
}
@ -137,6 +139,7 @@ int ObLogRpc::async_stream_fetch_missing_log(const uint64_t tenant_id,
{
int ret = OB_SUCCESS;
req.set_client_id(client_id_);
req.set_tenant_id(self_tenant_id_);
if (1 == cfg_->test_mode_force_fetch_archive) {
req.set_flag(ObCdcRpcTestFlag::OBCDC_RPC_FETCH_ARCHIVE);
}
@ -148,6 +151,7 @@ int ObLogRpc::async_stream_fetch_missing_log(const uint64_t tenant_id,
int ObLogRpc::init(
const int64_t cluster_id,
const uint64_t self_tenant_id,
const int64_t io_thread_num,
const ObLogFetcherConfig &cfg)
{
@ -172,6 +176,7 @@ int ObLogRpc::init(
LOG_ERROR("reload_ssl_config succ", KR(ret));
} else {
cluster_id_ = cluster_id;
self_tenant_id_ = self_tenant_id;
is_inited_ = true;
LOG_INFO("init ObLogRpc succ", K(cluster_id), K(io_thread_num));
}
@ -183,6 +188,7 @@ void ObLogRpc::destroy()
{
is_inited_ = false;
cluster_id_ = OB_INVALID_CLUSTER_ID;
self_tenant_id_ = OB_INVALID_TENANT_ID;
net_client_.destroy();
last_ssl_info_hash_ = UINT64_MAX;
ssl_key_expired_time_ = 0;

View File

@ -99,6 +99,7 @@ public:
public:
int init(
const int64_t cluster_id,
const uint64_t self_tenant_id,
const int64_t io_thread_num,
const ObLogFetcherConfig &cfg);
void destroy();
@ -111,6 +112,7 @@ private:
private:
bool is_inited_;
int64_t cluster_id_;
uint64_t self_tenant_id_;
obrpc::ObNetClient net_client_;
uint64_t last_ssl_info_hash_;
int64_t ssl_key_expired_time_;

View File

@ -401,7 +401,7 @@ int ObLogRestoreNetDriver::init_fetcher_if_needed_(const int64_t cluster_id, con
if (OB_FAIL(proxy_.get_sql_proxy(proxy))) {
LOG_WARN("get sql proxy failed");
} else if (OB_FAIL(fetcher_->init(log_fetcher_user, cluster_id, tenant_id,
} else if (OB_FAIL(fetcher_->init(log_fetcher_user, cluster_id, tenant_id, MTL_ID(),
is_loading_data_dict_baseline_data, fetching_mode, archive_dest,
cfg_, ls_ctx_factory_, ls_ctx_add_info_factory_,
restore_function_, proxy, &error_handler_))) {
@ -467,7 +467,7 @@ bool ObLogRestoreNetDriver::is_fetcher_stale_(const int64_t cluster_id, const ui
if (NULL == fetcher_) {
bret = true;
} else {
bret = cluster_id != fetcher_->get_cluster_id() || tenant_id != fetcher_->get_tenant_id();
bret = cluster_id != fetcher_->get_cluster_id() || tenant_id != fetcher_->get_source_tenant_id();
}
return bret;
}