optimize ddl kv dump scheduler and ddl commit
This commit is contained in:
		@ -2453,7 +2453,8 @@ int ObRpcRemoteWriteDDLCommitLogP::process()
 | 
			
		||||
                                                 arg_.start_scn_,
 | 
			
		||||
                                                 commit_scn,
 | 
			
		||||
                                                 arg_.table_id_,
 | 
			
		||||
                                                 arg_.ddl_task_id_))) {
 | 
			
		||||
                                                 arg_.ddl_task_id_,
 | 
			
		||||
                                                 false/*is replay*/))) {
 | 
			
		||||
        LOG_WARN("failed to do ddl kv commit", K(ret), K(arg_));
 | 
			
		||||
      } else {
 | 
			
		||||
        result_ = commit_scn.get_val_for_tx();
 | 
			
		||||
 | 
			
		||||
@ -1177,8 +1177,10 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObDDLTableMergeDagParam param;
 | 
			
		||||
  ObTabletDirectLoadMgrHandle direct_load_mgr_handle;
 | 
			
		||||
  ObDDLKvMgrHandle ddl_kv_mgr_handle;
 | 
			
		||||
  ObTenantDirectLoadMgr *tenant_direct_load_mgr = MTL(ObTenantDirectLoadMgr *);
 | 
			
		||||
  bool is_major_sstable_exist = false;
 | 
			
		||||
  bool has_freezed_ddl_kv = false;
 | 
			
		||||
  if (OB_UNLIKELY(!ls_id.is_valid() || !tablet_handle.is_valid())) {
 | 
			
		||||
    ret = OB_INVALID_ARGUMENT;
 | 
			
		||||
    LOG_WARN("invalid argument", K(ret), K(ls_id), K(tablet_handle));
 | 
			
		||||
@ -1200,15 +1202,21 @@ int ObTenantTabletScheduler::schedule_tablet_ddl_major_merge(
 | 
			
		||||
    } else {
 | 
			
		||||
      LOG_WARN("get tablet direct load mgr failed", K(ret), "tablet_id", tablet_handle.get_obj()->get_tablet_meta().tablet_id_);
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_FAIL(tablet_handle.get_obj()->get_ddl_kv_mgr(ddl_kv_mgr_handle))) {
 | 
			
		||||
    LOG_WARN("get ddl kv mgr failed", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(ddl_kv_mgr_handle.get_obj()->check_has_freezed_ddl_kv(has_freezed_ddl_kv))) {
 | 
			
		||||
    LOG_WARN("check has freezed ddl kv failed", K(ret));
 | 
			
		||||
  } else if (OB_FAIL(direct_load_mgr_handle.get_full_obj()->prepare_ddl_merge_param(*tablet_handle.get_obj(), param))) {
 | 
			
		||||
    if (OB_EAGAIN != ret) {
 | 
			
		||||
      LOG_WARN("prepare major merge param failed", K(ret), "tablet_id", tablet_handle.get_obj()->get_tablet_meta().tablet_id_);
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
 | 
			
		||||
  } else if (has_freezed_ddl_kv || param.is_commit_) {
 | 
			
		||||
    if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_ddl_table_merge_dag(param))) {
 | 
			
		||||
      if (OB_SIZE_OVERFLOW != ret && OB_EAGAIN != ret) {
 | 
			
		||||
        LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -428,7 +428,7 @@ int ObDDLCommitReplayExecutor::do_replay_(ObTabletHandle &tablet_handle) //TODO(
 | 
			
		||||
  } else if (OB_ISNULL(data_direct_load_mgr = direct_load_mgr_handle.get_full_obj())) {
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("unexpected err", K(ret), K(table_key));
 | 
			
		||||
  } else if (OB_FAIL(data_direct_load_mgr->commit(*tablet_handle.get_obj(), log_->get_start_scn(), scn_, false/*wait_major_generate*/))) {
 | 
			
		||||
  } else if (OB_FAIL(data_direct_load_mgr->commit(*tablet_handle.get_obj(), log_->get_start_scn(), scn_, 0/*unused table_id*/, 0/*unused ddl_task_id*/, true/*is replay*/))) {
 | 
			
		||||
    if (OB_TABLET_NOT_EXIST == ret || OB_TASK_EXPIRED == ret) {
 | 
			
		||||
      ret = OB_SUCCESS; // exit when tablet not exist or task expired
 | 
			
		||||
    } else {
 | 
			
		||||
 | 
			
		||||
@ -2155,7 +2155,7 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star
 | 
			
		||||
    ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
    LOG_WARN("unexpected err", K(ret), KPC(this));
 | 
			
		||||
  } else if (OB_FAIL(commit(*tablet_handle.get_obj(), start_scn, commit_scn,
 | 
			
		||||
      sqc_build_ctx_.build_param_.runtime_only_param_.table_id_, sqc_build_ctx_.build_param_.runtime_only_param_.task_id_))) {
 | 
			
		||||
      sqc_build_ctx_.build_param_.runtime_only_param_.table_id_, sqc_build_ctx_.build_param_.runtime_only_param_.task_id_, false/*is replay*/))) {
 | 
			
		||||
    LOG_WARN("failed to do ddl kv commit", K(ret), KPC(this));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
@ -2163,11 +2163,11 @@ int ObTabletFullDirectLoadMgr::close(const int64_t execution_id, const SCN &star
 | 
			
		||||
  } else if (sstable_already_created || is_delay_build_major) {
 | 
			
		||||
    LOG_INFO("sstable had already created, skip waiting for major generated and reporting chksum", K(start_scn), K(commit_scn),
 | 
			
		||||
        K(sstable_already_created), K(is_delay_build_major));
 | 
			
		||||
  } else if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/))) {
 | 
			
		||||
  } else if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/, false/*is_replay*/))) {
 | 
			
		||||
    LOG_WARN("schedule merge task and wait real major generate", K(ret),
 | 
			
		||||
        K(is_remote_write), K(sstable_already_created), K(start_scn), K(commit_scn));
 | 
			
		||||
  } else if (lob_mgr_handle_.is_valid() &&
 | 
			
		||||
      OB_FAIL(lob_mgr_handle_.get_full_obj()->schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/))) {
 | 
			
		||||
      OB_FAIL(lob_mgr_handle_.get_full_obj()->schedule_merge_task(start_scn, commit_scn, true/*wait_major_generate*/, false/*is_replay*/))) {
 | 
			
		||||
    LOG_WARN("schedule merge task and wait real major generate for lob failed", K(ret),
 | 
			
		||||
        K(is_remote_write), K(sstable_already_created), K(start_scn), K(commit_scn));
 | 
			
		||||
  } else if (OB_FAIL(ObDDLUtil::ddl_get_tablet(ls_handle, tablet_id_, new_tablet_handle))) {
 | 
			
		||||
@ -2403,7 +2403,8 @@ int ObTabletFullDirectLoadMgr::commit(
 | 
			
		||||
    const share::SCN &start_scn,
 | 
			
		||||
    const share::SCN &commit_scn,
 | 
			
		||||
    const uint64_t table_id,
 | 
			
		||||
    const int64_t ddl_task_id)
 | 
			
		||||
    const int64_t ddl_task_id,
 | 
			
		||||
    const bool is_replay)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  ObDDLKvMgrHandle ddl_kv_mgr_handle;
 | 
			
		||||
