Filter logs before create_memtable for replacing tablet_meta
This commit is contained in:
@ -143,7 +143,7 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
|
|||||||
ObTableHandleV2 handle;
|
ObTableHandleV2 handle;
|
||||||
const share::ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_;
|
const share::ObLSID &ls_id = tablet_->get_tablet_meta().ls_id_;
|
||||||
const common::ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_;
|
const common::ObTabletID &tablet_id = tablet_->get_tablet_meta().tablet_id_;
|
||||||
SCN clog_checkpoint_scn;
|
SCN clog_checkpoint_scn = tablet_->get_tablet_meta().clog_checkpoint_scn_;
|
||||||
bool bool_ret = true;
|
bool bool_ret = true;
|
||||||
const int64_t start = ObTimeUtility::current_time();
|
const int64_t start = ObTimeUtility::current_time();
|
||||||
|
|
||||||
@ -152,29 +152,25 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
|
|||||||
LOG_WARN("memtable mgr is null", K(ret), KP(memtable_mgr));
|
LOG_WARN("memtable mgr is null", K(ret), KP(memtable_mgr));
|
||||||
} else {
|
} else {
|
||||||
do {
|
do {
|
||||||
if (OB_FAIL(memtable_mgr->get_boundary_memtable(handle))) {
|
if (replay_scn_ <= clog_checkpoint_scn) {
|
||||||
|
bool_ret = false;
|
||||||
|
LOG_INFO("no need to replay the log", K(ls_id), K(tablet_id), K(replay_scn_), K(clog_checkpoint_scn));
|
||||||
|
} else if (OB_FAIL(memtable_mgr->get_boundary_memtable(handle))) {
|
||||||
// if there is no memtable, create a new one
|
// if there is no memtable, create a new one
|
||||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||||
LOG_DEBUG("there is no boundary memtable", K(ret), K(ls_id), K(tablet_id));
|
LOG_DEBUG("there is no boundary memtable", K(ret), K(ls_id), K(tablet_id));
|
||||||
if (OB_FAIL(memtable_mgr->get_newest_clog_checkpoint_scn(clog_checkpoint_scn))) {
|
|
||||||
LOG_WARN("fail to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id));
|
|
||||||
} else if (replay_scn_ > clog_checkpoint_scn) {
|
|
||||||
// TODO: get the newest schema_version from tablet
|
|
||||||
ObLSHandle ls_handle;
|
ObLSHandle ls_handle;
|
||||||
|
// overwrite the ret code OB_ENTRY_NOT_EXIST
|
||||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id, ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id), K(tablet_id));
|
LOG_WARN("failed to get log stream", K(ret), K(ls_id), K(tablet_id));
|
||||||
} else if (OB_UNLIKELY(!ls_handle.is_valid())) {
|
} else if (OB_UNLIKELY(!ls_handle.is_valid())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("unexpected error, invalid ls handle", K(ret), K(ls_handle), K(ls_id), K(tablet_id));
|
LOG_WARN("unexpected error, invalid ls handle", K(ret), K(ls_handle), K(ls_id), K(tablet_id));
|
||||||
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->create_memtable(
|
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->create_memtable(tablet_id,
|
||||||
tablet_id, 0/*schema version*/, for_replay_))) {
|
0/*schema version*/,
|
||||||
|
for_replay_))) {
|
||||||
LOG_WARN("fail to create a boundary memtable", K(ret), K(ls_id), K(tablet_id));
|
LOG_WARN("fail to create a boundary memtable", K(ret), K(ls_id), K(tablet_id));
|
||||||
}
|
}
|
||||||
} else { // replay_log_scn_ <= clog_checkpoint_scn
|
|
||||||
// no need to create a boundary memtable
|
|
||||||
ret = OB_SUCCESS;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else { // OB_ENTRY_NOT_EXIST != ret
|
} else { // OB_ENTRY_NOT_EXIST != ret
|
||||||
LOG_WARN("fail to get boundary memtable", K(ret), K(ls_id), K(tablet_id));
|
LOG_WARN("fail to get boundary memtable", K(ret), K(ls_id), K(tablet_id));
|
||||||
}
|
}
|
||||||
@ -184,8 +180,11 @@ int ObStorageTableGuard::refresh_and_protect_memtable()
|
|||||||
if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
|
if (OB_MINOR_FREEZE_NOT_ALLOW != ret) {
|
||||||
LOG_WARN("fail to check_freeze", K(ret), K(tablet_id), K(bool_ret), KPC(memtable));
|
LOG_WARN("fail to check_freeze", K(ret), K(tablet_id), K(bool_ret), KPC(memtable));
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
// do nothing
|
if (OB_EAGAIN != ret) {
|
||||||
|
// overwrite the ret code OB_EAGAIN
|
||||||
|
} else if (OB_FAIL(memtable_mgr->get_newest_clog_checkpoint_scn(clog_checkpoint_scn))) {
|
||||||
|
LOG_WARN("fail to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id));
|
||||||
}
|
}
|
||||||
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
const int64_t cost_time = ObTimeUtility::current_time() - start;
|
||||||
if (cost_time > 10 * 1000) {
|
if (cost_time > 10 * 1000) {
|
||||||
|
|||||||
@ -156,6 +156,7 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
|
|||||||
memtable::ObMemtable *active_memtable = nullptr;
|
memtable::ObMemtable *active_memtable = nullptr;
|
||||||
uint32_t memtable_freeze_clock = UINT32_MAX;
|
uint32_t memtable_freeze_clock = UINT32_MAX;
|
||||||
const share::ObLSID ls_id = ls_->get_ls_id();
|
const share::ObLSID ls_id = ls_->get_ls_id();
|
||||||
|
SCN new_clog_checkpoint_scn;
|
||||||
if (has_memtable && OB_NOT_NULL(active_memtable = get_active_memtable_())) {
|
if (has_memtable && OB_NOT_NULL(active_memtable = get_active_memtable_())) {
|
||||||
memtable_freeze_clock = active_memtable->get_freeze_clock();
|
memtable_freeze_clock = active_memtable->get_freeze_clock();
|
||||||
}
|
}
|
||||||
@ -175,6 +176,11 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
|
|||||||
K(get_memtable_count_()),
|
K(get_memtable_count_()),
|
||||||
KPC(first_frozen_memtable.get_table()));
|
KPC(first_frozen_memtable.get_table()));
|
||||||
}
|
}
|
||||||
|
} else if (OB_FAIL(get_newest_clog_checkpoint_scn(new_clog_checkpoint_scn))) {
|
||||||
|
LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id_), K(new_clog_checkpoint_scn));
|
||||||
|
} else if (for_replay && clog_checkpoint_scn != new_clog_checkpoint_scn) {
|
||||||
|
ret = OB_EAGAIN;
|
||||||
|
LOG_INFO("clog_checkpoint_scn changed, need retry to replay", K(ls_id), K(tablet_id_), K(clog_checkpoint_scn), K(new_clog_checkpoint_scn));
|
||||||
} else {
|
} else {
|
||||||
ObITable::TableKey table_key;
|
ObITable::TableKey table_key;
|
||||||
table_key.table_type_ = ObITable::DATA_MEMTABLE;
|
table_key.table_type_ = ObITable::DATA_MEMTABLE;
|
||||||
@ -198,8 +204,6 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
|
|||||||
LOG_WARN("failed to init memtable", K(ret), K(ls_id), K(table_key), KP(freezer_), KP(this),
|
LOG_WARN("failed to init memtable", K(ret), K(ls_id), K(table_key), KP(freezer_), KP(this),
|
||||||
K(schema_version), K(logstream_freeze_clock));
|
K(schema_version), K(logstream_freeze_clock));
|
||||||
} else {
|
} else {
|
||||||
SCN new_clog_checkpoint_scn;
|
|
||||||
SCN new_snapshot_version;
|
|
||||||
memtable::ObMemtable *last_frozen_memtable = get_last_frozen_memtable_();
|
memtable::ObMemtable *last_frozen_memtable = get_last_frozen_memtable_();
|
||||||
if (OB_NOT_NULL(last_frozen_memtable)) {
|
if (OB_NOT_NULL(last_frozen_memtable)) {
|
||||||
// keep the check order: is_frozen, write_ref_cnt, then unsubmitted_cnt and unsynced_cnt
|
// keep the check order: is_frozen, write_ref_cnt, then unsubmitted_cnt and unsynced_cnt
|
||||||
@ -227,11 +231,6 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn,
|
|||||||
// there is no frozen memtable and new sstable will not be generated,
|
// there is no frozen memtable and new sstable will not be generated,
|
||||||
// meaning that clog_checkpoint_scn will not be updated now,
|
// meaning that clog_checkpoint_scn will not be updated now,
|
||||||
// so get newest clog_checkpoint_scn to set left boundary
|
// so get newest clog_checkpoint_scn to set left boundary
|
||||||
} else if (OB_FAIL(get_newest_clog_checkpoint_scn(new_clog_checkpoint_scn))){
|
|
||||||
LOG_WARN("failed to get newest clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id_),
|
|
||||||
K(new_clog_checkpoint_scn));
|
|
||||||
} else if (OB_FAIL(get_newest_snapshot_version(new_snapshot_version))){
|
|
||||||
LOG_WARN("failed to get newest snapshot_version", K(ret), K(ls_id), K(tablet_id_), K(new_snapshot_version));
|
|
||||||
} else {
|
} else {
|
||||||
memtable->resolve_left_boundary(new_clog_checkpoint_scn);
|
memtable->resolve_left_boundary(new_clog_checkpoint_scn);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user