[memtable_mgr] ObTabletMemtableMgr memtable management

This commit is contained in:
obdev
2023-11-27 08:00:49 +00:00
committed by ob-robot
parent b7ea58d6e6
commit fdae2b1ab4
31 changed files with 837 additions and 425 deletions

View File

@ -196,7 +196,6 @@ int ObStorageTableGuard::refresh_and_protect_table(ObRelativeTable &relative_tab
int ObStorageTableGuard::refresh_and_protect_memtable()
{
int ret = OB_SUCCESS;
ObIMemtableMgr *memtable_mgr = tablet_->get_memtable_mgr();
memtable::ObIMemtable *memtable = nullptr;
ObTableHandleV2 handle;
const share::ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_;
@ -205,57 +204,56 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
bool bool_ret = true;
bool for_replace_tablet_meta = false;
const int64_t start = ObTimeUtility::current_time();
ObProtectedMemtableMgrHandle *protected_handle = NULL;
if (OB_ISNULL(memtable_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("memtable mgr is null", K(ret), KP(memtable_mgr));
} else {
do {
if (OB_FAIL(memtable_mgr->get_boundary_memtable(handle))) {
// if there is no memtable, create a new one
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_DEBUG("there is no boundary memtable", K(ret), K(ls_id), K(tablet_id));
if (OB_FAIL(memtable_mgr->get_newest_clog_checkpoint_scn(clog_checkpoint_scn))) {
LOG_WARN("fail to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id));
} else if (replay_scn_ > clog_checkpoint_scn) {
// TODO: get the newest schema_version from tablet
ObLSHandle ls_handle;
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
LOG_WARN("failed to get log stream", K(ret), K(ls_id), K(tablet_id));
} else if (OB_UNLIKELY(!ls_handle.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, invalid ls handle", K(ret), K(ls_handle), K(ls_id), K(tablet_id));
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->create_memtable(
tablet_id, 0/*schema version*/, for_replay_, clog_checkpoint_scn))) {
LOG_WARN("fail to create a boundary memtable", K(ret), K(ls_id), K(tablet_id));
}
} else { // replay_log_scn_ <= clog_checkpoint_scn
// no need to create a boundary memtable
ret = OB_SUCCESS;
break;
do {
if (OB_FAIL(tablet_->get_boundary_memtable(handle))) {
// if there is no memtable, create a new one
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_DEBUG("there is no boundary memtable", K(ret), K(ls_id), K(tablet_id));
if (OB_FAIL(tablet_->get_protected_memtable_mgr_handle(protected_handle))) {
LOG_WARN("failed to get_protected_memtable_mgr_handle", K(ret), KPC(tablet_));
} else if (OB_FAIL(protected_handle->get_newest_clog_checkpoint_scn(
tablet_->get_tablet_meta(), clog_checkpoint_scn))) {
LOG_WARN("fail to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id));
} else if (replay_scn_ > clog_checkpoint_scn) {
// TODO: get the newest schema_version from tablet
ObLSHandle ls_handle;
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
LOG_WARN("failed to get log stream", K(ret), K(ls_id), K(tablet_id));
} else if (OB_UNLIKELY(!ls_handle.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error, invalid ls handle", K(ret), K(ls_handle), K(ls_id), K(tablet_id));
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->create_memtable(
tablet_id, 0/*schema version*/, for_replay_, clog_checkpoint_scn))) {
LOG_WARN("fail to create a boundary memtable", K(ret), K(ls_id), K(tablet_id));
}
} else { // OB_ENTRY_NOT_EXIST != ret
LOG_WARN("fail to get boundary memtable", K(ret), K(ls_id), K(tablet_id));
} else { // replay_log_scn_ <= clog_checkpoint_scn
// no need to create a boundary memtable
ret = OB_SUCCESS;
break;
}
} else if (OB_FAIL(handle.get_memtable(memtable))) {
LOG_WARN("fail to get memtable from ObTableHandle", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(check_freeze_to_inc_write_ref(memtable, bool_ret, for_replace_tablet_meta))) {
if (OB_EAGAIN == ret) {
} else if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
LOG_WARN("fail to check_freeze", K(ret), K(tablet_id), K(bool_ret), KPC(memtable));
}
} else {
// do nothing
} else { // OB_ENTRY_NOT_EXIST != ret
LOG_WARN("fail to get boundary memtable", K(ret), K(ls_id), K(tablet_id));
}
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 10 * 1000) {
if (TC_REACH_TIME_INTERVAL(10 * 1000)) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "refresh replay table too much times", K(ret),
K(ls_id), K(tablet_id), K(cost_time));
}
} else if (OB_FAIL(handle.get_memtable(memtable))) {
LOG_WARN("fail to get memtable from ObTableHandle", K(ret), K(ls_id), K(tablet_id));
} else if (OB_FAIL(check_freeze_to_inc_write_ref(memtable, bool_ret, for_replace_tablet_meta))) {
if (OB_EAGAIN == ret) {
} else if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
LOG_WARN("fail to check_freeze", K(ret), K(tablet_id), K(bool_ret), KPC(memtable));
}
} while ((OB_SUCC(ret) || OB_ENTRY_NOT_EXIST == ret || OB_EAGAIN == ret) && bool_ret);
}
} else {
// do nothing
}
const int64_t cost_time = ObTimeUtility::current_time() - start;
if (cost_time > 10 * 1000) {
if (TC_REACH_TIME_INTERVAL(10 * 1000)) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "refresh replay table too much times", K(ret),
K(ls_id), K(tablet_id), K(cost_time));
}
}
} while ((OB_SUCC(ret) || OB_ENTRY_NOT_EXIST == ret || OB_EAGAIN == ret) && bool_ret);
return ret;
}
@ -311,7 +309,6 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo
bool_ret = true;
const share::ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_;
const common::ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_;
ObIMemtableMgr *memtable_mgr = tablet_->get_memtable_mgr();
// need to make sure the memtable is a right boundary memtable
memtable::ObIMemtable *memtable = static_cast<memtable::ObIMemtable *>(table);
memtable::ObIMemtable *old_memtable = memtable;
@ -319,11 +316,9 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo
// prevent that the memtable transforms from active to frozen before inc_write_ref
uint32_t old_freeze_flag = 0;
bool is_tablet_freeze = false;
ObProtectedMemtableMgrHandle *protected_handle = NULL;
if (OB_ISNULL(memtable_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("memtable mgr is null", K(ret), K(bool_ret), K(ls_id), K(tablet_id), KP(memtable_mgr));
} else if (OB_ISNULL(table)) {
if (OB_ISNULL(table)) {
LOG_INFO("table is null, need to refresh", K(bool_ret), K(ls_id), K(tablet_id));
} else if (FALSE_IT(old_freeze_flag = memtable->get_freeze_flag())) {
} else if (FALSE_IT(is_tablet_freeze = memtable->get_is_tablet_freeze())) {
@ -333,7 +328,9 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo
if (for_replay_ || for_multi_source_data_) {
// filter memtables for replay or multi_source_data according to scn
ObTableHandleV2 handle;
if (OB_FAIL(memtable_mgr->get_memtable_for_replay(replay_scn_, handle))) {
if (OB_FAIL(tablet_->get_protected_memtable_mgr_handle(protected_handle))) {
LOG_WARN("failed to get_protected_memtable_mgr_handle", K(ret), KPC(tablet_));
} else if (OB_FAIL(protected_handle->get_memtable_for_replay(replay_scn_, handle))) {
if (OB_NO_NEED_UPDATE == ret) {
// no need to replay the log
bool_ret = false;
@ -357,16 +354,15 @@ int ObStorageTableGuard::check_freeze_to_inc_write_ref(ObITable *table, bool &bo
// need to create a new memtable
// TODO: allow to write frozen memtables except for the boundary one when replaying
const int64_t write_ref = memtable->get_write_ref();
ObProtectedMemtableMgrHandle *protected_handle = NULL;
if (0 == write_ref) {
SCN clog_checkpoint_scn;
bool need_create_memtable = true;
SCN migration_clog_checkpoint_scn;
ObIMemtableMgr *memtable_mgr = tablet_->get_memtable_mgr();
if (OB_ISNULL(memtable_mgr)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("memtable mgr is null", K(ret), K(bool_ret), K(ls_id), K(tablet_id), KP(memtable_mgr));
} else if (OB_FAIL(memtable_mgr->get_newest_clog_checkpoint_scn(clog_checkpoint_scn))) {
if (OB_FAIL(tablet_->get_protected_memtable_mgr_handle(protected_handle))) {
LOG_WARN("failed to get_protected_memtable_mgr_handle", K(ret), KPC(tablet_));
} else if (OB_FAIL(protected_handle->get_newest_clog_checkpoint_scn(
tablet_->get_tablet_meta(), clog_checkpoint_scn))) {
LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id), K(clog_checkpoint_scn));
} else if (FALSE_IT(migration_clog_checkpoint_scn = static_cast<memtable::ObMemtable *>(memtable)->get_migration_clog_checkpoint_scn())) {
} else if (for_replay_ && !migration_clog_checkpoint_scn.is_min()) {