Clear ls location cache for dropped tenants
This commit is contained in:
parent
a51696bce7
commit
8924e7291b
@ -217,8 +217,10 @@ TEST_F(TestLocationService, test_location_service)
|
||||
|
||||
TEST_F(TestLocationService, test_check_ls_exist)
|
||||
{
|
||||
// create tenant
|
||||
uint64_t user_tenant_id = OB_INVALID_TENANT_ID;
|
||||
ASSERT_EQ(OB_SUCCESS, get_tenant_id(user_tenant_id));
|
||||
ASSERT_EQ(OB_SUCCESS, create_tenant("tt2"));
|
||||
ASSERT_EQ(OB_SUCCESS, get_tenant_id(user_tenant_id, "tt2"));
|
||||
uint64_t meta_tenant_id = gen_meta_tenant_id(user_tenant_id);
|
||||
|
||||
ObLSID user_ls_id(1001);
|
||||
@ -299,6 +301,20 @@ TEST_F(TestLocationService, test_check_ls_exist)
|
||||
state.reset();
|
||||
ASSERT_EQ(OB_SUCCESS, ObLocationService::check_ls_exist(meta_tenant_id, SYS_LS, state));
|
||||
ASSERT_TRUE(state.is_uncreated());
|
||||
|
||||
// reset
|
||||
ASSERT_EQ(OB_SUCCESS, delete_tenant("tt2"));
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("alter system set_tp tp_name = EN_CHECK_LS_EXIST_WITH_TENANT_NOT_NORMAL, error_code = 0, frequency = 0"));
|
||||
ASSERT_EQ(OB_SUCCESS, inner_proxy.write(OB_SYS_TENANT_ID, sql.ptr(), affected_rows));
|
||||
bool tenant_exist = true;
|
||||
int ret = OB_SUCCESS;
|
||||
while (true == tenant_exist && OB_SUCC(ret)) {
|
||||
if (OB_FAIL(check_tenant_exist(tenant_exist, "tt2"))) {
|
||||
SERVER_LOG(WARN, "check_tenant_exist failed", K(ret));
|
||||
} else {
|
||||
usleep(1_s);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(TestLocationService, test_clear_tablet_ls_cache)
|
||||
@ -342,6 +358,7 @@ TEST_F(TestLocationService, test_clear_tablet_ls_cache)
|
||||
ASSERT_EQ(OB_SUCCESS, batch_create_table(oracle_sql_proxy, TABLET_COUNT, true, tablet_ls_pairs));
|
||||
ASSERT_TRUE(TABLET_COUNT == tablet_ls_pairs.count());
|
||||
const int64_t cache_size_before_renew = tablet_ls_service->inner_cache_.size();
|
||||
ASSERT_TRUE(cache_size_before_renew > 0);
|
||||
ObArenaAllocator allocator;
|
||||
ObList<ObTabletID, ObIAllocator> tablet_list(allocator);
|
||||
ObSEArray<ObTabletLSCache, TABLET_COUNT> tablet_ls_caches;
|
||||
@ -359,7 +376,7 @@ TEST_F(TestLocationService, test_clear_tablet_ls_cache)
|
||||
ASSERT_EQ(OB_SUCCESS, delete_tenant("oracle"));
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->clear_expired_cache());
|
||||
cache_size = tablet_ls_service->inner_cache_.size();
|
||||
ASSERT_TRUE(cache_size == cache_size_before_renew);
|
||||
ASSERT_TRUE(cache_size_before_renew == cache_size);
|
||||
|
||||
// test 1 million cache clear
|
||||
const bool update_only = false;
|
||||
@ -373,10 +390,45 @@ TEST_F(TestLocationService, test_clear_tablet_ls_cache)
|
||||
const int64_t start_time = ObTimeUtility::current_time();
|
||||
ASSERT_EQ(OB_SUCCESS, tablet_ls_service->clear_expired_cache());
|
||||
cache_size = tablet_ls_service->inner_cache_.size();
|
||||
ASSERT_TRUE(cache_size = cache_size_before_renew);
|
||||
ASSERT_TRUE(cache_size_before_renew == cache_size);
|
||||
LOG_INFO("TEST: clear 1 million cache", "cost_time", ObTimeUtility::current_time() - start_time); // cost_time = 1.67s
|
||||
}
|
||||
|
||||
// Checks that ls location cache entries of a dropped tenant disappear from the cache.
// Steps:
//   1. look up tenant "tt1" and derive its meta tenant id;
//   2. assert that get_from_cache_ hits for the user tenant's ls 1001 and for the
//      meta tenant's SYS_LS;
//   3. drop "tt1" and assert the schema service reports the meta tenant as dropped;
//   4. sleep, then assert that get_from_cache_ returns OB_CACHE_NOT_HIT for both keys.
// NOTE(review): tenant "tt1" is not created here — presumably an earlier test in this
// fixture creates it; confirm before running this case in isolation.
// NOTE(review): this test reaches into private members (ls_location_service_,
// get_from_cache_, the interval constants); it presumably relies on a test-only
// access override in the including file — confirm.
TEST_F(TestLocationService, test_clear_ls_location)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t user_tenant_id = OB_INVALID_TENANT_ID;
|
||||
ASSERT_EQ(OB_SUCCESS, get_tenant_id(user_tenant_id, "tt1"));
|
||||
ASSERT_TRUE(is_user_tenant(user_tenant_id));
|
||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(user_tenant_id);
|
||||
ObLocationService *location_service = GCTX.location_service_;
|
||||
ASSERT_TRUE(OB_NOT_NULL(location_service));
|
||||
ObLSLocationService *ls_location_service = &(location_service->ls_location_service_);
|
||||
ASSERT_TRUE(OB_NOT_NULL(ls_location_service));
|
||||
const ObLSID &user_ls_id = ObLSID(1001);
|
||||
ObLSLocation location;
|
||||
// assert caches exist
|
||||
// wait one RENEW_LS_LOCATION_INTERVAL_US first; presumably this gives the periodic
|
||||
// renew a chance to populate the cache — confirm against the renew timer task
|
||||
usleep(ls_location_service->RENEW_LS_LOCATION_INTERVAL_US);
|
||||
ASSERT_EQ(OB_SUCCESS, ls_location_service->get_from_cache_(GCONF.cluster_id, user_tenant_id, user_ls_id, location));
|
||||
ASSERT_TRUE(location.get_cache_key() == ObLSLocationCacheKey(GCONF.cluster_id, user_tenant_id, user_ls_id));
|
||||
location.reset();
|
||||
ASSERT_EQ(OB_SUCCESS, ls_location_service->get_from_cache_(GCONF.cluster_id, meta_tenant_id, SYS_LS, location));
|
||||
ASSERT_TRUE(location.get_cache_key() == ObLSLocationCacheKey(GCONF.cluster_id, meta_tenant_id, SYS_LS));
|
||||
|
||||
// drop tenant force
|
||||
ASSERT_EQ(OB_SUCCESS, delete_tenant("tt1"));
|
||||
// meta tenant is dropped in schema and user tenant unit has been gc
|
||||
bool is_dropped = false;
|
||||
ASSERT_EQ(OB_SUCCESS, GSCHEMASERVICE.check_if_tenant_has_been_dropped(meta_tenant_id, is_dropped));
|
||||
ASSERT_TRUE(is_dropped);
|
||||
|
||||
// auto clear caches successfully
|
||||
// first wait: one full CLEAR_CACHE_INTERVAL, the period after which
|
||||
// renew_all_ls_locations() attempts to clear caches of dropped tenants
|
||||
usleep(ls_location_service->CLEAR_CACHE_INTERVAL);
|
||||
// second wait: the same duration erase_location_safely_() uses as safe_delete_time
|
||||
// (RENEW_LS_LOCATION_BY_RPC_INTERVAL_US + GCONF.rpc_timeout)
|
||||
usleep(ls_location_service->RENEW_LS_LOCATION_BY_RPC_INTERVAL_US + GCONF.rpc_timeout);
|
||||
ASSERT_EQ(OB_CACHE_NOT_HIT, ls_location_service->get_from_cache_(GCONF.cluster_id, user_tenant_id, user_ls_id, location));
|
||||
ASSERT_EQ(OB_CACHE_NOT_HIT, ls_location_service->get_from_cache_(GCONF.cluster_id, meta_tenant_id, SYS_LS, location));
|
||||
}
|
||||
|
||||
} // namespace rootserver
|
||||
} // namespace oceanbase
|
||||
int main(int argc, char **argv)
|
||||
|
@ -201,7 +201,10 @@ TEST_F(TestTabletAutoincMgr, test_lob_tablet_autoinc_location_cache)
|
||||
|
||||
// remove source ls and clear src ls cache
|
||||
ASSERT_EQ(OB_SUCCESS, MTL(ObLSService*)->remove_ls(src_ls_id));
|
||||
ASSERT_EQ(OB_SUCCESS, ls_location_service->erase_location_(GCONF.cluster_id, g_tenant_id, src_ls_id));
|
||||
ObLSLocationCacheKey cache_key(GCONF.cluster_id, g_tenant_id, src_ls_id);
|
||||
ObLSLocation tmp_loc;
|
||||
ASSERT_EQ(OB_SUCCESS, ls_location_service->inner_cache_.del(cache_key, 0/*safe_delete_time*/));
|
||||
ASSERT_EQ(OB_LS_LOCATION_NOT_EXIST, ls_location_service->nonblock_get(GCONF.cluster_id, g_tenant_id, src_ls_id, tmp_loc));
|
||||
|
||||
// insert lob
|
||||
ASSERT_EQ(OB_SUCCESS, sql.assign_fmt("insert into t1 values (2, repeat('abcde0123456789', 1000));"));
|
||||
|
@ -189,7 +189,7 @@ int ObLSLocationMap::get(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSLocationMap::del(const ObLSLocationCacheKey &key)
|
||||
int ObLSLocationMap::del(const ObLSLocationCacheKey &key, const int64_t safe_delete_time)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSLocation *prev = NULL;
|
||||
@ -214,6 +214,9 @@ int ObLSLocationMap::del(const ObLSLocationCacheKey &key)
|
||||
|
||||
if (OB_ISNULL(ls_location)) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
} else if (ObTimeUtility::current_time() - ls_location->get_renew_time() <= safe_delete_time) {
|
||||
// must use ObTimeUtil::current_time() for clock source unification of renew_time
|
||||
ret = OB_NEED_WAIT;
|
||||
} else {
|
||||
if (OB_ISNULL(prev)) {
|
||||
// the first node
|
||||
|
@ -70,7 +70,7 @@ public:
|
||||
const ObLSLocationCacheKey &key,
|
||||
ObLSLocation &ls_location);
|
||||
int get(const ObLSLocationCacheKey &key, ObLSLocation &location) const;
|
||||
int del(const ObLSLocationCacheKey &key);
|
||||
int del(const ObLSLocationCacheKey &key, const int64_t safe_delete_time);
|
||||
int check_and_generate_dead_cache(ObLSLocationArray &arr);
|
||||
int get_all(ObLSLocationArray &arr);
|
||||
int64_t size() { return size_; }
|
||||
|
@ -256,6 +256,8 @@ int ObLSLocationService::start()
|
||||
DUMP_CACHE_INTERVAL_US,
|
||||
false/*repeat*/))) {
|
||||
LOG_WARN("ObLSLocationService timer schedule dump_cache_timer_task failed", KR(ret));
|
||||
} else {
|
||||
last_cache_clear_ts_ = ObTimeUtility::current_time();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -693,7 +695,7 @@ int ObLSLocationService::check_and_clear_dead_cache()
|
||||
// do not clear sys tenant ls location cache
|
||||
} else if (OB_FAIL(hash.get_refactored(key, exist))) {
|
||||
if (OB_HASH_NOT_EXIST == ret) {
|
||||
if (OB_FAIL(inner_cache_.del(ls_cache_key))) {
|
||||
if (OB_FAIL(inner_cache_.del(ls_cache_key, 0/*safe_delete_time*/))) {
|
||||
LOG_WARN("inner cache del error", KR(ret), "ls_location", total_arr.at(i));
|
||||
} else {
|
||||
LOG_INFO("del ls location cache succ", "ls_location_cache", total_arr.at(i));
|
||||
@ -719,8 +721,15 @@ int ObLSLocationService::renew_all_ls_locations()
|
||||
int ret = OB_SUCCESS;
|
||||
int ret_fail = OB_SUCCESS;
|
||||
ObArray<uint64_t> tenant_ids;
|
||||
bool sys_tenant_schema_ready = false;
|
||||
if (OB_FAIL(check_inner_stat_())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (FALSE_IT(sys_tenant_schema_ready = schema_service_->is_tenant_refreshed(OB_SYS_TENANT_ID))) {
|
||||
} else if (!sys_tenant_schema_ready) {
|
||||
// sys tenant schema may be not ready when starting observer
|
||||
if (REACH_TIME_INTERVAL(10 * 1000 * 1000L)) { // 10s
|
||||
FLOG_INFO("can not renew all ls locations because sys tenant schema is not ready", KR(ret));
|
||||
}
|
||||
} else if (OB_FAIL(schema_service_->get_tenant_ids(tenant_ids))) {
|
||||
LOG_WARN("get tenant_ids failed", KR(ret));
|
||||
} else {
|
||||
@ -742,6 +751,15 @@ int ObLSLocationService::renew_all_ls_locations()
|
||||
} // end ARRAY_FOREACH_NORET
|
||||
ret = OB_FAIL(ret) ? ret : ret_fail;
|
||||
}
|
||||
// try clear ls location caches whose tenant is dropped
|
||||
if (OB_FAIL(ret) || !sys_tenant_schema_ready) {
|
||||
} else if (ObTimeUtil::current_time() - last_cache_clear_ts_ > CLEAR_CACHE_INTERVAL) {
|
||||
if (OB_FAIL(try_clear_dropped_tenant_caches_())) {
|
||||
LOG_WARN("try clear dropped tenant caches failed", KR(ret), K(last_cache_clear_ts_));
|
||||
} else {
|
||||
last_cache_clear_ts_ = ObTimeUtil::current_time();
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -928,7 +946,7 @@ int ObLSLocationService::batch_update_caches_(
|
||||
} else if (new_location.get_replica_locations().empty()) {
|
||||
if (!can_erase) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(erase_location_(cluster_id, ls_info.get_tenant_id(), ls_info.get_ls_id()))) {
|
||||
} else if (OB_FAIL(erase_location_safely_(cluster_id, ls_info.get_tenant_id(), ls_info.get_ls_id()))) {
|
||||
LOG_WARN("fail to erase location", KR(ret), K(cluster_id), K(ls_info));
|
||||
}
|
||||
} else if (OB_FAIL(update_cache_(
|
||||
@ -1082,7 +1100,7 @@ int ObLSLocationService::update_cache_(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLSLocationService::erase_location_(
|
||||
int ObLSLocationService::erase_location_safely_(
|
||||
const int64_t cluster_id,
|
||||
const uint64_t tenant_id,
|
||||
const ObLSID &ls_id)
|
||||
@ -1096,16 +1114,22 @@ int ObLSLocationService::erase_location_(
|
||||
} else if (is_sys_tenant(tenant_id)) {
|
||||
// location of sys ls shouldn't be erased
|
||||
} else {
|
||||
// can not erase the location just detected by RPC
|
||||
const int64_t safe_delete_time = RENEW_LS_LOCATION_BY_RPC_INTERVAL_US + GCONF.rpc_timeout;
|
||||
ObLSLocationCacheKey cache_key(cluster_id, tenant_id, ls_id);
|
||||
if (OB_FAIL(inner_cache_.del(cache_key))) {
|
||||
if (OB_FAIL(inner_cache_.del(cache_key, safe_delete_time))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
LOG_TRACE("not exist in inner_cache_", K(cache_key));
|
||||
} else if (OB_NEED_WAIT == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
LOG_TRACE("can not delete cache because safe_delete_time has not been reached",
|
||||
K(cache_key), K(safe_delete_time));
|
||||
} else {
|
||||
LOG_WARN("fail to erase location from inner_cache_", KR(ret), K(cache_key));
|
||||
}
|
||||
} else {
|
||||
LOG_TRACE("erase location from inner_cache_", K(cache_key));
|
||||
LOG_INFO("[LS_LOCATION] erase ls location successfully", K(cache_key));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -1257,5 +1281,69 @@ int ObLSLocationService::batch_renew_ls_locations(
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Handles cached ls locations whose tenant has been dropped.
//
// Flow:
//   1. check_inner_stat_() must succeed;
//   2. dropped tenant ids are fetched from schema_service_; an empty list ends the
//      function with ret unchanged (OB_SUCCESS);
//   3. all entries of inner_cache_ are copied into all_caches via get_all();
//   4. the dropped ids that are user or meta tenants are put into a hash set;
//   5. every cached entry whose tenant id is in that set is processed:
//        - user tenant: erase_location_safely_() is called, but only when the
//          corresponding meta tenant id is in the dropped set as well;
//        - meta tenant: renew_location_() is called instead of erasing;
//        - anything else: ret is set to OB_ERR_UNEXPECTED.
//
// Returns ret as left by the last step that ran.
// NOTE(review): presumably ARRAY_FOREACH stops iterating once ret holds an error, so
// one failing entry would skip the remaining ones — confirm against the macro.
int ObLSLocationService::try_clear_dropped_tenant_caches_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSLocationArray all_caches;
|
||||
ObArray<uint64_t> dropped_tenant_ids;
|
||||
hash::ObHashSet<uint64_t> dropped_tenant_set;
|
||||
if (OB_FAIL(check_inner_stat_())) {
|
||||
LOG_WARN("fail to check inner stat", KR(ret));
|
||||
} else if (OB_FAIL(schema_service_->get_dropped_tenant_ids(dropped_tenant_ids))) {
|
||||
LOG_WARN("get dropped tenant_ids failed", KR(ret));
|
||||
} else if (dropped_tenant_ids.empty()) {
|
||||
// no tenant is dropped, do nothing
|
||||
} else if (OB_FAIL(all_caches.reserve(inner_cache_.size()))) {
|
||||
LOG_WARN("fail to reserve all_caches", KR(ret), "size", inner_cache_.size());
|
||||
} else if (OB_FAIL(inner_cache_.get_all(all_caches))) {
|
||||
LOG_WARN("get all inner cache failed", KR(ret));
|
||||
} else if (OB_FAIL(dropped_tenant_set.create(dropped_tenant_ids.count()))) {
|
||||
LOG_WARN("create failed", KR(ret), "count", dropped_tenant_ids.count());
|
||||
} else {
|
||||
// use hashset to improve performance
|
||||
// only user and meta tenant ids are inserted; other ids in the list are ignored
|
||||
ARRAY_FOREACH(dropped_tenant_ids, idx) {
|
||||
const uint64_t tenant_id = dropped_tenant_ids.at(idx);
|
||||
if (is_user_tenant(tenant_id) || is_meta_tenant(tenant_id)) {
|
||||
if (OB_FAIL(dropped_tenant_set.set_refactored(tenant_id))) {
|
||||
// OB_HASH_EXIST is also unexpected
|
||||
LOG_WARN("set_refactored failed", KR(ret), K(idx), K(tenant_id), K(dropped_tenant_ids));
|
||||
}
|
||||
}
|
||||
}
|
||||
// walk the cache snapshot; entries whose tenant is not in the dropped set are left alone
|
||||
ARRAY_FOREACH(all_caches, idx) {
|
||||
const ObLSLocationCacheKey &cache_key = all_caches.at(idx).get_cache_key();
|
||||
const uint64_t tenant_id = cache_key.get_tenant_id();
|
||||
if (OB_ISNULL(dropped_tenant_set.get(tenant_id))) {
|
||||
// not dropped tenant, do nothing
|
||||
} else if (is_user_tenant(tenant_id)) {
|
||||
// the cache of user tenant ls location can not be erased until its meta tenant has been dropped
|
||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||
if (OB_ISNULL(dropped_tenant_set.get(meta_tenant_id))) {
|
||||
// meta tenant is not in the dropped set yet, do nothing
|
||||
} else if (OB_FAIL(erase_location_safely_(
|
||||
cache_key.get_cluster_id(),
|
||||
cache_key.get_tenant_id(),
|
||||
cache_key.get_ls_id()))) {
|
||||
LOG_WARN("erase location failed", KR(ret), K(cache_key), K(meta_tenant_id));
|
||||
}
|
||||
} else if (is_meta_tenant(tenant_id)) {
|
||||
// the cache of meta tenant can not be erased until it is removed from ls meta table in sys
|
||||
// NOTE(review): the entry is renewed rather than erased here; presumably the renew
|
||||
// path drops it once no replica location is found — confirm in renew_location_
|
||||
ObLSLocation tmp_loc;
|
||||
if (OB_FAIL(renew_location_(
|
||||
cache_key.get_cluster_id(),
|
||||
cache_key.get_tenant_id(),
|
||||
cache_key.get_ls_id(),
|
||||
tmp_loc))) {
|
||||
LOG_WARN("renew location failed", KR(ret), K(cache_key));
|
||||
}
|
||||
} else { // other tenant
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("there should be only user or meta tenant", KR(ret), K(cache_key));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // end namespace share
|
||||
} // end namespace oceanbase
|
||||
|
@ -216,10 +216,11 @@ private:
|
||||
const uint64_t tenant_id,
|
||||
const ObLSID &ls_id,
|
||||
ObLSLocation &location);
|
||||
int erase_location_(
|
||||
int erase_location_safely_(
|
||||
const int64_t cluster_id,
|
||||
const uint64_t tenant_id,
|
||||
const ObLSID &ls_id);
|
||||
int try_clear_dropped_tenant_caches_();
|
||||
int build_tenant_ls_info_hash_(ObTenantLsInfoHashMap &hash);
|
||||
int construct_rpc_dests_(common::ObIArray<common::ObAddr> &addrs);
|
||||
int detect_ls_leaders_(
|
||||
@ -235,6 +236,7 @@ private:
|
||||
static const int64_t RENEW_LS_LOCATION_INTERVAL_US = 5 * 1000 * 1000L; // 5s
|
||||
static const int64_t RENEW_LS_LOCATION_BY_RPC_INTERVAL_US = 1000 * 1000L; // 1s
|
||||
static const int64_t DUMP_CACHE_INTERVAL_US = 10 * 1000 * 1000L; // 10s
|
||||
static const int64_t CLEAR_CACHE_INTERVAL = 60 * 1000 * 1000L; // 1m
|
||||
|
||||
bool inited_;
|
||||
bool stopped_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user