optimize ddl kv dump scheduler and ddl commit

This commit is contained in:
Charles0429
2024-02-09 04:56:41 +00:00
committed by ob-robot
parent d7dc28d729
commit b19b08e137
7 changed files with 60 additions and 17 deletions

View File

@ -2453,7 +2453,8 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
arg_.start_scn_,
commit_scn,
arg_.table_id_,
arg_.ddl_task_id_))) {
arg_.ddl_task_id_,
false/*is replay*/))) {
LOG_WARN("failed to do ddl kv commit", K(ret), K(arg_));
} else {
result_ = commit_scn.get_val_for_tx();

View File

@ -1177,8 +1177,10 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
int ret = OB_SUCCESS;
ObDDLTableMergeDagParam param;
ObTabletDirectLoadMgrHandle direct_load_mgr_handle;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
bool is_major_sstable_exist = false;
bool has_freezed_ddl_kv = false;
if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_handle.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_handle));
@ -1200,13 +1202,19 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
} else {
LOG_WARN("get tablet direct load mgr failed", K(ret), "tablet_id", tablet_handle.get_obj()->get_tablet_meta().tablet_id_);
}
} else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
LOG_WARN("get ddl kv mgr failed", K(ret));
} else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->check_has_freezed_ddl_kv(has_freezed_ddl_kv))) {
LOG_WARN("check has freezed ddl kv failed", K(ret));
} else if (OB_FAIL(direct_load_mgr_handle.get_full_obj()->prepare_ddl_merge_param(*tablet_handle.get_obj(), param))) {
if (OB_EAGAIN != ret) {
LOG_WARN("prepare major merge param failed", K(ret), "tablet_id", tablet_handle.get_obj()->get_tablet_meta().tablet_id_);
}
} else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
} else if (has_freezed_ddl_kv || param.is_commit_) {
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
}
}
}
return ret;

View File

