From 9bbbb315853ae9e577ed9f97a857355eaf2d8084 Mon Sep 17 00:00:00 2001 From: "wanhong.wwh" Date: Wed, 28 Jul 2021 22:12:13 +0800 Subject: [PATCH] WRS: add weak read version verification --- src/share/ob_define.h | 6 ++++ .../ob_tenant_weak_read_cluster_service.cpp | 11 ++++---- ...ob_tenant_weak_read_server_version_mgr.cpp | 15 ++++++---- .../ob_tenant_weak_read_service.cpp | 5 ++-- .../transaction/ob_weak_read_service.cpp | 28 +++++++++++-------- 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/src/share/ob_define.h b/src/share/ob_define.h index 2a0f9ae56..fad93e04c 100644 --- a/src/share/ob_define.h +++ b/src/share/ob_define.h @@ -193,6 +193,12 @@ OB_INLINE bool is_valid_membership_version(const int64_t membership_version) return membership_version >= 0; } +OB_INLINE bool is_valid_read_snapshot_version(const int64_t read_snapshot_version) +{ + // read snapshot version should be greater than 0 and should not be INT64_MAX + return read_snapshot_version > 0 && INT64_MAX != read_snapshot_version; +} + inline bool is_schema_error(int err) { bool ret = false; diff --git a/src/storage/transaction/ob_tenant_weak_read_cluster_service.cpp b/src/storage/transaction/ob_tenant_weak_read_cluster_service.cpp index 11819e44c..c9f9ca2d8 100644 --- a/src/storage/transaction/ob_tenant_weak_read_cluster_service.cpp +++ b/src/storage/transaction/ob_tenant_weak_read_cluster_service.cpp @@ -663,13 +663,12 @@ int ObTenantWeakReadClusterService::update_version(int64_t& affected_rows) } else if (OB_UNLIKELY(cur_leader_epoch != leader_epoch_)) { ret = OB_NOT_MASTER; CLUSTER_ISTAT("WRS leader changed when update CLUSTER version, need stop CLUSTER weak read service", - "tenant_id", - wrs_pkey_.get_tenant_id(), - K(cur_leader_epoch), - K(leader_epoch_)); - } else if (!check_can_update_version_()) { + "tenant_id", wrs_pkey_.get_tenant_id(), K(cur_leader_epoch), K(leader_epoch_)); + } else if (! check_can_update_version_()) { // check if can update version or not - } else if (OB_UNLIKELY((new_version = compute_version_(skipped_server_count, need_print)) <= 0)) { + } else if (FALSE_IT(new_version = compute_version_(skipped_server_count, need_print))) { + // nothing to do + } else if (OB_UNLIKELY(! is_valid_read_snapshot_version(new_version))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid CLUSTER weak read version", K(new_version), KR(ret)); } else { diff --git a/src/storage/transaction/ob_tenant_weak_read_server_version_mgr.cpp b/src/storage/transaction/ob_tenant_weak_read_server_version_mgr.cpp index e84561dec..242daa361 100644 --- a/src/storage/transaction/ob_tenant_weak_read_server_version_mgr.cpp +++ b/src/storage/transaction/ob_tenant_weak_read_server_version_mgr.cpp @@ -15,6 +15,8 @@ #include "share/ob_errno.h" #include "ob_tenant_weak_read_server_version_mgr.h" +#include "share/ob_define.h" // is_valid_read_snapshot_version + using namespace oceanbase::common; namespace oceanbase { namespace transaction { @@ -64,7 +66,8 @@ int ObTenantWeakReadServerVersionMgr::update_with_part_info(const uint64_t tenan const bool need_skip, const bool is_user_part, const int64_t version) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(epoch_tstamp <= 0) || OB_UNLIKELY(!need_skip && version <= 0)) { + if (OB_UNLIKELY(epoch_tstamp <= 0) + || OB_UNLIKELY(! need_skip && ! is_valid_read_snapshot_version(version))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument, epoch or version", K(ret), @@ -84,8 +87,9 @@ int ObTenantWeakReadServerVersionMgr::generate_new_version(const uint64_t tenant const int64_t base_version_when_no_valid_partition, const bool need_print_status) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(epoch_tstamp <= 0) || OB_UNLIKELY(OB_INVALID_ID == tenant_id) || - OB_UNLIKELY(base_version_when_no_valid_partition <= 0)) { + if (OB_UNLIKELY(epoch_tstamp <= 0) + || OB_UNLIKELY(OB_INVALID_ID == tenant_id) + || OB_UNLIKELY(! is_valid_read_snapshot_version(base_version_when_no_valid_partition))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", KR(ret), K(epoch_tstamp), K(tenant_id), K(base_version_when_no_valid_partition)); } @@ -214,8 +218,9 @@ int ObTenantWeakReadServerVersionMgr::ServerVersionInner::amend( int ObTenantWeakReadServerVersionMgr::ServerVersionInner::update(const ServerVersionInner& new_version) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(0 == new_version.epoch_tstamp_) || OB_UNLIKELY(epoch_tstamp_ >= new_version.epoch_tstamp_) || - OB_UNLIKELY(new_version.version_ <= 0)) { + if (OB_UNLIKELY(0 == new_version.epoch_tstamp_) + || OB_UNLIKELY(epoch_tstamp_ >= new_version.epoch_tstamp_) + || OB_UNLIKELY(! is_valid_read_snapshot_version(new_version.version_))) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid new server version", KR(ret), K(new_version), KPC(this)); } else { diff --git a/src/storage/transaction/ob_tenant_weak_read_service.cpp b/src/storage/transaction/ob_tenant_weak_read_service.cpp index cd302d67c..0eaf8b8ac 100644 --- a/src/storage/transaction/ob_tenant_weak_read_service.cpp +++ b/src/storage/transaction/ob_tenant_weak_read_service.cpp @@ -221,8 +221,9 @@ int ObTenantWeakReadService::get_cluster_version_by_rpc_(int64_t& version) } else if (OB_SUCCESS != resp.err_code_) { LOG_WARN("get weak read cluster version RPC return error", K(resp), K(tenant_id_), K(cluster_service_master)); ret = resp.err_code_; - } else if (OB_UNLIKELY(resp.version_ <= 0)) { - LOG_WARN("invalid weak read cluster version from RPC", K(resp), K(tenant_id_), K(cluster_service_master)); + } else if (OB_UNLIKELY(! is_valid_read_snapshot_version(resp.version_))) { + LOG_WARN("invalid weak read cluster version from RPC", K(resp), K(tenant_id_), + K(cluster_service_master)); ret = OB_INVALID_ERROR; } else { version = resp.version_; diff --git a/src/storage/transaction/ob_weak_read_service.cpp b/src/storage/transaction/ob_weak_read_service.cpp index fbe389c25..8ff0f57d6 100644 --- a/src/storage/transaction/ob_weak_read_service.cpp +++ b/src/storage/transaction/ob_weak_read_service.cpp @@ -99,11 +99,11 @@ int ObWeakReadService::get_server_version(const uint64_t tenant_id, int64_t& ver LOG_WARN("change tenant context fail when get weak read service server version", KR(ret), K(tenant_id)); } - if (OB_SUCC(ret) && version <= 0) { + if (OB_SUCC(ret) && ! is_valid_read_snapshot_version(version)) { int old_ret = ret; ret = OB_ERR_UNEXPECTED; - LOG_ERROR( - "get server version succ, but version not bigger than zero", K(ret), K(old_ret), K(tenant_id), K(version)); + LOG_ERROR("get server version succ, but version is not valid snapshot version", K(ret), K(old_ret), + K(tenant_id), K(version)); } LOG_DEBUG("[WRS] get_server_version", K(ret), K(tenant_id), K(version)); @@ -131,11 +131,11 @@ int ObWeakReadService::get_cluster_version(const uint64_t tenant_id, int64_t& ve LOG_WARN("change tenant context fail when get weak read service cluster version", KR(ret), K(tenant_id)); } - if (OB_SUCC(ret) && version <= 0) { + if (OB_SUCC(ret) && ! is_valid_read_snapshot_version(version)) { int old_ret = ret; ret = OB_ERR_UNEXPECTED; - LOG_ERROR( - "get cluster version succ, but version not bigger than zero", K(ret), K(old_ret), K(tenant_id), K(version)); + LOG_ERROR("get cluster version succ, but version is not valid snapshot version", K(ret), K(old_ret), + K(tenant_id), K(version)); } LOG_DEBUG("[WRS] get_cluster_version", K(ret), K(tenant_id), K(version)); @@ -328,15 +328,19 @@ int ObWeakReadService::handle_partition_(ObIPartitionGroup& part, bool& need_ski ObTenantWeakReadService* twrs = NULL; const ObPartitionKey& pkey = part.get_partition_key(); uint64_t tenant_id = pkey.get_tenant_id(); - ObPartitionService& ps = ObPartitionService::get_instance(); + ObPartitionService &ps = ObPartitionService::get_instance(); // tenant maybe not exist, pass second parameter to ignore WARN log - int64_t max_stale_time = - ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id, ObWeakReadUtil::IGNORE_TENANT_EXIST_WARN); + int64_t max_stale_time = ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id, + ObWeakReadUtil::IGNORE_TENANT_EXIST_WARN); // generate partition level weak read snapshot version - if (OB_FAIL( - ps.generate_partition_weak_read_snapshot_version(part, need_skip, is_user_part, version, max_stale_time))) { - LOG_WARN("generate partition weak read snapshot version fail", K(ret), K(pkey)); + if (OB_FAIL(ps.generate_partition_weak_read_snapshot_version(part, need_skip, + is_user_part, version, max_stale_time))) { + LOG_WARN("generate partition weak read snapshot version fail", KR(ret), K(pkey)); + } else if (OB_UNLIKELY(!need_skip && ! is_valid_read_snapshot_version(version))) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("unexpected generated partitoin weak read snapshot version", + KR(ret), "pkey", part.get_partition_key(), K(is_user_part), K(version)); } else { // switch tenant, get tenant weak read service FETCH_ENTITY(TENANT_SPACE, tenant_id)