try flush ddl commit scn

This commit is contained in:
simonjoylet
2024-02-26 07:45:55 +00:00
committed by ob-robot
parent 50e775cd5e
commit fbbdf5f3ea
8 changed files with 62 additions and 12 deletions

View File

@ -1171,7 +1171,7 @@ int ObTenantTabletScheduler::schedule_tablet_minor_merge(
} }
int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge( int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
const share::ObLSID &ls_id, ObLSHandle &ls_handle,
ObTabletHandle &tablet_handle) ObTabletHandle &tablet_handle)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -1181,9 +1181,10 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *); ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
bool is_major_sstable_exist = false; bool is_major_sstable_exist = false;
bool has_freezed_ddl_kv = false; bool has_freezed_ddl_kv = false;
if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_handle.is_valid())) { SCN ddl_commit_scn;
if (OB_UNLIKELY(!ls_handle.is_valid() || !tablet_handle.is_valid())) {
ret = OB_INVALID_ARGUMENT; ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_handle)); LOG_WARN("invalid argument", K(ret), K(ls_handle), K(tablet_handle));
} else if (tablet_handle.get_obj()->get_tablet_meta().has_transfer_table()) { } else if (tablet_handle.get_obj()->get_tablet_meta().has_transfer_table()) {
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) { if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL)) {
LOG_INFO("The tablet in the transfer process does not do ddl major_merge", K(tablet_handle)); LOG_INFO("The tablet in the transfer process does not do ddl major_merge", K(tablet_handle));
@ -1192,7 +1193,7 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected error", K(ret), K(MTL_ID())); LOG_WARN("unexpected error", K(ret), K(MTL_ID()));
} else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr_and_check_major( } else if (OB_FAIL(tenant_direct_load_mgr->get_tablet_mgr_and_check_major(
ls_id, ls_handle.get_ls()->get_ls_id(),
tablet_handle.get_obj()->get_tablet_meta().tablet_id_, tablet_handle.get_obj()->get_tablet_meta().tablet_id_,
true, /* is_full_direct_load */ true, /* is_full_direct_load */
direct_load_mgr_handle, direct_load_mgr_handle,
@ -1204,6 +1205,9 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
} }
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) { } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv mgr failed", K(ret)); LOG_WARN("get ddl kv mgr failed", K(ret));
} else if (FALSE_IT(ddl_commit_scn = direct_load_mgr_handle.get_full_obj()->get_commit_scn(tablet_handle.get_obj()->get_tablet_meta()))) {
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->try_flush_ddl_commit_scn(ls_handle, tablet_handle, direct_load_mgr_handle, ddl_commit_scn))) {
LOG_WARN("try flush ddl commit scn failed", K(ret), "tablet_id", tablet_handle.get_obj()->get_tablet_meta().tablet_id_);
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->check_has_freezed_ddl_kv(has_freezed_ddl_kv))) { } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->check_has_freezed_ddl_kv(has_freezed_ddl_kv))) {
LOG_WARN("check has freezed ddl kv failed", K(ret)); LOG_WARN("check has freezed ddl kv failed", K(ret));
} else if (OB_FAIL(direct_load_mgr_handle.get_full_obj()->prepare_ddl_merge_param(*tablet_handle.get_obj(), param))) { } else if (OB_FAIL(direct_load_mgr_handle.get_full_obj()->prepare_ddl_merge_param(*tablet_handle.get_obj(), param))) {
@ -1347,7 +1351,7 @@ int ObTenantTabletScheduler::schedule_tablet_minor(
} }
} }
if (!tablet_id.is_ls_inner_tablet()) { // data tablet if (!tablet_id.is_ls_inner_tablet()) { // data tablet
if (OB_TMP_FAIL(schedule_tablet_ddl_major_merge(ls_id, tablet_handle))) { if (OB_TMP_FAIL(schedule_tablet_ddl_major_merge(ls_handle, tablet_handle))) {
if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) { if (OB_SIZE_OVERFLOW != tmp_ret && OB_EAGAIN != tmp_ret) {
LOG_WARN("failed to schedule tablet ddl merge", K(tmp_ret), K(ls_id), K(tablet_handle)); LOG_WARN("failed to schedule tablet ddl merge", K(tmp_ret), K(ls_id), K(tablet_handle));
} }

View File

@ -223,7 +223,7 @@ public:
const ObMergeType merge_type, const ObMergeType merge_type,
const int64_t &merge_snapshot_version); const int64_t &merge_snapshot_version);
static int schedule_tablet_ddl_major_merge( static int schedule_tablet_ddl_major_merge(
const share::ObLSID &ls_id, ObLSHandle &ls_handle,
ObTabletHandle &tablet_handle); ObTabletHandle &tablet_handle);
int get_min_dependent_schema_version(int64_t &min_schema_version); int get_min_dependent_schema_version(int64_t &min_schema_version);

