From d4d9911a8233356c3ead950ffde74d24585c9bbc Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 4 Nov 2022 07:10:22 +0000 Subject: [PATCH] Fix ddl sync auto increment value error --- src/observer/ob_server.cpp | 1 - src/share/ob_autoincrement_service.cpp | 12 +- src/share/ob_autoincrement_service.h | 5 - src/share/ob_gais_client.cpp | 262 ++++++++++++------------- src/share/ob_gais_client.h | 8 +- src/share/ob_gais_rpc.cpp | 76 ++++--- 6 files changed, 175 insertions(+), 189 deletions(-) diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index 6563c69c3b..4d1aeeda73 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -1738,7 +1738,6 @@ int ObServer::init_autoincrement_service() &sql_proxy_, &srv_rpc_proxy_, &schema_service_, - location_service_, net_frame_.get_req_transport()))) { LOG_ERROR("init autoincrement_service_ fail", KR(ret)); } diff --git a/src/share/ob_autoincrement_service.cpp b/src/share/ob_autoincrement_service.cpp index 42b2b1db92..fea595f4fe 100644 --- a/src/share/ob_autoincrement_service.cpp +++ b/src/share/ob_autoincrement_service.cpp @@ -249,7 +249,6 @@ int ObAutoincrementService::init(ObAddr &addr, ObMySQLProxy *mysql_proxy, ObSrvRpcProxy *srv_proxy, ObMultiVersionSchemaService *schema_service, - ObLocationService &location_service, rpc::frame::ObReqTransport *req_transport) { int ret = OB_SUCCESS; @@ -260,7 +259,7 @@ int ObAutoincrementService::init(ObAddr &addr, if (OB_FAIL(distributed_autoinc_service_.init(mysql_proxy))) { LOG_WARN("fail init distributed_autoinc_service_ service", K(ret)); - } else if (OB_FAIL(global_autoinc_service_.init(my_addr_, *schema_service, location_service, req_transport))) { + } else if (OB_FAIL(global_autoinc_service_.init(my_addr_, req_transport))) { LOG_WARN("fail init auto inc global service", K(ret)); } else if (OB_FAIL(node_allocator_.init(sizeof(TableNode), ObModIds::OB_AUTOINCREMENT))) { LOG_WARN("failed to init table node allocator", K(ret)); @@ -277,7 +276,6 @@ int ObAutoincrementService::init_for_backup(ObAddr &addr, ObMySQLProxy *mysql_proxy, ObSrvRpcProxy *srv_proxy, ObMultiVersionSchemaService *schema_service, - share::ObLocationService &location_service, rpc::frame::ObReqTransport *req_transport) { int ret = OB_SUCCESS; @@ -286,7 +284,7 @@ int ObAutoincrementService::init_for_backup(ObAddr &addr, srv_proxy_ = srv_proxy; schema_service_ = schema_service; OZ(distributed_autoinc_service_.init(mysql_proxy)); - OZ(global_autoinc_service_.init(my_addr_, *schema_service, location_service, req_transport)); + OZ(global_autoinc_service_.init(my_addr_, req_transport)); return ret; } @@ -1441,8 +1439,6 @@ int ObInnerTableGlobalAutoIncrementService::local_sync_with_global_value( int ObRpcGlobalAutoIncrementService::init( const common::ObAddr &addr, - share::schema::ObMultiVersionSchemaService &schema_service, - share::ObLocationService &location_service, rpc::frame::ObReqTransport *req_transport) { int ret = OB_SUCCESS; @@ -1452,13 +1448,11 @@ int ObRpcGlobalAutoIncrementService::init( } else if (!addr.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(addr)); - } else if (OB_FAIL(location_adapter_def_.init(&schema_service, &location_service))) { - LOG_WARN("localtion adapter init error", K(ret)); } else if (OB_FAIL(gais_request_rpc_proxy_.init(req_transport, addr))) { LOG_WARN("rpc proxy init failed", K(ret), K(req_transport), K(addr)); } else if (OB_FAIL(gais_request_rpc_.init(&gais_request_rpc_proxy_, addr))) { LOG_WARN("response rpc init failed", K(ret), K(addr)); - } else if (OB_FAIL(gais_client_.init(addr, &gais_request_rpc_, &location_adapter_def_))) { + } else if (OB_FAIL(gais_client_.init(addr, &gais_request_rpc_))) { LOG_WARN("init client failed", K(ret)); } return ret; diff --git a/src/share/ob_autoincrement_service.h b/src/share/ob_autoincrement_service.h index bd7feaa934..cbcc06c856 100644 --- a/src/share/ob_autoincrement_service.h +++ b/src/share/ob_autoincrement_service.h @@ -277,8 +277,6 @@ public: virtual ~ObRpcGlobalAutoIncrementService() = default; int init(const common::ObAddr &addr, - share::schema::ObMultiVersionSchemaService &schema_service, - share::ObLocationService &location_service, rpc::frame::ObReqTransport *req_transport); virtual int get_value( @@ -316,7 +314,6 @@ public: private: bool is_inited_; ObGAISClient gais_client_; - transaction::ObLocationAdapter location_adapter_def_; obrpc::ObGAISRpcProxy gais_request_rpc_proxy_; ObGAISRequestRpc gais_request_rpc_; }; @@ -335,13 +332,11 @@ public: common::ObMySQLProxy *mysql_proxy, obrpc::ObSrvRpcProxy *srv_proxy, share::schema::ObMultiVersionSchemaService *schema_service, - share::ObLocationService &location_service, rpc::frame::ObReqTransport *req_transport); int init_for_backup(common::ObAddr &addr, common::ObMySQLProxy *mysql_proxy, obrpc::ObSrvRpcProxy *srv_proxy, share::schema::ObMultiVersionSchemaService *schema_service, - share::ObLocationService &location_service, rpc::frame::ObReqTransport *req_transport); int get_handle(AutoincParam ¶m, CacheHandle *&handle); void release_handle(CacheHandle *&handle); diff --git a/src/share/ob_gais_client.cpp b/src/share/ob_gais_client.cpp index 1d043c01e7..6ea3f32d7b 100644 --- a/src/share/ob_gais_client.cpp +++ b/src/share/ob_gais_client.cpp @@ -28,21 +28,18 @@ using namespace oceanbase::transaction; namespace share { -int ObGAISClient::init(const ObAddr &self, ObGAISRequestRpc *gais_request_rpc, - ObILocationAdapter *location_adapter) +int ObGAISClient::init(const ObAddr &self, ObGAISRequestRpc *gais_request_rpc) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", KR(ret)); - } else if (OB_UNLIKELY(!self.is_valid()) || OB_ISNULL(gais_request_rpc) || - OB_ISNULL(location_adapter)) { - LOG_WARN("invalid argument", KR(ret), K(self), KP(gais_request_rpc), KP(location_adapter)); + } else if (OB_UNLIKELY(!self.is_valid()) || OB_ISNULL(gais_request_rpc)) { ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(self), KP(gais_request_rpc)); } else { self_ = self; gais_request_rpc_ = gais_request_rpc; - location_adapter_ = location_adapter; is_inited_ = true; LOG_INFO("gais client init success", K(self), KP(this)); } @@ -54,7 +51,6 @@ void ObGAISClient::reset() is_inited_ = false; self_.reset(); gais_request_rpc_ = NULL; - location_adapter_ = NULL; reset_cache_leader_(); } @@ -75,31 +71,29 @@ int ObGAISClient::get_value(const AutoincKey &key, ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { - MTL_SWITCH(tenant_id) { - ObGAISNextAutoIncValReq msg; - ObGAISNextValRpcResult rpc_result; - ObAddr leader; - if (OB_FAIL(get_leader_(tenant_id, leader))) { - LOG_WARN("get leader fail", K(ret)); - (void)refresh_location_(tenant_id); - } else if (OB_FAIL(msg.init(key, offset, increment, table_auto_increment, max_value, - desired_count, cache_size, self_))) { - LOG_WARN("fail to init request msg", K(ret)); - } else if (OB_UNLIKELY(!msg.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(msg)); - } else if (OB_FAIL(gais_request_rpc_->next_autoinc_val(leader, msg, rpc_result))) { - LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); - (void)refresh_location_(tenant_id); - } else if (!rpc_result.is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result)); - } else { - start_inclusive = rpc_result.start_inclusive_; - end_inclusive = rpc_result.end_inclusive_; - sync_value = rpc_result.sync_value_; - LOG_DEBUG("handle gais success", K(rpc_result)); - } + ObGAISNextAutoIncValReq msg; + ObGAISNextValRpcResult rpc_result; + ObAddr leader; + if (OB_FAIL(get_leader_(tenant_id, leader))) { + LOG_WARN("get leader fail", K(ret)); + (void)refresh_location_(tenant_id); + } else if (OB_FAIL(msg.init(key, offset, increment, table_auto_increment, max_value, + desired_count, cache_size, self_))) { + LOG_WARN("fail to init request msg", K(ret)); + } else if (OB_UNLIKELY(!msg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(msg)); + } else if (OB_FAIL(gais_request_rpc_->next_autoinc_val(leader, msg, rpc_result))) { + LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); + (void)refresh_location_(tenant_id); + } else if (!rpc_result.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result)); + } else { + start_inclusive = rpc_result.start_inclusive_; + end_inclusive = rpc_result.end_inclusive_; + sync_value = rpc_result.sync_value_; + LOG_DEBUG("handle gais success", K(rpc_result)); } } return ret; @@ -113,25 +107,23 @@ int ObGAISClient::get_sequence_value(const AutoincKey &key, uint64_t &sequence_v ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { - MTL_SWITCH(tenant_id) { - ObGAISAutoIncKeyArg msg; - ObGAISCurrValRpcResult rpc_result; - ObAddr leader; - if (OB_FAIL(get_leader_(tenant_id, leader))) { - LOG_WARN("get leader fail", K(ret)); - (void)refresh_location_(tenant_id); - } else if (OB_FAIL(msg.init(key, self_))) { - LOG_WARN("fail to init request msg", K(ret)); - } else if (OB_UNLIKELY(!msg.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(msg)); - } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { - LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); - (void)refresh_location_(tenant_id); - } else { - sequence_value = rpc_result.sequence_value_; - LOG_DEBUG("handle gais success", K(rpc_result)); - } + ObGAISAutoIncKeyArg msg; + ObGAISCurrValRpcResult rpc_result; + ObAddr leader; + if (OB_FAIL(get_leader_(tenant_id, leader))) { + LOG_WARN("get leader fail", K(ret)); + (void)refresh_location_(tenant_id); + } else if (OB_FAIL(msg.init(key, self_))) { + LOG_WARN("fail to init request msg", K(ret)); + } else if (OB_UNLIKELY(!msg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(msg)); + } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { + LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); + (void)refresh_location_(tenant_id); + } else { + sequence_value = rpc_result.sequence_value_; + LOG_DEBUG("handle gais success", K(rpc_result)); } } @@ -152,30 +144,28 @@ int ObGAISClient::get_auto_increment_values( ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { - MTL_SWITCH(tenant_id) { - ObGAISAutoIncKeyArg msg; - ObGAISCurrValRpcResult rpc_result; - ObAddr leader; - if (OB_FAIL(get_leader_(tenant_id, leader))) { - LOG_WARN("get leader fail", K(ret)); - (void)refresh_location_(tenant_id); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < autoinc_keys.count(); ++i) { - rpc_result.reset(); - AutoincKey key = autoinc_keys.at(i); - if (OB_FAIL(msg.init(key, self_))) { - LOG_WARN("fail to init request msg", K(ret)); - } else if (OB_UNLIKELY(!msg.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(msg)); - } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { - LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); - (void)refresh_location_(tenant_id); - } else if (OB_FAIL(seq_values.set_refactored(key, rpc_result.sequence_value_))) { - LOG_WARN("fail to get int_value.", K(ret)); - } else { - LOG_DEBUG("handle gais success", K(rpc_result)); - } + ObGAISAutoIncKeyArg msg; + ObGAISCurrValRpcResult rpc_result; + ObAddr leader; + if (OB_FAIL(get_leader_(tenant_id, leader))) { + LOG_WARN("get leader fail", K(ret)); + (void)refresh_location_(tenant_id); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < autoinc_keys.count(); ++i) { + rpc_result.reset(); + AutoincKey key = autoinc_keys.at(i); + if (OB_FAIL(msg.init(key, self_))) { + LOG_WARN("fail to init request msg", K(ret)); + } else if (OB_UNLIKELY(!msg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(msg)); + } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { + LOG_WARN("handle gais request failed", K(ret), K(msg), K(rpc_result)); + (void)refresh_location_(tenant_id); + } else if (OB_FAIL(seq_values.set_refactored(key, rpc_result.sequence_value_))) { + LOG_WARN("fail to get int_value.", K(ret)); + } else { + LOG_DEBUG("handle gais success", K(rpc_result)); } } } @@ -196,25 +186,23 @@ int ObGAISClient::local_push_to_global_value(const AutoincKey &key, ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { - MTL_SWITCH(tenant_id) { - ObGAISPushAutoIncValReq msg; - uint64_t new_sync_value = 0; - ObAddr leader; - if (OB_FAIL(get_leader_(tenant_id, leader))) { - LOG_WARN("get leader fail", K(ret)); - (void)refresh_location_(tenant_id); - } else if (OB_FAIL(msg.init(key, local_sync_value, max_value, self_))) { - LOG_WARN("fail to init request msg", K(ret)); - } else if (OB_UNLIKELY(!msg.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(msg)); - } else if (OB_FAIL(gais_request_rpc_->push_autoinc_val(leader, msg, new_sync_value))) { - LOG_WARN("handle gais request failed", K(ret), K(msg)); - (void)refresh_location_(tenant_id); - } else { - global_sync_value = new_sync_value; - LOG_DEBUG("handle gais success", K(global_sync_value)); - } + ObGAISPushAutoIncValReq msg; + uint64_t new_sync_value = 0; + ObAddr leader; + if (OB_FAIL(get_leader_(tenant_id, leader))) { + LOG_WARN("get leader fail", K(ret)); + (void)refresh_location_(tenant_id); + } else if (OB_FAIL(msg.init(key, local_sync_value, max_value, self_))) { + LOG_WARN("fail to init request msg", K(ret)); + } else if (OB_UNLIKELY(!msg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(msg)); + } else if (OB_FAIL(gais_request_rpc_->push_autoinc_val(leader, msg, new_sync_value))) { + LOG_WARN("handle gais request failed", K(ret), K(msg)); + (void)refresh_location_(tenant_id); + } else { + global_sync_value = new_sync_value; + LOG_DEBUG("handle gais success", K(global_sync_value)); } } return ret; @@ -228,28 +216,26 @@ int ObGAISClient::local_sync_with_global_value(const AutoincKey &key, uint64_t & ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { - MTL_SWITCH(tenant_id) { - ObGAISAutoIncKeyArg msg; - ObGAISCurrValRpcResult rpc_result; - ObAddr leader; - if (OB_FAIL(get_leader_(tenant_id, leader))) { - LOG_WARN("get leader fail", K(ret)); - (void)refresh_location_(key.tenant_id_); - } else if (OB_FAIL(msg.init(key, self_))) { - LOG_WARN("fail to init request msg", K(ret)); - } else if (OB_UNLIKELY(!msg.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(msg)); - } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { - LOG_WARN("handle gais request failed", K(ret), K(msg)); - (void)refresh_location_(key.tenant_id_); - } else if (!rpc_result.is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result)); - } else { - global_sync_value = rpc_result.sync_value_; - LOG_DEBUG("handle gais success", K(global_sync_value)); - } + ObGAISAutoIncKeyArg msg; + ObGAISCurrValRpcResult rpc_result; + ObAddr leader; + if (OB_FAIL(get_leader_(tenant_id, leader))) { + LOG_WARN("get leader fail", K(ret)); + (void)refresh_location_(key.tenant_id_); + } else if (OB_FAIL(msg.init(key, self_))) { + LOG_WARN("fail to init request msg", K(ret)); + } else if (OB_UNLIKELY(!msg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(msg)); + } else if (OB_FAIL(gais_request_rpc_->curr_autoinc_val(leader, msg, rpc_result))) { + LOG_WARN("handle gais request failed", K(ret), K(msg)); + (void)refresh_location_(key.tenant_id_); + } else if (!rpc_result.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("rpc result is unexpected", K(ret), K(rpc_result)); + } else { + global_sync_value = rpc_result.sync_value_; + LOG_DEBUG("handle gais success", K(global_sync_value)); } } return ret; @@ -263,23 +249,21 @@ int ObGAISClient::clear_global_autoinc_cache(const AutoincKey &key) ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { - MTL_SWITCH(tenant_id) { - ObGAISAutoIncKeyArg msg; - ObAddr leader; - if (OB_FAIL(get_leader_(tenant_id, leader))) { - LOG_WARN("get leader fail", K(ret)); - (void)refresh_location_(key.tenant_id_); - } else if (OB_FAIL(msg.init(key, self_))) { - LOG_WARN("fail to init request msg", K(ret)); - } else if (OB_UNLIKELY(!msg.is_valid())) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(msg)); - } else if (OB_FAIL(gais_request_rpc_->clear_autoinc_cache(leader, msg))) { - LOG_WARN("handle gais request failed", K(ret), K(msg)); - (void)refresh_location_(key.tenant_id_); - } else { - LOG_DEBUG("clear global autoinc cache success", K(msg)); - } + ObGAISAutoIncKeyArg msg; + ObAddr leader; + if (OB_FAIL(get_leader_(tenant_id, leader))) { + LOG_WARN("get leader fail", K(ret)); + (void)refresh_location_(key.tenant_id_); + } else if (OB_FAIL(msg.init(key, self_))) { + LOG_WARN("fail to init request msg", K(ret)); + } else if (OB_UNLIKELY(!msg.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(msg)); + } else if (OB_FAIL(gais_request_rpc_->clear_autoinc_cache(leader, msg))) { + LOG_WARN("handle gais request failed", K(ret), K(msg)); + (void)refresh_location_(key.tenant_id_); + } else { + LOG_DEBUG("clear global autoinc cache success", K(msg)); } } return ret; @@ -292,8 +276,11 @@ int ObGAISClient::get_leader_(const uint64_t tenant_id, ObAddr &leader) lib::ObMutexGuard guard(cache_leader_mutex_); if (OB_LIKELY(gais_cache_leader_.is_valid())) { leader = gais_cache_leader_; - } else if (OB_FAIL(location_adapter_->nonblock_get_leader( - cluster_id, tenant_id, GAIS_LS, leader))) { + } else if (OB_ISNULL(GCTX.location_service_)) { + ret = OB_NOT_INIT; + LOG_WARN("location cache is NULL", K(ret)); + } else if (OB_FAIL(GCTX.location_service_->nonblock_get_leader( + cluster_id, tenant_id, GAIS_LS, leader))) { LOG_WARN("gais nonblock get leader failed", K(ret), K(tenant_id), K(GAIS_LS)); } else if (OB_UNLIKELY(!leader.is_valid())) { ret = OB_ERR_UNEXPECTED; @@ -309,7 +296,10 @@ int ObGAISClient::refresh_location_(const uint64_t tenant_id) int ret = OB_SUCCESS; const int64_t cluster_id = GCONF.cluster_id; reset_cache_leader_(); - if (OB_FAIL(location_adapter_->nonblock_renew(cluster_id, tenant_id, GAIS_LS))) { + if (OB_ISNULL(GCTX.location_service_)) { + ret = OB_NOT_INIT; + LOG_WARN("location cache is NULL", K(ret)); + } else if (OB_FAIL(GCTX.location_service_->nonblock_renew(cluster_id, tenant_id, GAIS_LS))) { LOG_WARN("gais nonblock renew error", KR(ret), K(tenant_id), K(GAIS_LS)); } return ret; diff --git a/src/share/ob_gais_client.h b/src/share/ob_gais_client.h index d1c2a39f31..ba1e684a8f 100644 --- a/src/share/ob_gais_client.h +++ b/src/share/ob_gais_client.h @@ -25,13 +25,10 @@ namespace share class ObGAISClient { public: - ObGAISClient() : is_inited_(false), self_(), gais_request_rpc_(nullptr), - location_adapter_(nullptr), gais_cache_leader_() { } + ObGAISClient() : is_inited_(false), self_(), gais_request_rpc_(nullptr), gais_cache_leader_() { } ~ObGAISClient() { } - int init(const common::ObAddr &self, share::ObGAISRequestRpc *gais_request_rpc, - transaction::ObILocationAdapter *location_adapter); + int init(const common::ObAddr &self, share::ObGAISRequestRpc *gais_request_rpc); void reset(); - int refresh_location(const uint64_t tenant_id) { return refresh_location_(tenant_id); } TO_STRING_KV(K_(self), K_(gais_cache_leader)); public: @@ -68,7 +65,6 @@ private: bool is_inited_; common::ObAddr self_; share::ObGAISRequestRpc *gais_request_rpc_; - transaction::ObILocationAdapter *location_adapter_; common::ObAddr gais_cache_leader_; lib::ObMutex cache_leader_mutex_; }; diff --git a/src/share/ob_gais_rpc.cpp b/src/share/ob_gais_rpc.cpp index ba1b3966da..40614ddcde 100644 --- a/src/share/ob_gais_rpc.cpp +++ b/src/share/ob_gais_rpc.cpp @@ -162,17 +162,20 @@ int ObGAISRequestRpc::next_autoinc_val(const ObAddr &server, } else if (server == self_) { // Use local calls instead of rpc ObGlobalAutoIncService *gais = nullptr; - if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("global autoinc service is null", K(ret)); - } else if (OB_FAIL(gais->handle_next_autoinc_request(msg, rpc_result))) { - LOG_WARN("post local gais require autoinc request failed", KR(ret), K(server), K(msg)); - } else if (!rpc_result.is_valid()) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("post local gais require autoinc and gais_rpc_result is invalid", KR(ret), K(server), - K(msg), K(rpc_result)); - } else { - LOG_TRACE("post local require autoinc request success", K(msg), K(rpc_result)); + const uint64_t tenant_id = msg.autoinc_key_.tenant_id_; + MTL_SWITCH(tenant_id) { + if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("global autoinc service is null", K(ret)); + } else if (OB_FAIL(gais->handle_next_autoinc_request(msg, rpc_result))) { + LOG_WARN("post local gais require autoinc request failed", KR(ret), K(server), K(msg)); + } else if (!rpc_result.is_valid()) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("post local gais require autoinc and gais_rpc_result is invalid", KR(ret), K(server), + K(msg), K(rpc_result)); + } else { + LOG_TRACE("post local require autoinc request success", K(msg), K(rpc_result)); + } } } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).next_autoinc_val(msg, rpc_result))) { LOG_WARN("post require autoinc request failed", KR(ret), K(server), K(msg)); @@ -201,13 +204,16 @@ int ObGAISRequestRpc::curr_autoinc_val(const ObAddr &server, } else if (server == self_) { // Use local calls instead of rpc ObGlobalAutoIncService *gais = nullptr; - if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("global autoinc service is null", K(ret)); - } else if (OB_FAIL(gais->handle_curr_autoinc_request(msg, rpc_result))) { - LOG_WARN("post local gais get autoinc request failed", KR(ret), K(server), K(msg)); - } else { - LOG_TRACE("post local get autoinc request success", K(msg), K(rpc_result)); + const uint64_t tenant_id = msg.autoinc_key_.tenant_id_; + MTL_SWITCH(tenant_id) { + if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("global autoinc service is null", K(ret)); + } else if (OB_FAIL(gais->handle_curr_autoinc_request(msg, rpc_result))) { + LOG_WARN("post local gais get autoinc request failed", KR(ret), K(server), K(msg)); + } else { + LOG_TRACE("post local get autoinc request success", K(msg), K(rpc_result)); + } } } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).curr_autoinc_val(msg, rpc_result))) { LOG_WARN("post gais request failed", KR(ret), K(server), K(msg)); @@ -232,13 +238,16 @@ int ObGAISRequestRpc::push_autoinc_val(const ObAddr &server, } else if (server == self_) { // Use local calls instead of rpc ObGlobalAutoIncService *gais = nullptr; - if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("global autoinc service is null", K(ret)); - } else if (OB_FAIL(gais->handle_push_autoinc_request(msg, sync_value))) { - LOG_WARN("post local gais push global request failed", KR(ret), K(server), K(msg)); - } else { - LOG_TRACE("post local gais push global request request success", K(msg), K(sync_value)); + const uint64_t tenant_id = msg.autoinc_key_.tenant_id_; + MTL_SWITCH(tenant_id) { + if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("global autoinc service is null", K(ret)); + } else if (OB_FAIL(gais->handle_push_autoinc_request(msg, sync_value))) { + LOG_WARN("post local gais push global request failed", KR(ret), K(server), K(msg)); + } else { + LOG_TRACE("post local gais push global request request success", K(msg), K(sync_value)); + } } } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).push_autoinc_val(msg, sync_value))) { LOG_WARN("post remote push global request failed", KR(ret), K(server), K(msg)); @@ -261,13 +270,16 @@ int ObGAISRequestRpc::clear_autoinc_cache(const ObAddr &server, const ObGAISAuto } else if (server == self_) { // Use local calls instead of rpc ObGlobalAutoIncService *gais = nullptr; - if (OB_ISNULL(gais = MTL_WITH_CHECK_TENANT(ObGlobalAutoIncService *, msg.autoinc_key_.tenant_id_))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("global autoinc service is null", K(ret)); - } else if (OB_FAIL(gais->handle_clear_autoinc_cache_request(msg))) { - LOG_WARN("post local gais clear autoinc cache failed", KR(ret), K(server), K(msg)); - } else { - LOG_TRACE("clear autoinc cache success", K(server), K(msg)); + const uint64_t tenant_id = msg.autoinc_key_.tenant_id_; + MTL_SWITCH(tenant_id) { + if (OB_ISNULL(gais = MTL(ObGlobalAutoIncService *))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("global autoinc service is null", K(ret)); + } else if (OB_FAIL(gais->handle_clear_autoinc_cache_request(msg))) { + LOG_WARN("post local gais clear autoinc cache failed", KR(ret), K(server), K(msg)); + } else { + LOG_TRACE("clear autoinc cache success", K(server), K(msg)); + } } } else if (OB_FAIL(rpc_proxy_->to(server).by(msg.autoinc_key_.tenant_id_).timeout(timeout).clear_autoinc_cache(msg))) { LOG_WARN("post gais request failed", KR(ret), K(server), K(msg));