alloc ctx after running & fix diagnose too_many_sstables
This commit is contained in:
@ -2297,14 +2297,15 @@ int ObTenantDagScheduler::get_max_major_finish_time(const int64_t version, int64
|
||||
ObThreadCondGuard guard(scheduler_sync_);
|
||||
ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObDagPrio::DAG_PRIO_COMPACTION_LOW);
|
||||
ObIDag *cur = head->get_next();
|
||||
compaction::ObTabletMergeCtx *ctx = nullptr;
|
||||
while (head != cur) {
|
||||
if (ObDagType::DAG_TYPE_MAJOR_MERGE == cur->get_type()) {
|
||||
dag = static_cast<compaction::ObTabletMergeDag *>(cur);
|
||||
if (ObIDag::DAG_STATUS_NODE_RUNNING == dag->get_dag_status()) {
|
||||
if (dag->get_ctx().param_.merge_version_ == version) {
|
||||
if (OB_NOT_NULL(dag->get_ctx().merge_progress_)
|
||||
&& dag->get_ctx().merge_progress_->get_estimated_finish_time() > estimated_finish_time) {
|
||||
estimated_finish_time = dag->get_ctx().merge_progress_->get_estimated_finish_time();
|
||||
if (nullptr != (ctx = dag->get_ctx()) && ctx->param_.merge_version_ == version) {
|
||||
if (OB_NOT_NULL(ctx->merge_progress_)
|
||||
&& ctx->merge_progress_->get_estimated_finish_time() > estimated_finish_time) {
|
||||
estimated_finish_time = ctx->merge_progress_->get_estimated_finish_time();
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -859,8 +859,9 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
|
||||
if (OB_TMP_FAIL(get_suspect_info_and_print(MEDIUM_MERGE, share::ObLSID(INT64_MAX), ObTabletID(INT64_MAX)))) {
|
||||
LOG_WARN("failed get tenant merge suspect info", K(tmp_ret));
|
||||
}
|
||||
if (scheduler->could_major_merge_start() && can_add_diagnose_info()
|
||||
&& scheduler->get_prohibit_medium_ls_map().get_cnt() > 0) {
|
||||
if ((!scheduler->could_major_merge_start()
|
||||
|| scheduler->get_prohibit_medium_ls_map().get_transfer_flag_cnt() > 0)
|
||||
&& can_add_diagnose_info()) {
|
||||
SET_DIAGNOSE_INFO(
|
||||
info_array_[idx_++],
|
||||
!scheduler->could_major_merge_start() ? MAJOR_MERGE : MEDIUM_MERGE,
|
||||
|
||||
@ -157,6 +157,11 @@ int ObPartitionMergePolicy::get_medium_merge_tables(
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObPartitionMergePolicy::is_sstable_count_not_safe(const int64_t minor_table_cnt)
|
||||
{
|
||||
return minor_table_cnt >= MAX_SSTABLE_CNT_IN_STORAGE;
|
||||
}
|
||||
|
||||
int ObPartitionMergePolicy::get_mini_merge_tables(
|
||||
const ObGetMergeTablesParam ¶m,
|
||||
ObLS &ls,
|
||||
@ -180,7 +185,7 @@ int ObPartitionMergePolicy::get_mini_merge_tables(
|
||||
} else if (OB_UNLIKELY(nullptr == tablet.get_memtable_mgr() || !table_store_wrapper.get_member()->is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("get unexpected null memtable mgr from tablet or invalid table store", K(ret), K(tablet), K(table_store_wrapper));
|
||||
} else if (table_store_wrapper.get_member()->get_minor_sstables().count() >= MAX_SSTABLE_CNT_IN_STORAGE) {
|
||||
} else if (is_sstable_count_not_safe(table_store_wrapper.get_member()->get_minor_sstables().count())) {
|
||||
ret = OB_SIZE_OVERFLOW;
|
||||
LOG_ERROR("Too many sstables, delay mini merge until sstable count falls below MAX_SSTABLE_CNT",
|
||||
K(ret), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet));
|
||||
|
||||
@ -100,7 +100,7 @@ public:
|
||||
ObLS &ls,
|
||||
const ObTablet &tablet,
|
||||
ObVersionRange &result_version_range);
|
||||
|
||||
static bool is_sstable_count_not_safe(const int64_t minor_table_cnt);
|
||||
private:
|
||||
static int find_mini_merge_tables(
|
||||
const storage::ObGetMergeTablesParam ¶m,
|
||||
|
||||
@ -209,7 +209,7 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx)
|
||||
update_estimated_finish_time_();
|
||||
if (ctx->param_.is_tenant_major_merge_) {
|
||||
if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_progress(
|
||||
merge_dag_->get_ctx().param_.merge_version_,
|
||||
merge_dag_->get_ctx()->param_.merge_version_,
|
||||
estimate_occupy_size_ - old_major_data_size, // estimate_occupy_size_delta
|
||||
0, // scanned_data_size_delta
|
||||
0, // output_block_cnt_delta
|
||||
@ -218,7 +218,7 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx)
|
||||
LOG_WARN("failed to update progress", K(ret), K(old_major_data_size));
|
||||
} else {
|
||||
LOG_DEBUG("init() success to update progress", K(ret),
|
||||
"param", merge_dag_->get_ctx().param_, K_(estimate_row_cnt), K_(estimate_occupy_size),
|
||||
"param", merge_dag_->get_ctx()->param_, K_(estimate_row_cnt), K_(estimate_occupy_size),
|
||||
K(old_major_data_size));
|
||||
}
|
||||
}
|
||||
@ -398,7 +398,7 @@ int ObPartitionMajorMergeProgress::update_merge_progress(
|
||||
update_estimated_finish_time_();
|
||||
|
||||
if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_progress(
|
||||
merge_dag_->get_ctx().param_.merge_version_,
|
||||
merge_dag_->get_ctx()->param_.merge_version_,
|
||||
0, // estimate_occupy_size_delta
|
||||
scan_data_size_delta,
|
||||
output_block_cnt_delta,
|
||||
@ -407,7 +407,7 @@ int ObPartitionMajorMergeProgress::update_merge_progress(
|
||||
LOG_WARN("failed to update progress", K(ret), K(idx), K(scan_data_size_delta), K(output_block_cnt_delta));
|
||||
} else {
|
||||
LOG_DEBUG("update() success to update progress", K(ret),
|
||||
"param", merge_dag_->get_ctx().param_, K(scan_data_size_delta), K(output_block_cnt_delta));
|
||||
"param", merge_dag_->get_ctx()->param_, K(scan_data_size_delta), K(output_block_cnt_delta));
|
||||
}
|
||||
ATOMIC_STORE(&is_updating_, false);
|
||||
}
|
||||
@ -423,22 +423,22 @@ int ObPartitionMajorMergeProgress::finish_merge_progress(const int64_t output_cn
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObPartitionMajorMergeProgress not inited", K(ret));
|
||||
} else if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_progress(
|
||||
merge_dag_->get_ctx().param_.merge_version_,
|
||||
merge_dag_->get_ctx()->param_.merge_version_,
|
||||
0, // estimate_occupy_size_delta
|
||||
estimate_occupy_size_ - pre_scanned_row_cnt_ * avg_row_length_,// scanned_data_size_delta
|
||||
output_cnt - pre_output_block_cnt_,// output_block_cnt_delta
|
||||
estimated_finish_time_,
|
||||
true/*finish_flag*/,
|
||||
&merge_dag_->get_ctx().time_guard_))) {
|
||||
&merge_dag_->get_ctx()->time_guard_))) {
|
||||
LOG_WARN("failed to update progress", K(ret), K(output_cnt), K(estimate_occupy_size_),
|
||||
K(pre_scanned_row_cnt_), K(avg_row_length_));
|
||||
} else if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_compression_ratio(
|
||||
merge_dag_->get_ctx().param_.merge_version_,
|
||||
merge_dag_->get_ctx().merge_info_.get_sstable_merge_info()))) {
|
||||
merge_dag_->get_ctx()->param_.merge_version_,
|
||||
merge_dag_->get_ctx()->merge_info_.get_sstable_merge_info()))) {
|
||||
LOG_WARN("failed to update progress", K(ret), K(output_cnt));
|
||||
} else {
|
||||
LOG_DEBUG("finish() success to update progress", K(ret),
|
||||
"param", merge_dag_->get_ctx().param_, K(output_cnt),
|
||||
"param", merge_dag_->get_ctx()->param_, K(output_cnt),
|
||||
K(pre_scanned_row_cnt_), K(avg_row_length_));
|
||||
}
|
||||
return ret;
|
||||
|
||||
@ -895,7 +895,7 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m
|
||||
} else {
|
||||
merge_info_.get_sstable_merge_info().merge_finish_time_ = common::ObTimeUtility::fast_current_time();
|
||||
(void)generate_participant_table_info(merge_info_.get_sstable_merge_info().participant_table_info_);
|
||||
(void)merge_dag_->get_ctx().collect_running_info();
|
||||
(void)merge_dag_->get_ctx()->collect_running_info();
|
||||
|
||||
if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge(
|
||||
*ls_handle_.get_ls(), *new_tablet_handle.get_obj()))) {
|
||||
|
||||
@ -261,23 +261,41 @@ int ObBasicTabletMergeDag::alloc_merge_ctx()
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObBasicTabletMergeDag::prepare_merge_ctx()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(alloc_merge_ctx())) {
|
||||
LOG_WARN("failed to alloc memory for merge ctx", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ctx_->ls_handle_, ObLSGetMod::STORAGE_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (typeid(*this) != typeid(ObTxTableMergeDag)
|
||||
&& OB_TMP_FAIL(ctx_->init_merge_progress(param_.is_tenant_major_merge_))) {
|
||||
LOG_WARN("failed to init merge progress", K(tmp_ret), K_(param));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObBasicTabletMergeDag::get_tablet_and_compat_mode()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// can't get tablet_handle now! because this func is called in create dag,
|
||||
// the last compaction dag is not finished yet, tablet is in old version
|
||||
ObLSHandle tmp_ls_handle;
|
||||
ObTabletHandle tmp_tablet_handle;
|
||||
ObTabletMemberWrapper<ObTabletTableStore> table_store_wrapper;
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ctx_->ls_handle_, ObLSGetMod::STORAGE_MOD))) {
|
||||
if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, tmp_ls_handle, ObLSGetMod::STORAGE_MOD))) {
|
||||
LOG_WARN("failed to get log stream", K(ret), K(ls_id_));
|
||||
} else if (OB_FAIL(ctx_->ls_handle_.get_ls()->get_tablet_svr()->get_tablet(
|
||||
} else if (OB_FAIL(tmp_ls_handle.get_ls()->get_tablet_svr()->get_tablet(
|
||||
tablet_id_,
|
||||
tmp_tablet_handle,
|
||||
0/*timeout*/,
|
||||
ObMDSGetTabletMode::READ_ALL_COMMITED))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(ls_id_), K(tablet_id_));
|
||||
} else if (OB_FAIL(ObTabletMergeChecker::check_need_merge(ctx_->param_.merge_type_, *tmp_tablet_handle.get_obj()))) {
|
||||
} else if (OB_FAIL(ObTabletMergeChecker::check_need_merge(param_.merge_type_, *tmp_tablet_handle.get_obj()))) {
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to check need merge", K(ret));
|
||||
}
|
||||
@ -288,27 +306,20 @@ int ObBasicTabletMergeDag::get_tablet_and_compat_mode()
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && is_mini_merge(merge_type_)) {
|
||||
int64_t inc_sstable_cnt = table_store_wrapper.get_member()->get_minor_sstables().count() + 1/*major table*/;
|
||||
bool is_exist = false;
|
||||
if (OB_FAIL(MTL(ObTenantDagScheduler *)->check_dag_exist(this, is_exist))) {
|
||||
LOG_WARN("failed to check dag exist", K(ret), K_(param));
|
||||
} else if (is_exist) {
|
||||
++inc_sstable_cnt;
|
||||
}
|
||||
if (OB_SUCC(ret) && inc_sstable_cnt >= MAX_SSTABLE_CNT_IN_STORAGE) {
|
||||
} else {
|
||||
const int64_t inc_sstable_cnt = table_store_wrapper.get_member()->get_minor_sstables().count() + (is_exist ? 1 : 0);
|
||||
if (ObPartitionMergePolicy::is_sstable_count_not_safe(inc_sstable_cnt)) {
|
||||
ret = OB_TOO_MANY_SSTABLE;
|
||||
LOG_WARN("Too many sstables in tablet, cannot schdule mini compaction, retry later",
|
||||
K(ret), K_(ls_id), K_(tablet_id), K(inc_sstable_cnt), K(tmp_tablet_handle.get_obj()));
|
||||
K(ret), K_(ls_id), K_(tablet_id), K(tmp_tablet_handle.get_obj()),
|
||||
"inc_sstable_cnt", table_store_wrapper.get_member()->get_minor_sstables().count());
|
||||
ObPartitionMergePolicy::diagnose_table_count_unsafe(MINI_MERGE, *tmp_tablet_handle.get_obj());
|
||||
}
|
||||
}
|
||||
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_SUCC(ret)
|
||||
&& typeid(*this) != typeid(ObTxTableMergeDag)
|
||||
&& OB_UNLIKELY(OB_SUCCESS != (tmp_ret = ctx_->init_merge_progress(param_.is_tenant_major_merge_)))) {
|
||||
LOG_WARN("failed to init merge progress", K(tmp_ret), K_(param));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -346,7 +357,7 @@ int ObBasicTabletMergeDag::inner_init(const ObTabletMergeDagParam ¶m)
|
||||
if (is_inited_) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("cannot init twice", K(ret), K(param));
|
||||
} else if (!param.is_valid()) {
|
||||
} else if (OB_UNLIKELY(!param.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", K(ret), K(param));
|
||||
} else {
|
||||
@ -355,8 +366,6 @@ int ObBasicTabletMergeDag::inner_init(const ObTabletMergeDagParam ¶m)
|
||||
ls_id_ = param.ls_id_;
|
||||
tablet_id_ = param.tablet_id_;
|
||||
if (param.for_diagnose_) {
|
||||
} else if (OB_FAIL(alloc_merge_ctx())) {
|
||||
LOG_WARN("failed to alloc merge ctx", K(ret));
|
||||
} else if (OB_FAIL(get_tablet_and_compat_mode())) {
|
||||
LOG_WARN("failed to get tablet and compat mode", K(ret));
|
||||
}
|
||||
@ -929,7 +938,9 @@ int ObTabletMergePrepareTask::process()
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", K(ret));
|
||||
} else if (OB_ISNULL(ctx = &merge_dag_->get_ctx())) {
|
||||
} else if (OB_FAIL(merge_dag_->prepare_merge_ctx())) {
|
||||
LOG_WARN("failed to alloc merge ctx", K(ret));
|
||||
} else if (OB_ISNULL(ctx = merge_dag_->get_ctx())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(merge_dag_));
|
||||
} else if (OB_UNLIKELY(is_major_merge_type(ctx->param_.merge_type_)
|
||||
@ -1033,26 +1044,29 @@ int ObTabletMergePrepareTask::build_merge_ctx(bool &skip_rest_operation)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
skip_rest_operation = false;
|
||||
ObTabletMergeCtx &ctx = merge_dag_->get_ctx();
|
||||
const common::ObTabletID &tablet_id = ctx.param_.tablet_id_;
|
||||
ObTabletMergeCtx *ctx = nullptr;
|
||||
const common::ObTabletID &tablet_id = ctx->param_.tablet_id_;
|
||||
|
||||
// only ctx.param_ is inited, fill other fields here
|
||||
if (OB_UNLIKELY(!ctx.param_.is_valid())) {
|
||||
if (OB_ISNULL(ctx = merge_dag_->get_ctx())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(merge_dag_));
|
||||
} else if (OB_UNLIKELY(!ctx->param_.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(ctx));
|
||||
} else if (OB_FAIL(inner_init_ctx(ctx, skip_rest_operation))) {
|
||||
LOG_WARN("fail to inner init ctx", K(ret), K(tablet_id), K(ctx));
|
||||
} else if (OB_FAIL(inner_init_ctx(*ctx, skip_rest_operation))) {
|
||||
LOG_WARN("fail to inner init ctx", K(ret), "tablet_id", ctx->param_.tablet_id_, KPC(ctx));
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret) || skip_rest_operation) {
|
||||
} else if (FALSE_IT(ctx.merge_scn_ = ctx.scn_range_.end_scn_)) {
|
||||
} else if (OB_FAIL(ctx.init_merge_info())) {
|
||||
LOG_WARN("fail to init merge info", K(ret), K(tablet_id), K(ctx));
|
||||
} else if (OB_FAIL(ctx.prepare_index_tree())) {
|
||||
} else if (FALSE_IT(ctx->merge_scn_ = ctx->scn_range_.end_scn_)) {
|
||||
} else if (OB_FAIL(ctx->init_merge_info())) {
|
||||
LOG_WARN("fail to init merge info", K(ret), "tablet_id", ctx->param_.tablet_id_, KPC(ctx));
|
||||
} else if (OB_FAIL(ctx->prepare_index_tree())) {
|
||||
LOG_WARN("fail to prepare sstable index tree", K(ret), K(ctx));
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
FLOG_INFO("succeed to build merge ctx", K(tablet_id), K(ctx), K(skip_rest_operation));
|
||||
FLOG_INFO("succeed to build merge ctx", "tablet_id", ctx->param_.tablet_id_, KPC(ctx), K(skip_rest_operation));
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -1118,9 +1132,9 @@ int ObTabletMergeFinishTask::init()
|
||||
LOG_ERROR("dag type not match", K(ret), KPC(dag_));
|
||||
} else {
|
||||
merge_dag_ = static_cast<ObTabletMergeDag *>(dag_);
|
||||
if (OB_UNLIKELY(!merge_dag_->get_ctx().is_valid())) {
|
||||
if (OB_UNLIKELY(nullptr == merge_dag_->get_ctx() || !merge_dag_->get_ctx()->is_valid())) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("ctx not valid", K(ret), K(merge_dag_->get_ctx()));
|
||||
LOG_WARN("ctx not valid", K(ret), KPC(merge_dag_->get_ctx()));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
@ -1132,13 +1146,13 @@ int ObTabletMergeFinishTask::init()
|
||||
int ObTabletMergeFinishTask::create_sstable_after_merge()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTabletMergeCtx &ctx = merge_dag_->get_ctx();
|
||||
if (ctx.merged_sstable_.is_valid()) {
|
||||
if (OB_UNLIKELY(!is_major_merge_type(ctx.param_.merge_type_))) {
|
||||
ObTabletMergeCtx *ctx = merge_dag_->get_ctx();
|
||||
if (ctx->merged_sstable_.is_valid()) {
|
||||
if (OB_UNLIKELY(!is_major_merge_type(ctx->param_.merge_type_))) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_ERROR("Unxpected valid merged table handle with other merge", K(ret), K(ctx));
|
||||
LOG_ERROR("Unxpected valid merged table handle with other merge", K(ret), KPC(ctx));
|
||||
}
|
||||
} else if (OB_FAIL(get_merged_sstable(ctx))) {
|
||||
} else if (OB_FAIL(get_merged_sstable(*ctx))) {
|
||||
LOG_WARN("failed to finish_merge_sstable", K(ret));
|
||||
}
|
||||
return ret;
|
||||
@ -1149,47 +1163,49 @@ int ObTabletMergeFinishTask::process()
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObTaskController::get().switch_task(share::ObTaskType::DATA_MAINTAIN);
|
||||
|
||||
ObTabletMergeCtx *ctx = nullptr;
|
||||
DEBUG_SYNC(MERGE_PARTITION_FINISH_TASK);
|
||||
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited yet", K(ret));
|
||||
} else if (OB_ISNULL(ctx = merge_dag_->get_ctx())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(merge_dag_));
|
||||
} else {
|
||||
ObTabletMergeCtx &ctx = merge_dag_->get_ctx();
|
||||
ObLSID &ls_id = ctx.param_.ls_id_;
|
||||
ObTabletID &tablet_id = ctx.param_.tablet_id_;
|
||||
ObLSID &ls_id = ctx->param_.ls_id_;
|
||||
ObTabletID &tablet_id = ctx->param_.tablet_id_;
|
||||
|
||||
ctx.time_guard_.click(ObCompactionTimeGuard::EXECUTE);
|
||||
ctx->time_guard_.click(ObCompactionTimeGuard::EXECUTE);
|
||||
if (OB_FAIL(create_sstable_after_merge())) {
|
||||
LOG_WARN("failed to create sstable after merge", K(ret), K(tablet_id));
|
||||
} else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::CREATE_SSTABLE))) {
|
||||
} else if (OB_FAIL(add_sstable_for_merge(ctx))) {
|
||||
} else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::CREATE_SSTABLE))) {
|
||||
} else if (OB_FAIL(add_sstable_for_merge(*ctx))) {
|
||||
LOG_WARN("failed to add sstable for merge", K(ret));
|
||||
}
|
||||
if (OB_SUCC(ret) && is_major_merge_type(ctx.param_.merge_type_) && NULL != ctx.param_.report_) {
|
||||
if (OB_SUCC(ret) && is_major_merge_type(ctx->param_.merge_type_) && NULL != ctx->param_.report_) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_update_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) {
|
||||
LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(ctx.param_.ls_id_), K(tablet_id));
|
||||
} else if (OB_TMP_FAIL(ctx.ls_handle_.get_ls()->get_tablet_svr()->update_tablet_report_status(tablet_id))) {
|
||||
if (OB_TMP_FAIL(ctx->param_.report_->submit_tablet_update_task(MTL_ID(), ctx->param_.ls_id_, tablet_id))) {
|
||||
LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(ctx->param_.ls_id_), K(tablet_id));
|
||||
} else if (OB_TMP_FAIL(ctx->ls_handle_.get_ls()->get_tablet_svr()->update_tablet_report_status(tablet_id))) {
|
||||
LOG_WARN("failed to update tablet report status", K(tmp_ret), K(tablet_id));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(ctx.merge_progress_)) {
|
||||
if (OB_TMP_FAIL(ctx.merge_progress_->update_merge_info(ctx.merge_info_.get_sstable_merge_info()))) {
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(ctx->merge_progress_)) {
|
||||
if (OB_TMP_FAIL(ctx->merge_progress_->update_merge_info(ctx->merge_info_.get_sstable_merge_info()))) {
|
||||
STORAGE_LOG(WARN, "fail to update update merge info", K(tmp_ret));
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(compaction::ObCompactionSuggestionMgr::get_instance().analyze_merge_info(
|
||||
ctx.merge_info_,
|
||||
*ctx.merge_progress_))) {
|
||||
ctx->merge_info_,
|
||||
*ctx->merge_progress_))) {
|
||||
STORAGE_LOG(WARN, "fail to analyze merge info", K(tmp_ret));
|
||||
}
|
||||
ObSSTableMetaHandle sst_meta_hdl;
|
||||
if (OB_TMP_FAIL(ctx.merged_sstable_.get_meta(sst_meta_hdl))) {
|
||||
if (OB_TMP_FAIL(ctx->merged_sstable_.get_meta(sst_meta_hdl))) {
|
||||
STORAGE_LOG(WARN, "fail to get sstable meta handle", K(tmp_ret));
|
||||
} else if (OB_TMP_FAIL(ctx.merge_progress_->finish_merge_progress(
|
||||
} else if (OB_TMP_FAIL(ctx->merge_progress_->finish_merge_progress(
|
||||
sst_meta_hdl.get_sstable_meta().get_total_macro_block_count()))) {
|
||||
STORAGE_LOG(WARN, "fail to update final merge progress", K(tmp_ret));
|
||||
}
|
||||
@ -1198,15 +1214,14 @@ int ObTabletMergeFinishTask::process()
|
||||
|
||||
|
||||
if (NULL != merge_dag_) {
|
||||
ObTabletMergeCtx &ctx = merge_dag_->get_ctx();
|
||||
if (OB_FAIL(ret)) {
|
||||
FLOG_WARN("sstable merge finish", K(ret), K(ctx), "task", *(static_cast<ObITask *>(this)));
|
||||
FLOG_WARN("sstable merge finish", K(ret), KPC(ctx), "task", *(static_cast<ObITask *>(this)));
|
||||
} else {
|
||||
ctx.time_guard_.click(ObCompactionTimeGuard::DAG_FINISH);
|
||||
(void)ctx.collect_running_info();
|
||||
ctx->time_guard_.click(ObCompactionTimeGuard::DAG_FINISH);
|
||||
(void)ctx->collect_running_info();
|
||||
// ATTENTION! Critical diagnostic log, DO NOT CHANGE!!!
|
||||
FLOG_INFO("sstable merge finish", K(ret), "merge_info", ctx.get_merge_info(),
|
||||
K(ctx.merged_sstable_), "compat_mode", merge_dag_->get_compat_mode(), K(ctx.time_guard_));
|
||||
FLOG_INFO("sstable merge finish", K(ret), "merge_info", ctx->get_merge_info(),
|
||||
K(ctx->merged_sstable_), "compat_mode", merge_dag_->get_compat_mode(), K(ctx->time_guard_));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -204,7 +204,7 @@ class ObBasicTabletMergeDag: public share::ObIDag, public ObMergeDagHash
|
||||
public:
|
||||
ObBasicTabletMergeDag(const share::ObDagType::ObDagTypeEnum type);
|
||||
virtual ~ObBasicTabletMergeDag();
|
||||
ObTabletMergeCtx &get_ctx() { return *ctx_; }
|
||||
ObTabletMergeCtx *get_ctx() { return ctx_; }
|
||||
ObTabletMergeDagParam &get_param() { return param_; }
|
||||
virtual const share::ObLSID & get_ls_id() const { return param_.ls_id_; }
|
||||
virtual bool operator == (const ObIDag &other) const override;
|
||||
@ -220,6 +220,7 @@ public:
|
||||
|| OB_TABLET_NOT_EXIST == dag_ret_
|
||||
|| OB_CANCELED == dag_ret_;
|
||||
}
|
||||
int prepare_merge_ctx();
|
||||
int get_tablet_and_compat_mode();
|
||||
virtual int64_t to_string(char* buf, const int64_t buf_len) const override;
|
||||
virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; }
|
||||
@ -229,7 +230,6 @@ public:
|
||||
ObBasicTabletMergeDag &merge_dag,
|
||||
ObTabletMergeCtx &ctx,
|
||||
share::ObITask *prepare_task = nullptr);
|
||||
|
||||
protected:
|
||||
int alloc_merge_ctx();
|
||||
int inner_init(const ObTabletMergeDagParam ¶m);
|
||||
|
||||
@ -708,6 +708,8 @@ int ObProhibitScheduleMediumMap::add_flag(const ObLSID &ls_id, const ProhibitFla
|
||||
if (OB_HASH_NOT_EXIST == ret) {
|
||||
if (OB_FAIL(ls_id_map_.set_refactored(ls_id, input_flag))) {
|
||||
LOG_WARN("failed to stop ls schedule medium", K(ret), K(ls_id), K(input_flag));
|
||||
} else if (TRANSFER == input_flag) {
|
||||
++transfer_flag_cnt_;
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("failed to get map", K(ret), K(ls_id), K(tmp_flag));
|
||||
@ -736,6 +738,8 @@ int ObProhibitScheduleMediumMap::clear_flag(const ObLSID &ls_id, const ProhibitF
|
||||
LOG_TRACE("flag in conflict", K(ret), K(ls_id), K(tmp_flag), K(input_flag));
|
||||
} else if (OB_FAIL(ls_id_map_.erase_refactored(ls_id))) {
|
||||
LOG_WARN("failed to resume ls schedule medium", K(ret), K(ls_id), K(tmp_flag));
|
||||
} else if (TRANSFER == input_flag) {
|
||||
--transfer_flag_cnt_;
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -776,10 +780,10 @@ int64_t ObProhibitScheduleMediumMap::to_string(char *buf, const int64_t buf_len)
|
||||
return pos;
|
||||
}
|
||||
|
||||
int64_t ObProhibitScheduleMediumMap::get_cnt() const
|
||||
int64_t ObProhibitScheduleMediumMap::get_transfer_flag_cnt() const
|
||||
{
|
||||
obsys::ObRLockGuard lock_guard(lock_);
|
||||
return ls_id_map_.size();
|
||||
return transfer_flag_cnt_;
|
||||
}
|
||||
|
||||
int ObTenantTabletScheduler::stop_ls_schedule_medium(const ObLSID &ls_id)
|
||||
@ -1559,7 +1563,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium()
|
||||
"cost_time",
|
||||
current_time - schedule_stats_.start_timestamp_);
|
||||
}
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL) && prohibit_medium_map_.get_cnt() > 0) {
|
||||
if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL) && prohibit_medium_map_.get_transfer_flag_cnt() > 0) {
|
||||
LOG_INFO("tenant is blocking schedule medium", KR(ret), K_(prohibit_medium_map));
|
||||
}
|
||||
|
||||
|
||||
@ -153,8 +153,9 @@ public:
|
||||
int clear_flag(const share::ObLSID &ls_id, const ProhibitFlag &input_flag);
|
||||
int add_flag(const share::ObLSID &ls_id, const ProhibitFlag &input_flag);
|
||||
int64_t to_string(char *buf, const int64_t buf_len) const;
|
||||
int64_t get_cnt() const;
|
||||
int64_t get_transfer_flag_cnt() const;
|
||||
private:
|
||||
int64_t transfer_flag_cnt_;
|
||||
mutable obsys::ObRWLock lock_;
|
||||
common::hash::ObHashMap<share::ObLSID, ProhibitFlag> ls_id_map_;
|
||||
};
|
||||
|
||||
@ -72,35 +72,39 @@ int ObMdsTableMergeTask::process()
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
DEBUG_SYNC(AFTER_EMPTY_SHELL_TABLET_CREATE);
|
||||
|
||||
ObTabletMergeCtx *ctx = nullptr;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", K(ret), K_(is_inited));
|
||||
} else if (OB_FAIL(mds_merge_dag_->prepare_merge_ctx())) {
|
||||
LOG_WARN("failed to alloc merge ctx", K(ret));
|
||||
} else if (OB_ISNULL(ctx = mds_merge_dag_->get_ctx())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(mds_merge_dag_));
|
||||
} else {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
ObLS *ls = nullptr;
|
||||
ObTablet *tablet = nullptr;
|
||||
ObTabletMergeCtx &ctx = mds_merge_dag_->get_ctx();
|
||||
const share::ObLSID &ls_id = ctx.param_.ls_id_;
|
||||
const common::ObTabletID &tablet_id = ctx.param_.tablet_id_;
|
||||
const share::ObLSID &ls_id = ctx->param_.ls_id_;
|
||||
const common::ObTabletID &tablet_id = ctx->param_.tablet_id_;
|
||||
const share::SCN &flush_scn = mds_merge_dag_->get_flush_scn();
|
||||
int64_t ls_rebuild_seq = -1;
|
||||
|
||||
if (OB_ISNULL(ls = ctx.ls_handle_.get_ls())) {
|
||||
if (OB_ISNULL(ls = ctx->ls_handle_.get_ls())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ls is null", K(ret), K(ls_id), "ls_handle", ctx.ls_handle_);
|
||||
LOG_WARN("ls is null", K(ret), K(ls_id), "ls_handle", ctx->ls_handle_);
|
||||
} else if (ls->is_offline()) {
|
||||
ret = OB_CANCELED;
|
||||
LOG_INFO("ls offline, skip merge", K(ret), K(ctx));
|
||||
} else if (OB_FAIL(ctx.get_merge_info().init(ctx, flush_scn))) {
|
||||
} else if (OB_FAIL(ctx->get_merge_info().init(*ctx, flush_scn))) {
|
||||
LOG_WARN("failed to init merge info", K(ret), K(ls_id), K(tablet_id), K(flush_scn), K(ctx));
|
||||
} else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) {
|
||||
} else if (MDS_FAIL(ls->get_tablet(tablet_id, ctx.tablet_handle_, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
|
||||
} else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) {
|
||||
} else if (MDS_FAIL(ls->get_tablet(tablet_id, ctx->tablet_handle_, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) {
|
||||
LOG_WARN("failed to get tablet", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (OB_ISNULL(tablet = ctx.tablet_handle_.get_obj())) {
|
||||
} else if (OB_ISNULL(tablet = ctx->tablet_handle_.get_obj())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_id), "tablet_handle", ctx.tablet_handle_);
|
||||
} else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::GET_TABLET))) {
|
||||
LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_id), "tablet_handle", ctx->tablet_handle_);
|
||||
} else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::GET_TABLET))) {
|
||||
} else if (FALSE_IT(ls_rebuild_seq = ls->get_rebuild_seq())) {
|
||||
} else if (MDS_FAIL(ls->build_new_tablet_from_mds_table(ls_rebuild_seq,
|
||||
tablet_id,
|
||||
@ -108,7 +112,7 @@ int ObMdsTableMergeTask::process()
|
||||
flush_scn))) {
|
||||
LOG_WARN("failed to build new tablet from mds table", K(ret), K(ls_id), K(tablet_id), K(ls_rebuild_seq), K(flush_scn));
|
||||
} else {
|
||||
ctx.time_guard_.click(ObCompactionTimeGuard::EXECUTE);
|
||||
ctx->time_guard_.click(ObCompactionTimeGuard::EXECUTE);
|
||||
share::dag_yield();
|
||||
}
|
||||
|
||||
@ -120,9 +124,9 @@ int ObMdsTableMergeTask::process()
|
||||
ret = tmp_ret;
|
||||
}
|
||||
}
|
||||
ctx.time_guard_.click(ObCompactionTimeGuard::DAG_FINISH);
|
||||
set_merge_finish_time(ctx);
|
||||
(void)ctx.collect_running_info();
|
||||
ctx->time_guard_.click(ObCompactionTimeGuard::DAG_FINISH);
|
||||
set_merge_finish_time(*ctx);
|
||||
(void)ctx->collect_running_info();
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
Reference in New Issue
Block a user