View File

@ -228,7 +228,7 @@ int wait_lob_tablet_major_exist(ObLSHandle &ls_handle, ObTablet &tablet)
if (!is_major_sstable_exist) { if (!is_major_sstable_exist) {
ret = OB_EAGAIN; ret = OB_EAGAIN;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(ls_handle.get_ls()->get_ls_id(), lob_tablet_handle))) { if (OB_TMP_FAIL(compaction::ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(ls_handle, lob_tablet_handle))) {
LOG_WARN("schedule ddl major merge for lob tablet failed", K(tmp_ret), K(lob_tablet_id)); LOG_WARN("schedule ddl major merge for lob tablet failed", K(tmp_ret), K(lob_tablet_id));
} }
} }

View File

@ -285,12 +285,12 @@ const ObTabletDirectLoadMgr *ObTabletDirectLoadMgrHandle::get_obj() const
return tablet_mgr_; return tablet_mgr_;
} }
ObTabletFullDirectLoadMgr* ObTabletDirectLoadMgrHandle::get_full_obj() ObTabletFullDirectLoadMgr* ObTabletDirectLoadMgrHandle::get_full_obj() const
{ {
return static_cast<ObTabletFullDirectLoadMgr *>(tablet_mgr_); return static_cast<ObTabletFullDirectLoadMgr *>(tablet_mgr_);
} }
ObTabletIncDirectLoadMgr* ObTabletDirectLoadMgrHandle::get_inc_obj() ObTabletIncDirectLoadMgr* ObTabletDirectLoadMgrHandle::get_inc_obj() const
{ {
return static_cast<ObTabletIncDirectLoadMgr *>(tablet_mgr_); return static_cast<ObTabletIncDirectLoadMgr *>(tablet_mgr_);
} }

View File

@ -159,8 +159,8 @@ public:
int assign(const ObTabletDirectLoadMgrHandle &handle); int assign(const ObTabletDirectLoadMgrHandle &handle);
ObTabletDirectLoadMgr *get_obj(); ObTabletDirectLoadMgr *get_obj();
const ObTabletDirectLoadMgr *get_obj() const; const ObTabletDirectLoadMgr *get_obj() const;
ObTabletFullDirectLoadMgr *get_full_obj(); ObTabletFullDirectLoadMgr *get_full_obj() const;
ObTabletIncDirectLoadMgr *get_inc_obj(); ObTabletIncDirectLoadMgr *get_inc_obj() const;
void reset(); void reset();
bool is_valid() const; bool is_valid() const;
TO_STRING_KV(KP_(tablet_mgr)); TO_STRING_KV(KP_(tablet_mgr));

View File