@ -2435,7 +2436,7 @@ int ObTabletFullDirectLoadMgr::commit(
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (OB_SUCC(ret)) {
 | 
			
		||||
      if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, false/*wait_major_generate*/))) {
 | 
			
		||||
      if (OB_FAIL(schedule_merge_task(start_scn, commit_scn, false/*wait_major_generate*/, is_replay))) {
 | 
			
		||||
        LOG_WARN("schedule major merge task failed", K(ret));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
@ -2456,22 +2457,26 @@ int ObTabletFullDirectLoadMgr::commit(
 | 
			
		||||
      LOG_ERROR("ls should not be null", K(ret));
 | 
			
		||||
    } else if (OB_FAIL(ls->get_tablet(lob_tablet_id, lob_tablet_handle, ObTabletCommon::DEFAULT_GET_TABLET_DURATION_US, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
 | 
			
		||||
      LOG_WARN("get tablet handle failed", K(ret), K(ls_id), K(lob_tablet_id));
 | 
			
		||||
    } else if (OB_FAIL(lob_mgr_handle_.get_full_obj()->commit(*lob_tablet_handle.get_obj(), start_scn, commit_scn))) {
 | 
			
		||||
    } else if (OB_FAIL(lob_mgr_handle_.get_full_obj()->commit(*lob_tablet_handle.get_obj(), start_scn, commit_scn, table_id, ddl_task_id, is_replay))) {
 | 
			
		||||
      LOG_WARN("commit for lob failed", K(ret), K(start_scn), K(commit_scn));
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTabletFullDirectLoadMgr::schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated)
 | 
			
		||||
int ObTabletFullDirectLoadMgr::schedule_merge_task(
 | 
			
		||||
    const share::SCN &start_scn,
 | 
			
		||||
    const share::SCN &commit_scn,
 | 
			
		||||
    const bool wait_major_generated,
 | 
			
		||||
    const bool is_replay)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (OB_UNLIKELY(!is_inited_)) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("not init", K(ret), K(is_inited_));
 | 
			
		||||
  } else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !commit_scn.is_valid_and_not_min())) {
 | 
			
		||||
  } else if (OB_UNLIKELY(!start_scn.is_valid_and_not_min() || !commit_scn.is_valid_and_not_min() || (is_replay && wait_major_generated))) {
 | 
			
		||||
    ret = OB_ERR_SYS;
 | 
			
		||||
    LOG_WARN("unknown start scn or commit snc", K(ret), K(start_scn), K(commit_scn));
 | 
			
		||||
    LOG_WARN("unknown start scn or commit snc", K(ret), K(start_scn), K(commit_scn), K(is_replay), K(wait_major_generated));
 | 
			
		||||
  } else {
 | 
			
		||||
    const int64_t wait_start_ts = ObTimeUtility::fast_current_time();
 | 
			
		||||
    while (OB_SUCC(ret)) {
 | 
			
		||||
@ -2492,6 +2497,9 @@ int ObTabletFullDirectLoadMgr::schedule_merge_task(const share::SCN &start_scn,
 | 
			
		||||
            LOG_WARN("schedule ddl merge dag failed", K(ret), K(param));
 | 
			
		||||
          } else {
 | 
			
		||||
            ret = OB_SUCCESS;
 | 
			
		||||
            if (is_replay) {
 | 
			
		||||
              break;
 | 
			
		||||
            }
 | 
			
		||||
          }
 | 
			
		||||
        } else if (!wait_major_generated) {
 | 
			
		||||
          // schedule successfully and no need to wait physical major generates.
 | 
			
		||||
 | 
			
		||||
@ -422,8 +422,9 @@ public:
 | 
			
		||||
      ObTablet &tablet,
 | 
			
		||||
      const share::SCN &start_scn,
 | 
			
		||||
      const share::SCN &commit_scn,
 | 
			
		||||
      const uint64_t table_id = 0,
 | 
			
		||||
      const int64_t ddl_task_id = 0); // schedule build a major sstable
 | 
			
		||||
      const uint64_t table_id,
 | 
			
		||||
      const int64_t ddl_task_id,
 | 
			
		||||
      const bool is_replay); // schedule build a major sstable
 | 
			
		||||
 | 
			
		||||
  void set_commit_scn_nolock(const share::SCN &scn);
 | 
			
		||||
  int set_commit_scn(const share::SCN &scn);
 | 
			
		||||
@ -441,7 +442,7 @@ public:
 | 
			
		||||
  INHERIT_TO_STRING_KV("ObTabletDirectLoadMgr", ObTabletDirectLoadMgr, K_(start_scn), K_(commit_scn), K_(execution_id));
 | 
			
		||||
private:
 | 
			
		||||
  bool is_started() { return start_scn_.is_valid_and_not_min(); }
 | 
			
		||||
  int schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated); // try wait build major sstable
 | 
			
		||||
  int schedule_merge_task(const share::SCN &start_scn, const share::SCN &commit_scn, const bool wait_major_generated, const bool is_replay); // try wait build major sstable
 | 
			
		||||
  int cleanup_unlock();
 | 
			
		||||
  int init_ddl_table_store(const share::SCN &start_scn, const int64_t snapshot_version, const share::SCN &ddl_checkpoint_scn);
 | 
			
		||||
  int update_major_sstable();
 | 
			
		||||
 | 
			
		||||
