diff --git a/src/share/partition_table/ob_partition_location_cache.cpp b/src/share/partition_table/ob_partition_location_cache.cpp index 0a1159ab0a..5fe085112b 100644 --- a/src/share/partition_table/ob_partition_location_cache.cpp +++ b/src/share/partition_table/ob_partition_location_cache.cpp @@ -43,6 +43,7 @@ #include "share/ob_server_blacklist.h" #include "observer/ob_server_struct.h" #include "rootserver/ob_rs_async_rpc_proxy.h" +#include "share/cache/ob_cache_name_define.h" namespace oceanbase { using namespace common; @@ -319,6 +320,26 @@ int ObLocationLeaderCache::set_strong_leader_info( return ret; } +// OB_INVALID_TENANT_ID means flush all tenant's leader cache /* Visits every one of the CACHE_NUM slots in buffer_ and overwrites the leader info of each matching entry with a default-constructed LocationInfo. ret is never reassigned after its initialization, so as written this always returns OB_SUCCESS. */ +int ObLocationLeaderCache::flush_cache(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < CACHE_NUM; i++) { + ObLocationLeaderInfo &info = buffer_[i]; + SpinWLockGuard guard(info.get_lock()); /* presumably holds the write lock of this slot until the end of the iteration -- confirm against SpinWLockGuard */ + ObLocationLeader *&value_ptr = info.get_value(); + if (OB_NOT_NULL(value_ptr)) { /* slots whose value pointer is null are skipped */ + const ObLocationCacheKey &key = value_ptr->get_key(); + if (OB_INVALID_TENANT_ID == tenant_id || tenant_id == extract_tenant_id(key.table_id_)) { + // reset cache /* the entry object stays in the slot; only its leader info is replaced */ + value_ptr->set_strong_leader_info(LocationInfo()); + LOG_TRACE("flush user leader cache", KR(ret), K(key)); + } + } + } + return ret; +} + /////////////////////////////////////////// bool ObILocationFetcher::treat_sql_as_timeout(const int error_code) { @@ -512,10 +533,7 @@ int ObILocationFetcher::fill_location(ObPartitionInfo& partition_info, ObPartiti location.set_renew_time(ObTimeUtility::current_time()); location.set_sql_renew_time(ObTimeUtility::current_time()); ObReplicaLocation leader; /* NOTE(review): this hunk drops the early OB_ENTRY_NOT_EXIST exit for an empty location from fill_location; later hunks of this diff add location.size() <= 0 checks to renew_location and update_location instead */ - if (location.size() <= 0) { - ret = OB_ENTRY_NOT_EXIST; - LOG_WARN("location is empty", K(ret), K(location)); - } else if (OB_FAIL(location.get_leader_by_election(leader))) { + if (OB_FAIL(location.get_leader_by_election(leader))) { if (OB_LOCATION_LEADER_NOT_EXIST == ret) { ret = OB_SUCCESS; LOG_DEBUG("location leader not exist", 
K(ret), K(location)); @@ -3263,6 +3281,7 @@ int ObPartitionLocationCache::renew_location(const uint64_t table_id, const int6 bool refresh_by_rpc = false; bool refresh_by_sql = false; + bool can_erase = false; /* forwarded to update_location() further down; only the else branch shown in the next hunk sets it to true */ bool exist_in_cache = true; // try to get from cache, maybe other thread has already fetched location if (OB_SUCC(ret)) { @@ -3378,6 +3397,7 @@ int ObPartitionLocationCache::renew_location(const uint64_t table_id, const int6 } } } else { + can_erase = true; /* set on the same branch that counts LOCATION_CACHE_SQL_RENEW */ EVENT_INC(LOCATION_CACHE_SQL_RENEW); } if (location.get_replica_locations().count() > 0) { @@ -3395,13 +3415,17 @@ int ObPartitionLocationCache::renew_location(const uint64_t table_id, const int6 } if (OB_SUCC(ret)) { - if (OB_FAIL(update_location(table_id, partition_id, cluster_id, new_location))) { - LOG_WARN("update location in cache failed", K(ret), KT(table_id), K(partition_id), K(new_location)); + if (OB_FAIL(update_location(table_id, partition_id, cluster_id, can_erase, new_location))) { + LOG_WARN( + "update location in cache failed", K(ret), KT(table_id), K(partition_id), K(can_erase), K(new_location)); } else if (result_filter_not_readable_replica && OB_FAIL(location.assign_with_only_readable_replica(new_location))) { LOG_WARN("assign with only readable replica fail", K(ret), K(location), K(new_location)); } else if (!result_filter_not_readable_replica && OB_FAIL(location.assign(new_location))) { LOG_WARN("assign location fail", K(ret), K(location), K(new_location)); + } else if (location.size() <= 0) { /* the cache update and the assignment succeeded but the resulting location is empty: report OB_ENTRY_NOT_EXIST to the caller */ + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("location is empty", KR(ret), K(location)); } } } else { @@ -3492,7 +3516,8 @@ int ObPartitionLocationCache::check_skip_rpc_renew_v2(const ObPartitionLocation& } int ObPartitionLocationCache::update_location( - const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id, const ObPartitionLocation& location) + const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id, + const bool can_erase, const ObPartitionLocation& 
location) { int ret = OB_SUCCESS; if (!is_inited_) { @@ -3501,6 +3526,13 @@ int ObPartitionLocationCache::update_location( } else if (!ObIPartitionTable::is_valid_key(table_id, partition_id) || !location.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KT(table_id), K(partition_id), K(location)); + } else if (location.size() <= 0) { /* empty location: rejected with OB_ENTRY_NOT_EXIST unless can_erase is true, in which case the cached entry is dropped through erase_location() */ + if (!can_erase) { + ret = OB_ENTRY_NOT_EXIST; + LOG_WARN("location is empty", KR(ret), K(table_id), K(partition_id), K(cluster_id), K(location)); + } else if (OB_FAIL(erase_location(table_id, partition_id, cluster_id))) { + LOG_WARN("fail to erase location", KR(ret), K(table_id), K(partition_id), K(cluster_id)); + } } else { ObLocationCacheKey cache_key(table_id, partition_id, cluster_id); if (use_sys_cache(table_id)) { @@ -3565,6 +3597,45 @@ int ObPartitionLocationCache::update_location( return ret; } /* erase_location: drops the cached location of one partition. Returns OB_NOT_INIT or OB_INVALID_ARGUMENT when the guard checks fail, does nothing for tables for which use_sys_cache() is true, and treats a missing user cache entry as success. */ +int ObPartitionLocationCache::erase_location( + const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (!ObIPartitionTable::is_valid_key(table_id, partition_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), KT(table_id), K(partition_id), K(cluster_id)); + } else if (use_sys_cache(table_id)) { /* intentionally empty branch: ret stays OB_SUCCESS */ + // use_sys_cache() includes user_sys_leader_cache(), and sys cache shouldn't be erased. 
+ } else { + ObLocationCacheKey cache_key(table_id, partition_id, cluster_id); + // try erase user leader cache /* only attempted when the requested cluster_id equals cluster_id_ */ + if (cluster_id_ == cluster_id) { + LocationInfo leader_info; + int tmp_ret = leader_cache_.get_strong_leader_info(cache_key, leader_info); /* tmp_ret is only logged; a leader cache failure never changes the return value of this function */ + if (OB_SUCCESS == tmp_ret) { + leader_info.reset(); + tmp_ret = leader_cache_.set_strong_leader_info(cache_key, leader_info, false /*force update*/); /* the cached leader is overwritten with the reset LocationInfo rather than removed */ + LOG_TRACE("erase user leader cache", KR(tmp_ret), K(cache_key)); + } + } + // try erase user location cache + if (OB_FAIL(user_cache_.erase(cache_key))) { + if (OB_ENTRY_NOT_EXIST == ret) { /* a missing entry is treated as success */ + ret = OB_SUCCESS; + LOG_TRACE("user location cache not exist", K(cache_key)); + } else { + LOG_WARN("fail to erase user location cache", KR(ret), K(cache_key)); + } + } else { + LOG_TRACE("erase user location cache", K(cache_key)); + } + } + return ret; +} + int ObPartitionLocationCache::clear_location( const uint64_t table_id, const int64_t partition_id, const int64_t expire_renew_time, const int64_t cluster_id) { /* NOTE(review): the two batch_renew_location hunks that follow look truncated -- text between an opening angle bracket and a later closing one seems to be missing (for example the template arguments of ObIArray and ObSEArray and part of the second hunk), so they cannot be applied as shown; re-export the patch before use */ @@ -4407,6 +4478,7 @@ int ObPartitionLocationCache::batch_renew_location(const common::ObIArray 0) { + bool can_erase = false; ObSEArray new_locations; common::ObTimeoutCtx ctx; if (OB_FAIL(set_batch_timeout_ctx(locations.count(), renew_type, ctx))) { @@ -4459,6 +4531,7 @@ int ObPartitionLocationCache::batch_renew_location(const common::ObIArrayget_table_id(), new_location->get_partition_id(), cluster_id, *new_location))) { + } else if (OB_FAIL(update_location(new_location->get_table_id(), + new_location->get_partition_id(), + cluster_id, + can_erase, + *new_location))) { LOG_WARN("fail to update location", K(ret), KPC(new_location)); } if (OB_SUCC(ret)) { @@ -4731,5 +4807,123 @@ int ObPartitionLocationCache::get_primary_cluster_id() const return cluster_id; } /* LeaderCacheKeyGetter::operator(): records the key of the visited entry in keys_ when its tenant matches tenant_id_, or unconditionally when tenant_id_ is OB_INVALID_TENANT_ID; returns the push_back error code on failure */ +int ObPartitionLocationCache::LeaderCacheKeyGetter::operator()( + common::hash::HashMapPair &entry) +{ + int ret = OB_SUCCESS; + const ObLocationCacheKey &key = entry.first; + const uint64_t tenant_id = 
extract_tenant_id(key.table_id_); + if (OB_INVALID_TENANT_ID == tenant_id_ || tenant_id_ == tenant_id) { /* an unset filter (OB_INVALID_TENANT_ID) accepts every key */ + if (OB_FAIL(keys_.push_back(key))) { + LOG_WARN("fail to push back key", KR(ret), K(key)); + } + } + return ret; +} + /* LocationCacheKeyGetter::operator(): same filtering as the leader key getter above -- the key of the visited entry is appended to keys_ when its tenant matches tenant_id_ or when tenant_id_ is OB_INVALID_TENANT_ID */ +int ObPartitionLocationCache::LocationCacheKeyGetter::operator()( + common::hash::HashMapPair &entry) +{ + int ret = OB_SUCCESS; + const ObLocationCacheKey &key = entry.first; + const uint64_t tenant_id = extract_tenant_id(key.table_id_); + if (OB_INVALID_TENANT_ID == tenant_id_ || tenant_id_ == tenant_id) { + if (OB_FAIL(keys_.push_back(key))) { + LOG_WARN("fail to push back key", KR(ret), K(key)); + } + } + return ret; +} + +// OB_INVALID_TENANT_ID means flush all tenant's location cache /* Runs four flush steps in order (sys location cache, sys leader cache, user location cache, user leader cache). Every step assigns ret again, so the value returned is the result of the last step; a failure of an earlier step is only visible in the log. */ +int ObPartitionLocationCache::flush_cache(const uint64_t tenant_id) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret), K(tenant_id)); + } else { + // 1. flush sys cache, overwrite ret + int64_t start_time = ObTimeUtility::fast_current_time(); + LOG_INFO("begin flush sys location cache", K(tenant_id)); + LocationCacheKeyGetter location_key_getter(tenant_id); /* keys are collected first and erased in a second pass, so sys_cache_ is not modified from inside the foreach callback */ + if (OB_FAIL(sys_cache_.foreach_refactored(location_key_getter))) { + LOG_WARN("fail to get location cache key", KR(ret), K(tenant_id)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < location_key_getter.get_keys().count(); i++) { + const ObLocationCacheKey &key = location_key_getter.get_keys().at(i); + if (OB_FAIL(sys_cache_.erase_refactored(key))) { + if (OB_HASH_NOT_EXIST == ret) { /* the key vanished between the two passes: not an error */ + ret = OB_SUCCESS; + LOG_TRACE("sys cache not exist, just skip", KR(ret), K(key)); + } else { + LOG_WARN("fail to erase sys cache", KR(ret), K(key)); + } + } else { + LOG_TRACE("erase sys cache", KR(ret), K(key)); + } + } // end for + } + LOG_INFO("finish flush sys location cache", + KR(ret), + K(tenant_id), + "cost_ts", + ObTimeUtility::fast_current_time() - start_time); + + // 2. 
flush sys leader cache, overwrite ret + start_time = ObTimeUtility::fast_current_time(); + LOG_INFO("begin flush sys leader cache", K(tenant_id)); + LeaderCacheKeyGetter leader_key_getter(tenant_id); /* same two-pass scheme as step 1: collect matching keys, then erase them one by one */ + if (OB_FAIL(sys_leader_cache_.foreach_refactored(leader_key_getter))) { /* this assignment replaces whatever ret step 1 left behind */ + LOG_WARN("fail to get leader cache key", KR(ret), K(tenant_id)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < leader_key_getter.get_keys().count(); i++) { /* the loop stops at the first erase error other than OB_HASH_NOT_EXIST */ + const ObLocationCacheKey &key = leader_key_getter.get_keys().at(i); + if (OB_FAIL(sys_leader_cache_.erase_refactored(key))) { + if (OB_HASH_NOT_EXIST == ret) { + ret = OB_SUCCESS; + LOG_TRACE("sys leader cache not exist, just skip", KR(ret), K(key)); + } else { + LOG_WARN("fail to erase sys leader cache", KR(ret), K(key)); + } + } else { + LOG_TRACE("erase sys leader cache", KR(ret), K(key)); + } + } // end for + } + LOG_INFO("finish flush sys leader cache", + KR(ret), + K(tenant_id), + "cost_ts", + ObTimeUtility::fast_current_time() - start_time); + + // 3. flush user cache, overwrite ret + // user cache store in sys tenant, so we always flush all tenant's user location cache. + start_time = ObTimeUtility::fast_current_time(); + LOG_INFO("begin flush user location cache", K(tenant_id)); + if (OB_FAIL(common::ObKVGlobalCache::get_instance().erase_cache(OB_LOCATION_CACHE_NAME))) { /* tenant_id is not passed to erase_cache(): the cache named OB_LOCATION_CACHE_NAME is erased as a whole regardless of the requested tenant; tenant_id only appears in the log lines of this step */ + LOG_WARN("fail to flush user location cache", KR(ret), K(tenant_id)); + } + LOG_INFO("finish flush user location cache", + KR(ret), + K(tenant_id), + "cost_ts", + ObTimeUtility::fast_current_time() - start_time); + + // 4. 
flush user leader cache, overwrite ret + start_time = ObTimeUtility::fast_current_time(); + LOG_INFO("begin flush user leader cache", K(tenant_id)); + if (OB_FAIL(leader_cache_.flush_cache(tenant_id))) { /* last step: its result is what flush_cache finally returns */ + LOG_WARN("fail to flush user leader cache", KR(ret), K(tenant_id)); + } + LOG_INFO("finish flush user leader cache", + KR(ret), + K(tenant_id), + "cost_ts", + ObTimeUtility::fast_current_time() - start_time); + } + return ret; +} + } // end namespace share } // end namespace oceanbase diff --git a/src/share/partition_table/ob_partition_location_cache.h b/src/share/partition_table/ob_partition_location_cache.h index 0a7cce686d..f626784613 100644 --- a/src/share/partition_table/ob_partition_location_cache.h +++ b/src/share/partition_table/ob_partition_location_cache.h @@ -81,6 +81,11 @@ public: {} public: + void reset() /* clears server_ and zeroes renew_ts_; is_valid() below then returns false because it requires renew_ts_ > 0 */ + { + server_.reset(); + renew_ts_ = 0; + } bool is_valid() const { return server_.is_valid() && renew_ts_ > 0; @@ -614,6 +619,7 @@ public: {} int get_strong_leader_info(const ObLocationCacheKey& key, LocationInfo& location_info); int set_strong_leader_info(const ObLocationCacheKey& key, const LocationInfo& location_info, bool force_update); + int flush_cache(const uint64_t tenant_id); /* OB_INVALID_TENANT_ID resets the leader info of every tenant */ private: static const int64_t CACHE_NUM = 10000; @@ -694,6 +700,48 @@ public: }; explicit ObPartitionLocationCache(ObILocationFetcher& location_fetcher); + class LeaderCacheKeyGetter { /* functor handed to sys_leader_cache_.foreach_refactored(): collects into keys_ the key of each visited entry that belongs to tenant_id_ */ + public: + LeaderCacheKeyGetter() : tenant_id_(common::OB_INVALID_TENANT_ID), keys_() + {} + explicit LeaderCacheKeyGetter(const uint64_t tenant_id) : tenant_id_(tenant_id), keys_() /* explicit, like the ObPartitionLocationCache constructor above, so a bare tenant id never converts silently into a getter */ + {} + ~LeaderCacheKeyGetter() + {} + int operator()(common::hash::HashMapPair &entry); /* NOTE(review): the template arguments of HashMapPair, ObIArray and ObArray are missing in this copy of the patch */ + const common::ObIArray &get_keys() const + { + return keys_; + } + + private: + // OB_INVALID_TENANT_ID means get all tenant's location key + uint64_t tenant_id_; + common::ObArray keys_; + DISALLOW_COPY_AND_ASSIGN(LeaderCacheKeyGetter); + }; + + class LocationCacheKeyGetter { + public: + LocationCacheKeyGetter() : 
tenant_id_(common::OB_INVALID_TENANT_ID), keys_() /* the default-constructed getter accepts the keys of every tenant */ + {} + explicit LocationCacheKeyGetter(const uint64_t tenant_id) : tenant_id_(tenant_id), keys_() /* explicit so that a bare tenant id never converts silently into a getter */ + {} + ~LocationCacheKeyGetter() + {} + int operator()(common::hash::HashMapPair &entry); /* NOTE(review): the template arguments of HashMapPair, ObIArray and ObArray are missing in this copy of the patch */ + const common::ObIArray &get_keys() const /* keys collected so far, in visiting order */ + { + return keys_; + } + + private: + // OB_INVALID_TENANT_ID means get all tenant's location key + uint64_t tenant_id_; + common::ObArray keys_; + DISALLOW_COPY_AND_ASSIGN(LocationCacheKeyGetter); + }; + virtual ~ObPartitionLocationCache(); int init(share::schema::ObMultiVersionSchemaService& schema_service, common::ObServerConfig& config, @@ -784,6 +832,8 @@ public: const ObPartitionLocation& location, char* buf, const int64_t buf_size, ObLocationCacheValue& cache_value); static const int64_t OB_MAX_LOCATION_SERIALIZATION_SIZE = common::OB_MALLOC_BIG_BLOCK_SIZE; + int flush_cache(const uint64_t tenant_id); /* OB_INVALID_TENANT_ID means flush the location cache of every tenant */ + private: int remote_get(const common::ObPartitionKey& pkey, ObPartitionLocation& location); @@ -799,6 +849,7 @@ private: /*-----batch async renew location end -----*/ private: static const int64_t OB_SYS_LOCATION_CACHE_BUCKET_NUM = 512; + // default mode is LatchReadWriteDefendMode typedef common::hash::ObHashMap NoSwapCache; typedef common::hash::ObHashMap NoSwapLeaderCache; typedef common::ObKVCache KVCache; @@ -829,7 +880,8 @@ private: // update location in cache int update_location(const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id, - const ObPartitionLocation& location); + const bool can_erase, const ObPartitionLocation& location); /* can_erase: when true an empty location erases the cached entry, otherwise it is rejected with OB_ENTRY_NOT_EXIST */ + int erase_location(const uint64_t table_id, const int64_t partition_id, const int64_t cluster_id); /* erase location in cache; does nothing for tables served by the sys cache */ // clear location in cache int clear_location( const uint64_t table_id, const int64_t partiton_id, const int64_t expire_renew_time, const int64_t cluster_id); diff --git a/src/sql/engine/cmd/ob_alter_system_executor.cpp b/src/sql/engine/cmd/ob_alter_system_executor.cpp index 2dfba30e6c..7584365dfe 100644 --- 
a/src/sql/engine/cmd/ob_alter_system_executor.cpp +++ b/src/sql/engine/cmd/ob_alter_system_executor.cpp @@ -177,13 +177,33 @@ int ObFlushCacheExecutor::execute(ObExecContext& ctx, ObFlushCacheStmt& stmt) case CACHE_TYPE_BLOCK: case CACHE_TYPE_ROW: case CACHE_TYPE_BLOOM_FILTER: - case CACHE_TYPE_LOCATION: case CACHE_TYPE_CLOG: case CACHE_TYPE_ILOG: case CACHE_TYPE_SCHEMA: { ret = OB_NOT_SUPPORTED; LOG_WARN("cache type not supported flush", "type", stmt.flush_cache_arg_.cache_type_, K(ret)); } break; + case CACHE_TYPE_LOCATION: { + share::ObPartitionLocationCache *location_cache = GCTX.location_cache_; + if (OB_ISNULL(location_cache)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("location cache ptr is null", KR(ret)); + } else if (0 == tenant_num) { + if (OB_FAIL(location_cache->flush_cache(OB_INVALID_TENANT_ID))) { + LOG_WARN("fail to flush all location cache", KR(ret)); + } + } else { + int64_t tenant_num = stmt.flush_cache_arg_.tenant_ids_.count(); + for (int64_t i = 0; i < tenant_num; i++) { // ingore error + const uint64_t tenant_id = stmt.flush_cache_arg_.tenant_ids_.at(i); + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = location_cache->flush_cache(tenant_id))) { + LOG_WARN("fail to flush tenant's cache", KR(ret), K(tenant_id)); + } + ret = OB_SUCC(ret) ? tmp_ret : ret; + } // end for + } + } break; default: { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid cache type", "type", stmt.flush_cache_arg_.cache_type_);