@ -2944,7 +2944,7 @@ int ObTabletFullDirectLoadMgr::update_major_sstable()
storage_schema, storage_schema,
ls_handle.get_ls()->get_rebuild_seq()); ls_handle.get_ls()->get_rebuild_seq());
param.ddl_info_.keep_old_ddl_sstable_ = true; param.ddl_info_.keep_old_ddl_sstable_ = true;
param.ddl_info_.ddl_commit_scn_ = get_commit_scn(tablet_handle.get_obj()->get_tablet_meta()); param.ddl_info_.ddl_commit_scn_ = get_commit_scn(tablet_handle.get_obj()->get_tablet_meta()); // ddl commit scn may larger than ddl checkpoint scn
if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(tablet_id_, param, new_tablet_handle))) { if (OB_FAIL(ls_handle.get_ls()->update_tablet_table_store(tablet_id_, param, new_tablet_handle))) {
LOG_WARN("failed to update tablet table store", K(ret), K(ls_id_), K(tablet_id_), K(param)); LOG_WARN("failed to update tablet table store", K(ret), K(ls_id_), K(tablet_id_), K(param));
} }

View File

@ -569,6 +569,47 @@ int ObTabletDDLKvMgr::get_ddl_kvs_for_query(ObTablet &tablet, ObIArray<ObDDLKVHa
return ret; return ret;
} }
// when ddl commit scn is only in memory, try flush it, need wait log replay point elapsed the ddl commit scn
int ObTabletDDLKvMgr::try_flush_ddl_commit_scn(
ObLSHandle &ls_handle,
const ObTabletHandle &tablet_handle,
const ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
const share::SCN &commit_scn)
{
int ret = OB_SUCCESS;
ObTabletFullDirectLoadMgr *direct_load_mgr = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret));
} else if (OB_UNLIKELY(!ls_handle.is_valid() || !tablet_handle.is_valid() || OB_ISNULL(direct_load_mgr = direct_load_mgr_handle.get_full_obj()))) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_handle), K(tablet_handle));
} else if (commit_scn.is_valid_and_not_min() // already committed
&& tablet_handle.get_obj()->get_tablet_meta().ddl_checkpoint_scn_ != commit_scn) {// only exist in memory
SCN max_decided_scn;
bool already_freezed = true;
{
ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
already_freezed = max_freeze_scn_ >= commit_scn;
}
if (already_freezed) {
// do nothing
} else if (OB_FAIL(ls_handle.get_ls()->get_max_decided_scn(max_decided_scn))) {
LOG_WARN("get max decided log ts failed", K(ret), K(ls_handle.get_ls()->get_ls_id()));
} else if (SCN::plus(max_decided_scn, 1) >= commit_scn) { // commit_scn elapsed, means the prev clog already replayed or applied
// max_decided_scn is the left border scn - 1
// the min deciding(replay or apply) scn (aka left border) is max_decided_scn + 1
if (OB_FAIL(freeze_ddl_kv(direct_load_mgr->get_start_scn(),
direct_load_mgr->get_table_key().get_snapshot_version(),
direct_load_mgr->get_data_format_version(),
commit_scn))) {
LOG_WARN("freeze ddl kv failed", K(ret));
}
}
}
return ret;
}
int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv) int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -54,6 +54,11 @@ public:
const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit
int release_ddl_kvs(const share::SCN &rec_scn); // release persistent ddl kv, used in ddl merge task for free ddl kv int release_ddl_kvs(const share::SCN &rec_scn); // release persistent ddl kv, used in ddl merge task for free ddl kv
int check_has_effective_ddl_kv(bool &has_ddl_kv); // used in ddl log handler for checkpoint int check_has_effective_ddl_kv(bool &has_ddl_kv); // used in ddl log handler for checkpoint
int try_flush_ddl_commit_scn(
ObLSHandle &ls_handle,
const ObTabletHandle &tablet_handle,
const ObTabletDirectLoadMgrHandle &direct_load_mgr_handle,
const share::SCN &commit_scn);
int check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv); int check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv);
int64_t get_count(); int64_t get_count();
void set_ddl_kv(const int64_t idx, ObDDLKVHandle &kv_handle); //for unittest void set_ddl_kv(const int64_t idx, ObDDLKVHandle &kv_handle); //for unittest