[CP] Modify cache leader of auto increment to tenant isolation
This commit is contained in:
@ -31,12 +31,16 @@ namespace share
|
|||||||
int ObGAISClient::init(const ObAddr &self, ObGAISRequestRpc *gais_request_rpc)
|
int ObGAISClient::init(const ObAddr &self, ObGAISRequestRpc *gais_request_rpc)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
ObMemAttr attr(OB_SERVER_TENANT_ID, ObModIds::OB_AUTOINCREMENT);
|
||||||
|
SET_USE_500(attr);
|
||||||
if (OB_UNLIKELY(is_inited_)) {
|
if (OB_UNLIKELY(is_inited_)) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
LOG_WARN("init twice", KR(ret));
|
LOG_WARN("init twice", KR(ret));
|
||||||
} else if (OB_UNLIKELY(!self.is_valid()) || OB_ISNULL(gais_request_rpc)) {
|
} else if (OB_UNLIKELY(!self.is_valid()) || OB_ISNULL(gais_request_rpc)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(self), KP(gais_request_rpc));
|
LOG_WARN("invalid argument", KR(ret), K(self), KP(gais_request_rpc));
|
||||||
|
} else if (OB_FAIL(gais_cache_leader_map_.create(32 /*init leader size*/, attr, attr))) {
|
||||||
|
LOG_WARN("fail to init leader map", K(ret));
|
||||||
} else {
|
} else {
|
||||||
self_ = self;
|
self_ = self;
|
||||||
gais_request_rpc_ = gais_request_rpc;
|
gais_request_rpc_ = gais_request_rpc;
|
||||||
@ -51,7 +55,7 @@ void ObGAISClient::reset()
|
|||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
self_.reset();
|
self_.reset();
|
||||||
gais_request_rpc_ = NULL;
|
gais_request_rpc_ = NULL;
|
||||||
reset_cache_leader_();
|
gais_cache_leader_map_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObGAISClient::get_value(const AutoincKey &key,
|
int ObGAISClient::get_value(const AutoincKey &key,
|
||||||
@ -275,12 +279,12 @@ int ObGAISClient::clear_global_autoinc_cache(const AutoincKey &key)
|
|||||||
|
|
||||||
int ObGAISClient::get_leader_(const uint64_t tenant_id, ObAddr &leader)
|
int ObGAISClient::get_leader_(const uint64_t tenant_id, ObAddr &leader)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = gais_cache_leader_map_.get_refactored(tenant_id, leader);
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
} else if (ret == OB_HASH_NOT_EXIST) {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
const int64_t cluster_id = GCONF.cluster_id;
|
const int64_t cluster_id = GCONF.cluster_id;
|
||||||
lib::ObMutexGuard guard(cache_leader_mutex_);
|
if (OB_ISNULL(GCTX.location_service_)) {
|
||||||
if (OB_LIKELY(gais_cache_leader_.is_valid())) {
|
|
||||||
leader = gais_cache_leader_;
|
|
||||||
} else if (OB_ISNULL(GCTX.location_service_)) {
|
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("location cache is NULL", K(ret));
|
LOG_WARN("location cache is NULL", K(ret));
|
||||||
} else if (OB_FAIL(GCTX.location_service_->nonblock_get_leader(
|
} else if (OB_FAIL(GCTX.location_service_->nonblock_get_leader(
|
||||||
@ -289,8 +293,13 @@ int ObGAISClient::get_leader_(const uint64_t tenant_id, ObAddr &leader)
|
|||||||
} else if (OB_UNLIKELY(!leader.is_valid())) {
|
} else if (OB_UNLIKELY(!leader.is_valid())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("get invaild lear from location adapter", K(ret), K(leader));
|
LOG_WARN("get invaild lear from location adapter", K(ret), K(leader));
|
||||||
|
} else if (OB_FAIL(gais_cache_leader_map_.set_refactored(tenant_id, leader, 1))) {
|
||||||
|
LOG_WARN("fail to set leader to map", K(ret), K(tenant_id), K(leader));
|
||||||
} else {
|
} else {
|
||||||
gais_cache_leader_ = leader;
|
LOG_INFO("succ to refresh leader", K(cluster_id), K(tenant_id), K(leader));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG_WARN("fail get cache from hash map", K(ret));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -299,7 +308,7 @@ int ObGAISClient::refresh_location_(const uint64_t tenant_id)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const int64_t cluster_id = GCONF.cluster_id;
|
const int64_t cluster_id = GCONF.cluster_id;
|
||||||
reset_cache_leader_();
|
gais_cache_leader_map_.erase_refactored(tenant_id);
|
||||||
if (OB_ISNULL(GCTX.location_service_)) {
|
if (OB_ISNULL(GCTX.location_service_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
LOG_WARN("location cache is NULL", K(ret));
|
LOG_WARN("location cache is NULL", K(ret));
|
||||||
|
|||||||
@ -25,12 +25,12 @@ namespace share
|
|||||||
class ObGAISClient
|
class ObGAISClient
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
ObGAISClient() : is_inited_(false), self_(), gais_request_rpc_(nullptr), gais_cache_leader_(),
|
ObGAISClient() : is_inited_(false), self_(), gais_request_rpc_(nullptr), gais_cache_leader_map_()
|
||||||
cache_leader_mutex_(common::ObLatchIds::AUTO_INCREMENT_LEADER_LOCK) { }
|
{ }
|
||||||
~ObGAISClient() { }
|
~ObGAISClient() { }
|
||||||
int init(const common::ObAddr &self, share::ObGAISRequestRpc *gais_request_rpc);
|
int init(const common::ObAddr &self, share::ObGAISRequestRpc *gais_request_rpc);
|
||||||
void reset();
|
void reset();
|
||||||
TO_STRING_KV(K_(self), K_(gais_cache_leader));
|
TO_STRING_KV(K_(self), "map_size", gais_cache_leader_map_.size());
|
||||||
|
|
||||||
public:
|
public:
|
||||||
int get_value(const AutoincKey &key,
|
int get_value(const AutoincKey &key,
|
||||||
@ -59,18 +59,12 @@ public:
|
|||||||
private:
|
private:
|
||||||
int get_leader_(const uint64_t tenant_id, common::ObAddr &leader);
|
int get_leader_(const uint64_t tenant_id, common::ObAddr &leader);
|
||||||
int refresh_location_(const uint64_t tenant_id);
|
int refresh_location_(const uint64_t tenant_id);
|
||||||
inline void reset_cache_leader_()
|
|
||||||
{
|
|
||||||
lib::ObMutexGuard guard(cache_leader_mutex_);
|
|
||||||
gais_cache_leader_.reset();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
common::ObAddr self_;
|
common::ObAddr self_;
|
||||||
share::ObGAISRequestRpc *gais_request_rpc_;
|
share::ObGAISRequestRpc *gais_request_rpc_;
|
||||||
common::ObAddr gais_cache_leader_;
|
common::hash::ObHashMap<uint64_t, common::ObAddr> gais_cache_leader_map_;
|
||||||
lib::ObMutex cache_leader_mutex_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // share
|
} // share
|
||||||
|
|||||||
Reference in New Issue
Block a user