@ -568,6 +568,30 @@ int ObTabletDDLKvMgr::check_has_effective_ddl_kv(bool &has_ddl_kv)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTabletDDLKvMgr::check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  has_freezed_ddl_kv = false;
 | 
			
		||||
  ObLatchRGuard guard(lock_, ObLatchIds::TABLET_DDL_KV_MGR_LOCK);
 | 
			
		||||
  if (IS_NOT_INIT) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("ObTabletDDLKvMgr is not inited", K(ret));
 | 
			
		||||
  } else {
 | 
			
		||||
    for (int64_t pos = head_; !has_freezed_ddl_kv && OB_SUCC(ret) && pos < tail_; ++pos) {
 | 
			
		||||
      const int64_t idx = get_idx(pos);
 | 
			
		||||
      ObDDLKVHandle &cur_kv_handle = ddl_kv_handles_[idx];
 | 
			
		||||
      ObDDLKV *cur_kv = cur_kv_handle.get_obj();
 | 
			
		||||
      if (OB_ISNULL(cur_kv)) {
 | 
			
		||||
        ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
        LOG_WARN("ddl kv is null", K(ret), K(ls_id_), K(tablet_id_), KP(cur_kv), K(pos), K(head_), K(tail_));
 | 
			
		||||
      } else if (cur_kv->is_freezed()) {
 | 
			
		||||
        has_freezed_ddl_kv = true;
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTabletDDLKvMgr::alloc_ddl_kv(
 | 
			
		||||
    const share::SCN &start_scn,
 | 
			
		||||
    const int64_t snapshot_version,
 | 
			
		||||
 | 
			
		||||
@ -55,6 +55,7 @@ public:
 | 
			
		||||
      const share::SCN &freeze_scn = share::SCN::min_scn()); // freeze the active ddl kv, when memtable freeze or ddl commit
 | 
			
		||||
  int release_ddl_kvs(const share::SCN &rec_scn); // release persistent ddl kv, used in ddl merge task for free ddl kv
 | 
			
		||||
  int check_has_effective_ddl_kv(bool &has_ddl_kv); // used in ddl log handler for checkpoint
 | 
			
		||||
  int check_has_freezed_ddl_kv(bool &has_freezed_ddl_kv);
 | 
			
		||||
  int64_t get_count();
 | 
			
		||||
  void set_ddl_kv(const int64_t idx, ObDDLKVHandle &kv_handle); //for unittest
 | 
			
		||||
  OB_INLINE void inc_ref() { ATOMIC_INC(&ref_cnt_); }
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user