free new mgr when assign failed and switch back allocator when memory not enough

This commit is contained in:
obdev
2023-07-11 18:18:17 +00:00
committed by ob-robot
parent 21e7012901
commit 871b092354
6 changed files with 274 additions and 207 deletions

View File

@ -755,57 +755,14 @@ int ObMultiVersionSchemaService::put_fallback_schema_to_slot(ObSchemaMgr *&new_m
"schema_version", new_mgr->get_schema_version(),
"eliminated_schema_version", NULL != eli_schema_mgr ?
eli_schema_mgr->get_schema_version() : OB_INVALID_VERSION);
if (OB_NOT_NULL(eli_schema_mgr)) {
FLOG_INFO("[SCHEMA_RELEASE] free schema mgr",
"tenant_id", eli_schema_mgr->get_tenant_id(),
"schema_version", eli_schema_mgr->get_schema_version());
eli_schema_mgr->~ObSchemaMgr();
if (OB_FAIL(schema_mem_mgr.free(static_cast<void *>(eli_schema_mgr)))) {
LOG_WARN("free eli schema mgr falied", K(ret));
} else {
// The allocator of the schema_mgr generated by the schema fallback is independent
// and does not need to be released through switch_allocator.
// For the current use scenarios of OB, liboblog, and agentserver in 2.x, fallback will not consume
// non-latest schema mgr for a long time, and the memory release of schema_mgr generated by fallback
// can be accelerated through background thread inspection.
}
}
}
return ret;
}
int ObMultiVersionSchemaService::alloc_schema_mgr_for_liboblog(
ObSchemaMemMgr &mem_mgr_for_liboblog,
ObSchemaMgr *&schema_mgr)
{
int ret = OB_SUCCESS;
ObSchemaMgr *new_mgr = NULL;
void *tmp_ptr= NULL;
ObIAllocator *allocator = NULL;
if (OB_FAIL(mem_mgr_for_liboblog.alloc(sizeof(ObSchemaMgr), tmp_ptr, &allocator))) {
LOG_WARN("alloc mem for liboblog falied", K(ret));
} else if (OB_ISNULL(allocator) || OB_ISNULL(tmp_ptr)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("tmp ptr or allocator is null", K(ret), K(tmp_ptr), K(allocator));
} else {
new_mgr = new (tmp_ptr) ObSchemaMgr();
schema_mgr = new_mgr;
}
return ret;
}
int ObMultiVersionSchemaService::free_schema_mgr_for_liboblog(
ObSchemaMemMgr &mem_mgr_for_liboblog,
ObSchemaMgr *schema_mgr)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(schema_mgr)) {
FLOG_INFO("[SCHEMA_RELEASE] free schema mgr",
"tenant_id", schema_mgr->get_tenant_id(),
"schema_version", schema_mgr->get_schema_version());
schema_mgr->~ObSchemaMgr();
if (OB_FAIL(mem_mgr_for_liboblog.free(static_cast<void *>(schema_mgr)))) {
LOG_ERROR("free schema_mgr for liboblog failed", K(ret), K(schema_mgr));
if (OB_FAIL(schema_mem_mgr.free_schema_mgr(eli_schema_mgr))) {
LOG_WARN("free eli schema mgr falied", KR(ret));
} else {
// The allocator of the schema_mgr generated by the schema fallback is independent
// and does not need to be released through switch_allocator.
// For the current use scenarios of OB, liboblog, and agentserver in 2.x, fallback will not consume
// non-latest schema mgr for a long time, and the memory release of schema_mgr generated by fallback
// can be accelerated through background thread inspection.
}
}
return ret;
@ -924,12 +881,13 @@ int ObMultiVersionSchemaService::fallback_schema_mgr_for_liboblog(
K(schema_status), K(from_version), K(target_version));
}
if (OB_SUCC(ret)) {
bool alloc_for_liboblog = true;
ObSchemaMgr *new_mgr = NULL;
target_mgr_handle.reset();
if (OB_FAIL(fallback_schema_mgr(schema_status, tmp_mgr, target_version))) {
LOG_WARN("fallback schema mgr falied",
K(ret), K(schema_status), K(from_version), K(target_version));
} else if (OB_FAIL(alloc_schema_mgr_for_liboblog(*mem_mgr_for_liboblog, new_mgr))) {
} else if (OB_FAIL(mem_mgr_for_liboblog->alloc_schema_mgr(new_mgr, alloc_for_liboblog))) {
LOG_WARN("alloc schema mgr for liboblog failed",
K(ret), K(schema_status), K(from_version), K(target_version));
} else if (OB_ISNULL(new_mgr)) {
@ -955,10 +913,10 @@ int ObMultiVersionSchemaService::fallback_schema_mgr_for_liboblog(
schema_mgr = new_mgr;
}
}
if (OB_FAIL(ret) && !OB_ISNULL(new_mgr)) {
if (OB_FAIL(ret)) {
int tmp_ret = OB_SUCCESS;
schema_mgr = NULL;
if (OB_SUCCESS != (tmp_ret = free_schema_mgr_for_liboblog(*mem_mgr_for_liboblog, new_mgr))) {
if (OB_TMP_FAIL(mem_mgr_for_liboblog->free_schema_mgr(new_mgr))) {
LOG_ERROR("fail to free schema mgr", K(ret), K(tmp_ret), K(from_version));
}
}
@ -2115,103 +2073,38 @@ int ObMultiVersionSchemaService::add_schema(
schema_mgr_cache = &schema_store->schema_mgr_cache_;
int64_t new_schema_version = schema_mgr_for_cache->get_schema_version();
refreshed_schema_version = schema_store->get_refreshed_version();
ALLOW_NEXT_LOG();
LOG_INFO("add schema",
K(refreshed_schema_version),
K(new_schema_version));
if (refreshed_schema_version > new_schema_version) {
if (OB_ISNULL(schema_mgr_cache)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema_mgr_cache is null", K(ret), K(tenant_id));
} else if (refreshed_schema_version > new_schema_version) {
LOG_WARN("add schema is old",
K(refreshed_schema_version),
K(new_schema_version),
K(received_broadcast_version));
}
void *tmp_ptr = NULL;
ObSchemaMgr *new_mgr = NULL;
ObSchemaMgr *eli_schema_mgr = NULL;
FLOG_INFO("add schema", K(tenant_id), K(refreshed_schema_version), K(new_schema_version));
bool is_exist = false;
if (OB_FAIL(schema_mgr_cache->check_schema_mgr_exist(new_schema_version, is_exist))) {
if (FAILEDx(schema_mgr_cache->check_schema_mgr_exist(new_schema_version, is_exist))) {
LOG_WARN("fail to check schema_mgr exist", K(ret), K(tenant_id), K(new_schema_version));
} else if (is_exist) {
LOG_INFO("schema mgr already exist, just skip", K(ret), K(tenant_id), K(new_schema_version));
} else if (OB_FAIL(mem_mgr->alloc(sizeof(ObSchemaMgr), tmp_ptr))) {
LOG_WARN("alloc mem falied", K(ret));
} else if (FALSE_IT(new_mgr = new (tmp_ptr) ObSchemaMgr)) {
// will not reach here
} else if (OB_FAIL(new_mgr->init())) {
LOG_WARN("init schema mgr falied", K(ret));
} else if (OB_FAIL(new_mgr->assign(*schema_mgr_for_cache))) {
LOG_WARN("assign schema mgr falied", K(ret));
} else if (OB_FAIL(schema_mgr_cache->put(new_mgr, eli_schema_mgr))) {
LOG_WARN("put schema mgr failed", K(ret));
if (OB_EAGAIN == ret) {
FLOG_INFO("[SCHEMA_RELEASE] free schema mgr",
"tenant_id", new_mgr->get_tenant_id(),
"schema_version", new_mgr->get_schema_version());
int64_t tmp_ret = OB_SUCCESS;
new_mgr->~ObSchemaMgr();
const int64_t nsv = new_mgr->get_schema_version();
if (OB_SUCCESS != (tmp_ret = mem_mgr->free(new_mgr))) {
LOG_ERROR("free new_mgr failed", K(tmp_ret), K(nsv));
} else if (OB_FAIL(alloc_and_put_schema_mgr_(*mem_mgr, *schema_mgr_for_cache, *schema_mgr_cache))) {
LOG_WARN("fail to alloc and put schema mgr", KR(ret));
}
// try switch allocator
if (OB_SUCC(ret)) {
bool can_switch = false;
int64_t switch_cnt = ObSchemaService::g_liboblog_mode_ ? init_version_cnt_ : GCONF._max_schema_slot_num;
if (OB_FAIL(mem_mgr->check_can_switch_allocator(switch_cnt, can_switch))) {
LOG_WARN("fail to check can switch allocator", KR(ret));
} else if (can_switch) {
// Switch allocator && rewrite schema_mgr_for_cache_
LOG_INFO("try to switch allocator", KR(ret), K(tenant_id), K(new_schema_version));
if (OB_FAIL(switch_allocator_(*mem_mgr, schema_mgr_for_cache))) {
LOG_WARN("fail to alloc schema mgr after switch allocator", KR(ret), K(tenant_id), K(new_schema_version));
}
}
} else {
LOG_INFO("put schema mgr succeed",
"schema_version", new_mgr->get_schema_version(),
"eliminated_schema_version", NULL != eli_schema_mgr ?
eli_schema_mgr->get_schema_version() : OB_INVALID_VERSION);
if (OB_NOT_NULL(eli_schema_mgr)) {
FLOG_INFO("[SCHEMA_RELEASE] free schema mgr",
"tenant_id", eli_schema_mgr->get_tenant_id(),
"schema_version", eli_schema_mgr->get_schema_version());
eli_schema_mgr->~ObSchemaMgr();
if (OB_FAIL(mem_mgr->free(static_cast<void *>(eli_schema_mgr)))) {
LOG_WARN("free eli schema mgr falied", K(ret));
}
}
// try switch allocator
if (OB_SUCC(ret)) {
bool can_switch = false;
int64_t alloc_cnt = 0;
int64_t switch_cnt = ObSchemaService::g_liboblog_mode_ ? init_version_cnt_ : GCONF._max_schema_slot_num;
if (OB_FAIL(mem_mgr->check_can_switch_allocator(can_switch))) {
LOG_WARN("check can switch allocator falied", K(ret));
} else if (OB_FAIL(mem_mgr->get_cur_alloc_cnt(alloc_cnt))) {
LOG_WARN("get current alloc count falied", K(ret));
} else if (can_switch && alloc_cnt > switch_cnt) {
// Switch allocator && rewrite schema_mgr_for_cache_
LOG_INFO("try to switch allocator", K(ret), K(tenant_id),
"schema_version", schema_mgr_for_cache->get_schema_version());
void *tmp_ptr = NULL;
ObIAllocator *allocator = NULL;
ObSchemaMgr *old_mgr = schema_mgr_for_cache;
ObSchemaMgr *new_mgr = NULL;
bool overwrite = true;
if (OB_FAIL(mem_mgr->switch_allocator())) {
LOG_WARN("switch allocator falied", K(ret));
} else if (OB_FAIL(mem_mgr->alloc(sizeof(ObSchemaMgr), tmp_ptr, &allocator))) {
LOG_WARN("alloc mem falied", K(ret));
} else if (FALSE_IT(new_mgr = new (tmp_ptr) ObSchemaMgr(*allocator))) {
// will not reach here
} else if (OB_FAIL(new_mgr->init())) {
LOG_WARN("init new schema mgr falied", K(ret));
} else if (OB_FAIL(new_mgr->deep_copy(*old_mgr))) {
LOG_WARN("deep copy old schema mgr falied", K(ret));
} else if (OB_FAIL(schema_mgr_for_cache_map_.set_refactored(tenant_id, new_mgr, overwrite))) {
LOG_WARN("fail to get schema mgr for cache", K(ret), K(ret));
} else {
FLOG_INFO("[SCHEMA_RELEASE] free schema mgr",
"tenant_id", old_mgr->get_tenant_id(),
"schema_version", old_mgr->get_schema_version());
old_mgr->~ObSchemaMgr();
if (OB_FAIL(mem_mgr->free(static_cast<void *>(old_mgr)))) {
LOG_WARN("free old schema mgr falied", K(ret));
} else {
schema_mgr_for_cache = new_mgr;
}
}
}
LOG_DEBUG("check can switch", K(ret), K(tenant_id), K(can_switch), K(alloc_cnt), K(switch_cnt));
}
}
if (OB_SUCC(ret)) {
schema_store->update_refreshed_version(new_schema_version);
@ -2226,6 +2119,126 @@ int ObMultiVersionSchemaService::add_schema(
return ret;
}
ERRSIM_POINT_DEF(ERRSIM_PUT_SCHEMA);
ERRSIM_POINT_DEF(ERRSIM_ASSIGN_NEW_MGR);
int ObMultiVersionSchemaService::alloc_and_put_schema_mgr_(
ObSchemaMemMgr &mem_mgr,
ObSchemaMgr &latest_schema_mgr,
ObSchemaMgrCache &schema_mgr_cache)
{
int ret = OB_SUCCESS;
ObSchemaMgr *new_mgr = NULL;
bool alloc_for_liboblog = false;
ObSchemaMgr *eli_schema_mgr = NULL;
const uint64_t tenant_id = latest_schema_mgr.get_tenant_id();
const int64_t schema_version = latest_schema_mgr.get_schema_version();
if (OB_FAIL(mem_mgr.alloc_schema_mgr(new_mgr, alloc_for_liboblog))) {
LOG_WARN("fail to alloc mem", KR(ret));
} else {
if (OB_ISNULL(new_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("new_mgr is NULL", KR(ret), K(tenant_id), K(schema_version));
} else if (OB_FAIL(new_mgr->init())) {
LOG_WARN("init schema mgr falied", KR(ret));
} else if (OB_UNLIKELY(ERRSIM_ASSIGN_NEW_MGR)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("turn on error injection ERRSIM_ASSIGN_NEW_MGR", KR(ret));
} else if (OB_FAIL(new_mgr->assign(latest_schema_mgr))) {
LOG_WARN("assign schema mgr falied", KR(ret));
} else if (OB_UNLIKELY(ERRSIM_PUT_SCHEMA)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("turn on error injection ERRSIM_PUT_SCHEMA", KR(ret));
} else if (OB_FAIL(schema_mgr_cache.put(new_mgr, eli_schema_mgr))) {
LOG_WARN("put schema mgr failed", KR(ret));
} else {
LOG_INFO("put schema mgr succeed",
"schema_version", new_mgr->get_schema_version(),
"eliminated_schema_version", NULL != eli_schema_mgr ?
eli_schema_mgr->get_schema_version() : OB_INVALID_VERSION, K(tenant_id));
}
int tmp_ret = OB_SUCCESS;
// whatever put success or put failed, we should try to free eli_schema_mgr
if (OB_TMP_FAIL(mem_mgr.free_schema_mgr(eli_schema_mgr))) {
LOG_ERROR("fail to free eli_schema_mgr", KR(tmp_ret));
ret = OB_SUCC(ret) ? tmp_ret : ret;
}
// whatever assign/put/free schema mgr failed, new schema mgr will be useless, so free it
if (OB_FAIL(ret)) {
LOG_WARN("handle new schema mgr failed", KR(ret), K(schema_version), K(tenant_id));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(mem_mgr.free_schema_mgr(new_mgr))) {
LOG_ERROR("fail to free new_mgr", KR(tmp_ret));
}
}
}
return ret;
}
ERRSIM_POINT_DEF(ERRSIM_SET_REFACTOR);
ERRSIM_POINT_DEF(ERRSIM_AFTER_SET_REFACTOR);
int ObMultiVersionSchemaService::switch_allocator_(
ObSchemaMemMgr &mem_mgr,
ObSchemaMgr *&latest_schema_mgr)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(latest_schema_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("latest schema mgr is NULL", KR(ret));
} else if (OB_FAIL(mem_mgr.switch_allocator())) {
LOG_WARN("switch allocator falied", KR(ret));
} else {
bool overwrite = true;
bool need_switch_back = true;
bool alloc_for_liboblog = false;
ObSchemaMgr *new_mgr = NULL;
ObSchemaMgr *old_mgr = latest_schema_mgr;
const uint64_t tenant_id = latest_schema_mgr->get_tenant_id();
const int64_t schema_version = latest_schema_mgr->get_schema_version();
if (OB_FAIL(mem_mgr.alloc_schema_mgr(new_mgr, alloc_for_liboblog))) {
LOG_WARN("fail to alloc mem", KR(ret));
} else {
if (OB_ISNULL(new_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("new mgr is NULL", KR(ret), K(tenant_id), K(schema_version));
} else if (OB_FAIL(new_mgr->init())) {
LOG_WARN("init new schema mgr falied", K(ret));
} else if (OB_FAIL(new_mgr->deep_copy(*old_mgr))) {
LOG_WARN("deep copy old schema mgr falied", K(ret));
} else if (OB_UNLIKELY(ERRSIM_SET_REFACTOR)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("turn on error injection ERRSIM_SET_REFACTOR", KR(ret));
} else if (OB_FAIL(schema_mgr_for_cache_map_.set_refactored(tenant_id, new_mgr, overwrite))) {
LOG_WARN("fail to set schema mgr for cache", K(ret));
} else {
// handle new schema mgr success, no need to switch back allocator
need_switch_back = false;
latest_schema_mgr = new_mgr;
if (OB_UNLIKELY(ERRSIM_AFTER_SET_REFACTOR)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("turn on error injection ERRSIM_AFTER_SET_REFACTOR", KR(ret));
} else if (OB_FAIL(mem_mgr.free_schema_mgr(old_mgr))) {
// old mgr will be release in try_gc_another_allocator
LOG_ERROR("fail to free old schema mgr", KR(ret));
}
}
}
// switch back allocator when cur allocator can not use
// 1.alloc new schema mgr failed
// 2.handle new schema failed, like deep copy
if (need_switch_back) {
LOG_WARN("after switch allocator, handle schema mgr encounters something wrong", KR(ret), K(tenant_id), K(schema_version));
int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(mem_mgr.switch_back_allocator())) {
LOG_ERROR("switch back allocator falied", KR(tmp_ret));
} else if (OB_TMP_FAIL(mem_mgr.free_schema_mgr(new_mgr))) {
LOG_ERROR("fail to free new_mgr", KR(tmp_ret), K(schema_version));
}
}
}
return ret;
}
int ObMultiVersionSchemaService::async_refresh_schema(
const uint64_t tenant_id,
const int64_t schema_version)
@ -3436,14 +3449,8 @@ int ObMultiVersionSchemaService::try_gc_tenant_schema_mgr(ObSchemaMemMgr *&mem_m
do {
if (OB_FAIL(schema_mgr_cache->try_gc_tenant_schema_mgr(eli_schema_mgr))) {
LOG_WARN("fail to eliminate schema mgr", K(ret), K(tenant_id));
} else if (OB_NOT_NULL(eli_schema_mgr)) {
FLOG_INFO("[SCHEMA_RELEASE] try to gc schema mgr",
"tenant_id", eli_schema_mgr->get_tenant_id(),
"schema_version", eli_schema_mgr->get_schema_version());
eli_schema_mgr->~ObSchemaMgr();
if (OB_FAIL(mem_mgr->free(static_cast<void *>(eli_schema_mgr)))) {
LOG_ERROR("free eli schema mgr falied", K(ret), K(tenant_id));
}
} else if (OB_FAIL(mem_mgr->free_schema_mgr(eli_schema_mgr))) {
LOG_ERROR("free eli schema mgr falied", KR(ret), K(tenant_id));
}
} while (OB_SUCC(ret) && OB_NOT_NULL(eli_schema_mgr));
@ -3547,14 +3554,8 @@ int ObMultiVersionSchemaService::try_gc_another_allocator(
} else if (FALSE_IT(eli_schema_mgr = static_cast<ObSchemaMgr *>(another_ptrs.at(i)))) {
} else if (OB_FAIL(schema_mgr_cache->try_elimiante_schema_mgr(eli_schema_mgr))) {
LOG_WARN("fail to elimiante schema_mgr", K(ret), K(tenant_id), K(eli_schema_mgr));
} else {
FLOG_INFO("[SCHEMA_RELEASE] try to eliminate schema mgr",
"tenant_id", eli_schema_mgr->get_tenant_id(),
"schema_version", eli_schema_mgr->get_schema_version());
eli_schema_mgr->~ObSchemaMgr();
if (OB_FAIL(mem_mgr->free(static_cast<void *>(eli_schema_mgr)))) {
LOG_ERROR("free eli schema mgr falied", K(ret), K(tenant_id));
}
} else if (OB_FAIL(mem_mgr->free_schema_mgr(eli_schema_mgr))) {
LOG_ERROR("free eli schema mgr falied", KR(ret), K(tenant_id));
}
}
if (OB_FAIL(ret)) {