@ -428,7 +428,7 @@ int ObDDLCommitReplayExecutor::do_replay_(ObTabletHandle &tablet_handle) //TODO(
} else if (OB_ISNULL(data_direct_load_mgr = direct_load_mgr_handle.get_full_obj())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), K(table_key));
} else if (OB_FAIL(data_direct_load_mgr->commit(*tablet_handle.get_obj(), log_->get_start_scn(), scn_, false/*wait_major_generate*/))) {
} else if (OB_FAIL(data_direct_load_mgr->commit(*tablet_handle.get_obj(), log_->get_start_scn(), scn_, 0/*unused table_id*/, 0/*unused ddl_task_id*/, true/*is replay*/))) {
if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) {
ret = OB_SUCCESS; // exit when tablet not exist or task expired
} else {

View File

@ -2155,7 +2155,7 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err", K(ret), KPC(this));
} else if (OB_FAIL(commit(*tablet_handle.get_obj(), start_scn, commit_scn,
sqc_build_ctx_.build_param_.runtime_only_param_.table_id_, sqc_build_ctx_.build_param_.runtime_only_param_.task_id_))) {
sqc_build_ctx_.build_param_.runtime_only_param_.table_id_, sqc_build_ctx_.build_param_.runtime_only_param_.task_id_, false/*is replay*/))) {
LOG_WARN("failed to do ddl kv commit", K(ret), KPC(this));
}
@ -2163,11 +2163,11 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star
} else if (sstable_already_created || is_delay_build_major) {
LOG_INFO("sstable had already created, skip waiting for major generated and reporting chksum", K(start_scn), K(commit_scn),
K(sstable_already_created), K(is_delay_build_major));
} else if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/))) {
} else if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/, false/*is_replay*/))) {
LOG_WARN("schedule merge task and wait real major generate", K(ret),
K(is_remote_write), K(sstable_already_created), K(start_scn), K(commit_scn));
} else if (lob_mgr_handle_.is_valid() &&
OB_FAIL(lob_mgr_handle_.get_full_obj()->schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/))) {
OB_FAIL(lob_mgr_handle_.get_full_obj()->schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/, false/*is_replay*/))) {
LOG_WARN("schedule merge task and wait real major generate for lob failed", K(ret),
K(is_remote_write), K(sstable_already_created), K(start_scn), K(commit_scn));
} else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, tablet_id_, new_tablet_handle))) {
@ -2403,7 +2403,8 @@ int ObTabletFullDirectLoadMgr::commit(
const share::SCN &start_scn,
const share::SCN &commit_scn,
const uint64_t table_id,
const int64_t ddl_task_id)
const int64_t ddl_task_id,
const bool is_replay)
{
int ret = OB_SUCCESS;
ObDDLKvMgrHandle ddl_kv_mgr_handle;
@ -2435,7 +2436,7 @@ int ObTabletFullDirectLoadMgr::commit(
}
if (OB_SUCC(ret)) {
if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, false/*wait_major_generate*/))) {
if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, false/*wait_major_generate*/, is_replay))) {
LOG_WARN("schedule major merge task failed", K(ret));
}
}
@ -2456,22 +2457,26 @@ int ObTabletFullDirectLoadMgr::commit(
LOG_ERROR("ls should not be null", K(ret));
} else if (OB_FAIL(ls->get_tablet(lob_tablet_id, lob_tablet_handle, ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
LOG_WARN("get tablet handle failed", K(ret), K(ls_id), K(lob_tablet_id));
} else if (OB_FAIL(lob_mgr_handle_.get_full_obj()->commit(*lob_tablet_handle.get_obj(), start_scn, commit_scn))) {
} else if (OB_FAIL(lob_mgr_handle_.get_full_obj()->commit(*lob_tablet_handle.get_obj(), start_scn, commit_scn, table_id, ddl_task_id, is_replay))) {
LOG_WARN("commit for lob failed", K(ret), K(start_scn), K(commit_scn));
}
}
return ret;
}
int ObTabletFullDirectLoadMgr::schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated)
int ObTabletFullDirectLoadMgr::schedule_merge_task(
const share::SCN &start_scn,
const share::SCN &commit_scn,
const bool wait_major_generated,
const bool is_replay)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret), K(is_inited_));
} else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !commit_scn.is_valid_and_not_min())) {
} else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !commit_scn.is_valid_and_not_min() || (is_replay && wait_major_generated))) {
ret = OB_ERR_SYS;
LOG_WARN("unknown start scn or commit snc", K(ret), K(start_scn), K(commit_scn));
LOG_WARN("unknown start scn or commit snc", K(ret), K(start_scn), K(commit_scn), K(is_replay), K(wait_major_generated));
} else {
const int64_t wait_start_ts = ObTimeUtility::fast_current_time();
while (OB_SUCC(ret)) {
@ -2492,6 +2497,9 @@ int ObTabletFullDirectLoadMgr::schedule_merge_task(const share::SCN &start_scn,
LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
} else {
ret = OB_SUCCESS;
if (is_replay) {
break;
}
}
} else if (!wait_major_generated) {
// schedule successfully and no need to wait physical major generates.

View File

@ -422,8 +422,9 @@ public:
ObTablet &tablet,
const share::SCN &start_scn,
const share::SCN &commit_scn,
const uint64_t table_id = 0,
const int64_t ddl_task_id = 0); // schedule build a major sstable
const uint64_t table_id,
const int64_t ddl_task_id,
const bool is_replay); // schedule build a major sstable
void set_commit_scn_nolock(const share::SCN &scn);
int set_commit_scn(const share::SCN &scn);
@ -441,7 +442,7 @@ public:
INHERIT_TO_STRING_KV("ObTabletDirectLoadMgr", ObTabletDirectLoadMgr, K_(start_scn), K_(commit_scn), K_(execution_id));
private:
bool is_started() { return start_scn_.is_valid_and_not_min(); }
int schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated); // try wait build major sstable
int schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated, const bool is_replay); // try wait build major sstable
int cleanup_unlock();
int init_ddl_table_store(const share::SCN &start_scn, const int64_t snapshot_version, const share::SCN &ddl_checkpoint_scn);
int update_major_sstable();

View File

@ -568,6 +568,30 @@ int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv)
return ret;
}
int ObTabletDDLKvMgr::check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv)
{
int ret = OB_SUCCESS;
has_freezed_ddl_kv = false;
ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret));
} else {
for (int64_t pos = head_; !has_freezed_ddl_kv && OB_SUCC(ret) && pos < tail_; ++pos) {
const int64_t idx = get_idx(pos);
ObDDLKVHandle &cur_kv_handle = ddl_kv_handles_[idx];
ObDDLKV *cur_kv = cur_kv_handle.get_obj();
if (OB_ISNULL(cur_kv)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ddl kv is null", K(ret), K(ls_id_), K(tablet_id_), KP(cur_kv), K(pos), K(head_), K(tail_));
} else if (cur_kv->is_freezed()) {
has_freezed_ddl_kv = true;
}
}
}
return ret;
}
int ObTabletDDLKvMgr::alloc_ddl_kv(
const share::SCN &start_scn,
const int64_t snapshot_version,

View File

@ -55,6 +55,7 @@ public:
const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit
int release_ddl_kvs(const share::SCN &rec_scn); // release persistent ddl kv, used in ddl merge task for free ddl kv
int check_has_effective_ddl_kv(bool &has_ddl_kv); // used in ddl log handler for checkpoint
int check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv);
int64_t get_count();
void set_ddl_kv(const int64_t idx, ObDDLKVHandle &kv_handle); //for unittest
OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); }