diff --git a/src/share/scheduler/ob_dag_scheduler.cpp b/src/share/scheduler/ob_dag_scheduler.cpp index 08d8ce36dd..c0ecdb6fa2 100755 --- a/src/share/scheduler/ob_dag_scheduler.cpp +++ b/src/share/scheduler/ob_dag_scheduler.cpp @@ -2297,14 +2297,15 @@ int ObTenantDagScheduler::get_max_major_finish_time(const int64_t version, int64 ObThreadCondGuard guard(scheduler_sync_); ObIDag *head = dag_list_[READY_DAG_LIST].get_head(ObDagPrio::DAG_PRIO_COMPACTION_LOW); ObIDag *cur = head->get_next(); + compaction::ObTabletMergeCtx *ctx = nullptr; while (head != cur) { if (ObDagType::DAG_TYPE_MAJOR_MERGE == cur->get_type()) { dag = static_cast(cur); if (ObIDag::DAG_STATUS_NODE_RUNNING == dag->get_dag_status()) { - if (dag->get_ctx().param_.merge_version_ == version) { - if (OB_NOT_NULL(dag->get_ctx().merge_progress_) - && dag->get_ctx().merge_progress_->get_estimated_finish_time() > estimated_finish_time) { - estimated_finish_time = dag->get_ctx().merge_progress_->get_estimated_finish_time(); + if (nullptr != (ctx = dag->get_ctx()) && ctx->param_.merge_version_ == version) { + if (OB_NOT_NULL(ctx->merge_progress_) + && ctx->merge_progress_->get_estimated_finish_time() > estimated_finish_time) { + estimated_finish_time = ctx->merge_progress_->get_estimated_finish_time(); } } } else { diff --git a/src/storage/compaction/ob_compaction_diagnose.cpp b/src/storage/compaction/ob_compaction_diagnose.cpp index f9da1bb5df..f91d08aad9 100755 --- a/src/storage/compaction/ob_compaction_diagnose.cpp +++ b/src/storage/compaction/ob_compaction_diagnose.cpp @@ -859,8 +859,9 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet() if (OB_TMP_FAIL(get_suspect_info_and_print(MEDIUM_MERGE, share::ObLSID(INT64_MAX), ObTabletID(INT64_MAX)))) { LOG_WARN("failed get tenant merge suspect info", K(tmp_ret)); } - if (scheduler->could_major_merge_start() && can_add_diagnose_info() - && scheduler->get_prohibit_medium_ls_map().get_cnt() > 0) { + if ((!scheduler->could_major_merge_start() + || scheduler->get_prohibit_medium_ls_map().get_transfer_flag_cnt() > 0) + && can_add_diagnose_info()) { SET_DIAGNOSE_INFO( info_array_[idx_++], !scheduler->could_major_merge_start() ? MAJOR_MERGE : MEDIUM_MERGE, diff --git a/src/storage/compaction/ob_partition_merge_policy.cpp b/src/storage/compaction/ob_partition_merge_policy.cpp index eee71ac446..8c2c5f9733 100755 --- a/src/storage/compaction/ob_partition_merge_policy.cpp +++ b/src/storage/compaction/ob_partition_merge_policy.cpp @@ -157,6 +157,11 @@ int ObPartitionMergePolicy::get_medium_merge_tables( return ret; } +bool ObPartitionMergePolicy::is_sstable_count_not_safe(const int64_t minor_table_cnt) +{ + return minor_table_cnt >= MAX_SSTABLE_CNT_IN_STORAGE; +} + int ObPartitionMergePolicy::get_mini_merge_tables( const ObGetMergeTablesParam ¶m, ObLS &ls, @@ -180,7 +185,7 @@ int ObPartitionMergePolicy::get_mini_merge_tables( } else if (OB_UNLIKELY(nullptr == tablet.get_memtable_mgr() || !table_store_wrapper.get_member()->is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get unexpected null memtable mgr from tablet or invalid table store", K(ret), K(tablet), K(table_store_wrapper)); - } else if (table_store_wrapper.get_member()->get_minor_sstables().count() >= MAX_SSTABLE_CNT_IN_STORAGE) { + } else if (is_sstable_count_not_safe(table_store_wrapper.get_member()->get_minor_sstables().count())) { ret = OB_SIZE_OVERFLOW; LOG_ERROR("Too many sstables, delay mini merge until sstable count falls below MAX_SSTABLE_CNT", K(ret), K(PRINT_TS_WRAPPER(table_store_wrapper)), K(tablet)); diff --git a/src/storage/compaction/ob_partition_merge_policy.h b/src/storage/compaction/ob_partition_merge_policy.h index fb0350a099..5a71423ff6 100644 --- a/src/storage/compaction/ob_partition_merge_policy.h +++ b/src/storage/compaction/ob_partition_merge_policy.h @@ -100,7 +100,7 @@ public: ObLS &ls, const ObTablet &tablet, ObVersionRange &result_version_range); - + static bool is_sstable_count_not_safe(const int64_t minor_table_cnt); private: static int find_mini_merge_tables( const storage::ObGetMergeTablesParam ¶m, diff --git a/src/storage/compaction/ob_partition_merge_progress.cpp b/src/storage/compaction/ob_partition_merge_progress.cpp index da2c7579b1..d94889ab39 100644 --- a/src/storage/compaction/ob_partition_merge_progress.cpp +++ b/src/storage/compaction/ob_partition_merge_progress.cpp @@ -209,7 +209,7 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx) update_estimated_finish_time_(); if (ctx->param_.is_tenant_major_merge_) { if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_progress( - merge_dag_->get_ctx().param_.merge_version_, + merge_dag_->get_ctx()->param_.merge_version_, estimate_occupy_size_ - old_major_data_size, // estimate_occupy_size_delta 0, // scanned_data_size_delta 0, // output_block_cnt_delta @@ -218,7 +218,7 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx) LOG_WARN("failed to update progress", K(ret), K(old_major_data_size)); } else { LOG_DEBUG("init() success to update progress", K(ret), - "param", merge_dag_->get_ctx().param_, K_(estimate_row_cnt), K_(estimate_occupy_size), + "param", merge_dag_->get_ctx()->param_, K_(estimate_row_cnt), K_(estimate_occupy_size), K(old_major_data_size)); } } @@ -398,7 +398,7 @@ int ObPartitionMajorMergeProgress::update_merge_progress( update_estimated_finish_time_(); if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_progress( - merge_dag_->get_ctx().param_.merge_version_, + merge_dag_->get_ctx()->param_.merge_version_, 0, // estimate_occupy_size_delta scan_data_size_delta, output_block_cnt_delta, @@ -407,7 +407,7 @@ int ObPartitionMajorMergeProgress::update_merge_progress( LOG_WARN("failed to update progress", K(ret), K(idx), K(scan_data_size_delta), K(output_block_cnt_delta)); } else { LOG_DEBUG("update() success to update progress", K(ret), - "param", merge_dag_->get_ctx().param_, K(scan_data_size_delta), K(output_block_cnt_delta)); + "param", merge_dag_->get_ctx()->param_, K(scan_data_size_delta), K(output_block_cnt_delta)); } ATOMIC_STORE(&is_updating_, false); } @@ -423,22 +423,22 @@ int ObPartitionMajorMergeProgress::finish_merge_progress(const int64_t output_cn ret = OB_NOT_INIT; LOG_WARN("ObPartitionMajorMergeProgress not inited", K(ret)); } else if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_progress( - merge_dag_->get_ctx().param_.merge_version_, + merge_dag_->get_ctx()->param_.merge_version_, 0, // estimate_occupy_size_delta estimate_occupy_size_ - pre_scanned_row_cnt_ * avg_row_length_,// scanned_data_size_delta output_cnt - pre_output_block_cnt_,// output_block_cnt_delta estimated_finish_time_, true/*finish_flag*/, - &merge_dag_->get_ctx().time_guard_))) { + &merge_dag_->get_ctx()->time_guard_))) { LOG_WARN("failed to update progress", K(ret), K(output_cnt), K(estimate_occupy_size_), K(pre_scanned_row_cnt_), K(avg_row_length_)); } else if (OB_FAIL(MTL(ObTenantCompactionProgressMgr*)->update_compression_ratio( - merge_dag_->get_ctx().param_.merge_version_, - merge_dag_->get_ctx().merge_info_.get_sstable_merge_info()))) { + merge_dag_->get_ctx()->param_.merge_version_, + merge_dag_->get_ctx()->merge_info_.get_sstable_merge_info()))) { LOG_WARN("failed to update progress", K(ret), K(output_cnt)); } else { LOG_DEBUG("finish() success to update progress", K(ret), - "param", merge_dag_->get_ctx().param_, K(output_cnt), + "param", merge_dag_->get_ctx()->param_, K(output_cnt), K(pre_scanned_row_cnt_), K(avg_row_length_)); } return ret; diff --git a/src/storage/compaction/ob_tablet_merge_ctx.cpp b/src/storage/compaction/ob_tablet_merge_ctx.cpp index 9c7dcd2e5d..2712e565d9 100755 --- a/src/storage/compaction/ob_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_tablet_merge_ctx.cpp @@ -895,7 +895,7 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m } else { merge_info_.get_sstable_merge_info().merge_finish_time_ = common::ObTimeUtility::fast_current_time(); (void)generate_participant_table_info(merge_info_.get_sstable_merge_info().participant_table_info_); - (void)merge_dag_->get_ctx().collect_running_info(); + (void)merge_dag_->get_ctx()->collect_running_info(); if (OB_TMP_FAIL(ObMediumCompactionScheduleFunc::schedule_tablet_medium_merge( *ls_handle_.get_ls(), *new_tablet_handle.get_obj()))) { diff --git a/src/storage/compaction/ob_tablet_merge_task.cpp b/src/storage/compaction/ob_tablet_merge_task.cpp index 91d809aeab..c1b94222db 100755 --- a/src/storage/compaction/ob_tablet_merge_task.cpp +++ b/src/storage/compaction/ob_tablet_merge_task.cpp @@ -261,23 +261,41 @@ int ObBasicTabletMergeDag::alloc_merge_ctx() } return ret; } + +int ObBasicTabletMergeDag::prepare_merge_ctx() +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(alloc_merge_ctx())) { + LOG_WARN("failed to alloc memory for merge ctx", K(ret), KPC(this)); + } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ctx_->ls_handle_, ObLSGetMod::STORAGE_MOD))) { + LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); + } else { + int tmp_ret = OB_SUCCESS; + if (typeid(*this) != typeid(ObTxTableMergeDag) + && OB_TMP_FAIL(ctx_->init_merge_progress(param_.is_tenant_major_merge_))) { + LOG_WARN("failed to init merge progress", K(tmp_ret), K_(param)); + } + } + return ret; +} int ObBasicTabletMergeDag::get_tablet_and_compat_mode() { int ret = OB_SUCCESS; // can't get tablet_handle now! because this func is called in create dag, // the last compaction dag is not finished yet, tablet is in old version + ObLSHandle tmp_ls_handle; ObTabletHandle tmp_tablet_handle; ObTabletMemberWrapper table_store_wrapper; - if (OB_FAIL(ret)) { - } else if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, ctx_->ls_handle_, ObLSGetMod::STORAGE_MOD))) { + if (OB_FAIL(MTL(ObLSService *)->get_ls(ls_id_, tmp_ls_handle, ObLSGetMod::STORAGE_MOD))) { LOG_WARN("failed to get log stream", K(ret), K(ls_id_)); - } else if (OB_FAIL(ctx_->ls_handle_.get_ls()->get_tablet_svr()->get_tablet( + } else if (OB_FAIL(tmp_ls_handle.get_ls()->get_tablet_svr()->get_tablet( tablet_id_, tmp_tablet_handle, 0/*timeout*/, ObMDSGetTabletMode::READ_ALL_COMMITED))) { LOG_WARN("failed to get tablet", K(ret), K(ls_id_), K(tablet_id_)); - } else if (OB_FAIL(ObTabletMergeChecker::check_need_merge(ctx_->param_.merge_type_, *tmp_tablet_handle.get_obj()))) { + } else if (OB_FAIL(ObTabletMergeChecker::check_need_merge(param_.merge_type_, *tmp_tablet_handle.get_obj()))) { if (OB_NO_NEED_MERGE != ret) { LOG_WARN("failed to check need merge", K(ret)); } @@ -288,27 +306,20 @@ int ObBasicTabletMergeDag::get_tablet_and_compat_mode() } if (OB_SUCC(ret) && is_mini_merge(merge_type_)) { - int64_t inc_sstable_cnt = table_store_wrapper.get_member()->get_minor_sstables().count() + 1/*major table*/; bool is_exist = false; if (OB_FAIL(MTL(ObTenantDagScheduler *)->check_dag_exist(this, is_exist))) { LOG_WARN("failed to check dag exist", K(ret), K_(param)); - } else if (is_exist) { - ++inc_sstable_cnt; - } - if (OB_SUCC(ret) && inc_sstable_cnt >= MAX_SSTABLE_CNT_IN_STORAGE) { - ret = OB_TOO_MANY_SSTABLE; - LOG_WARN("Too many sstables in tablet, cannot schdule mini compaction, retry later", - K(ret), K_(ls_id), K_(tablet_id), K(inc_sstable_cnt), K(tmp_tablet_handle.get_obj())); + } else { + const int64_t inc_sstable_cnt = table_store_wrapper.get_member()->get_minor_sstables().count() + (is_exist ? 1 : 0); + if (ObPartitionMergePolicy::is_sstable_count_not_safe(inc_sstable_cnt)) { + ret = OB_TOO_MANY_SSTABLE; + LOG_WARN("Too many sstables in tablet, cannot schdule mini compaction, retry later", + K(ret), K_(ls_id), K_(tablet_id), K(tmp_tablet_handle.get_obj()), + "inc_sstable_cnt", table_store_wrapper.get_member()->get_minor_sstables().count()); + ObPartitionMergePolicy::diagnose_table_count_unsafe(MINI_MERGE, *tmp_tablet_handle.get_obj()); + } } } - - int tmp_ret = OB_SUCCESS; - if (OB_SUCC(ret) - && typeid(*this) != typeid(ObTxTableMergeDag) - && OB_UNLIKELY(OB_SUCCESS != (tmp_ret = ctx_->init_merge_progress(param_.is_tenant_major_merge_)))) { - LOG_WARN("failed to init merge progress", K(tmp_ret), K_(param)); - } - return ret; } @@ -346,7 +357,7 @@ int ObBasicTabletMergeDag::inner_init(const ObTabletMergeDagParam ¶m) if (is_inited_) { ret = OB_INIT_TWICE; LOG_WARN("cannot init twice", K(ret), K(param)); - } else if (!param.is_valid()) { + } else if (OB_UNLIKELY(!param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", K(ret), K(param)); } else { @@ -355,8 +366,6 @@ int ObBasicTabletMergeDag::inner_init(const ObTabletMergeDagParam ¶m) ls_id_ = param.ls_id_; tablet_id_ = param.tablet_id_; if (param.for_diagnose_) { - } else if (OB_FAIL(alloc_merge_ctx())) { - LOG_WARN("failed to alloc merge ctx", K(ret)); } else if (OB_FAIL(get_tablet_and_compat_mode())) { LOG_WARN("failed to get tablet and compat mode", K(ret)); } @@ -929,7 +938,9 @@ int ObTabletMergePrepareTask::process() if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); - } else if (OB_ISNULL(ctx = &merge_dag_->get_ctx())) { + } else if (OB_FAIL(merge_dag_->prepare_merge_ctx())) { + LOG_WARN("failed to alloc merge ctx", K(ret)); + } else if (OB_ISNULL(ctx = merge_dag_->get_ctx())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(merge_dag_)); } else if (OB_UNLIKELY(is_major_merge_type(ctx->param_.merge_type_) @@ -1033,26 +1044,29 @@ int ObTabletMergePrepareTask::build_merge_ctx(bool &skip_rest_operation) { int ret = OB_SUCCESS; skip_rest_operation = false; - ObTabletMergeCtx &ctx = merge_dag_->get_ctx(); - const common::ObTabletID &tablet_id = ctx.param_.tablet_id_; + ObTabletMergeCtx *ctx = nullptr; + const common::ObTabletID &tablet_id = ctx->param_.tablet_id_; // only ctx.param_ is inited, fill other fields here - if (OB_UNLIKELY(!ctx.param_.is_valid())) { + if (OB_ISNULL(ctx = merge_dag_->get_ctx())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(merge_dag_)); + } else if (OB_UNLIKELY(!ctx->param_.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(ctx)); - } else if (OB_FAIL(inner_init_ctx(ctx, skip_rest_operation))) { - LOG_WARN("fail to inner init ctx", K(ret), K(tablet_id), K(ctx)); + } else if (OB_FAIL(inner_init_ctx(*ctx, skip_rest_operation))) { + LOG_WARN("fail to inner init ctx", K(ret), "tablet_id", ctx->param_.tablet_id_, KPC(ctx)); } if (OB_FAIL(ret) || skip_rest_operation) { - } else if (FALSE_IT(ctx.merge_scn_ = ctx.scn_range_.end_scn_)) { - } else if (OB_FAIL(ctx.init_merge_info())) { - LOG_WARN("fail to init merge info", K(ret), K(tablet_id), K(ctx)); - } else if (OB_FAIL(ctx.prepare_index_tree())) { + } else if (FALSE_IT(ctx->merge_scn_ = ctx->scn_range_.end_scn_)) { + } else if (OB_FAIL(ctx->init_merge_info())) { + LOG_WARN("fail to init merge info", K(ret), "tablet_id", ctx->param_.tablet_id_, KPC(ctx)); + } else if (OB_FAIL(ctx->prepare_index_tree())) { LOG_WARN("fail to prepare sstable index tree", K(ret), K(ctx)); } if (OB_SUCC(ret)) { - FLOG_INFO("succeed to build merge ctx", K(tablet_id), K(ctx), K(skip_rest_operation)); + FLOG_INFO("succeed to build merge ctx", "tablet_id", ctx->param_.tablet_id_, KPC(ctx), K(skip_rest_operation)); } return ret; @@ -1118,9 +1132,9 @@ int ObTabletMergeFinishTask::init() LOG_ERROR("dag type not match", K(ret), KPC(dag_)); } else { merge_dag_ = static_cast(dag_); - if (OB_UNLIKELY(!merge_dag_->get_ctx().is_valid())) { + if (OB_UNLIKELY(nullptr == merge_dag_->get_ctx() || !merge_dag_->get_ctx()->is_valid())) { ret = OB_ERR_SYS; - LOG_WARN("ctx not valid", K(ret), K(merge_dag_->get_ctx())); + LOG_WARN("ctx not valid", K(ret), KPC(merge_dag_->get_ctx())); } else { is_inited_ = true; } @@ -1132,13 +1146,13 @@ int ObTabletMergeFinishTask::init() int ObTabletMergeFinishTask::create_sstable_after_merge() { int ret = OB_SUCCESS; - ObTabletMergeCtx &ctx = merge_dag_->get_ctx(); - if (ctx.merged_sstable_.is_valid()) { - if (OB_UNLIKELY(!is_major_merge_type(ctx.param_.merge_type_))) { + ObTabletMergeCtx *ctx = merge_dag_->get_ctx(); + if (ctx->merged_sstable_.is_valid()) { + if (OB_UNLIKELY(!is_major_merge_type(ctx->param_.merge_type_))) { ret = OB_ERR_SYS; - LOG_ERROR("Unxpected valid merged table handle with other merge", K(ret), K(ctx)); + LOG_ERROR("Unxpected valid merged table handle with other merge", K(ret), KPC(ctx)); } - } else if (OB_FAIL(get_merged_sstable(ctx))) { + } else if (OB_FAIL(get_merged_sstable(*ctx))) { LOG_WARN("failed to finish_merge_sstable", K(ret)); } return ret; @@ -1149,47 +1163,49 @@ int ObTabletMergeFinishTask::process() int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; ObTaskController::get().switch_task(share::ObTaskType::DATA_MAINTAIN); - + ObTabletMergeCtx *ctx = nullptr; DEBUG_SYNC(MERGE_PARTITION_FINISH_TASK); if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited yet", K(ret)); + } else if (OB_ISNULL(ctx = merge_dag_->get_ctx())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(merge_dag_)); } else { - ObTabletMergeCtx &ctx = merge_dag_->get_ctx(); - ObLSID &ls_id = ctx.param_.ls_id_; - ObTabletID &tablet_id = ctx.param_.tablet_id_; + ObLSID &ls_id = ctx->param_.ls_id_; + ObTabletID &tablet_id = ctx->param_.tablet_id_; - ctx.time_guard_.click(ObCompactionTimeGuard::EXECUTE); + ctx->time_guard_.click(ObCompactionTimeGuard::EXECUTE); if (OB_FAIL(create_sstable_after_merge())) { LOG_WARN("failed to create sstable after merge", K(ret), K(tablet_id)); - } else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::CREATE_SSTABLE))) { - } else if (OB_FAIL(add_sstable_for_merge(ctx))) { + } else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::CREATE_SSTABLE))) { + } else if (OB_FAIL(add_sstable_for_merge(*ctx))) { LOG_WARN("failed to add sstable for merge", K(ret)); } - if (OB_SUCC(ret) && is_major_merge_type(ctx.param_.merge_type_) && NULL != ctx.param_.report_) { + if (OB_SUCC(ret) && is_major_merge_type(ctx->param_.merge_type_) && NULL != ctx->param_.report_) { int tmp_ret = OB_SUCCESS; - if (OB_TMP_FAIL(ctx.param_.report_->submit_tablet_update_task(MTL_ID(), ctx.param_.ls_id_, tablet_id))) { - LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(ctx.param_.ls_id_), K(tablet_id)); - } else if (OB_TMP_FAIL(ctx.ls_handle_.get_ls()->get_tablet_svr()->update_tablet_report_status(tablet_id))) { + if (OB_TMP_FAIL(ctx->param_.report_->submit_tablet_update_task(MTL_ID(), ctx->param_.ls_id_, tablet_id))) { + LOG_WARN("failed to submit tablet update task to report", K(tmp_ret), K(MTL_ID()), K(ctx->param_.ls_id_), K(tablet_id)); + } else if (OB_TMP_FAIL(ctx->ls_handle_.get_ls()->get_tablet_svr()->update_tablet_report_status(tablet_id))) { LOG_WARN("failed to update tablet report status", K(tmp_ret), K(tablet_id)); } } - if (OB_SUCC(ret) && OB_NOT_NULL(ctx.merge_progress_)) { - if (OB_TMP_FAIL(ctx.merge_progress_->update_merge_info(ctx.merge_info_.get_sstable_merge_info()))) { + if (OB_SUCC(ret) && OB_NOT_NULL(ctx->merge_progress_)) { + if (OB_TMP_FAIL(ctx->merge_progress_->update_merge_info(ctx->merge_info_.get_sstable_merge_info()))) { STORAGE_LOG(WARN, "fail to update update merge info", K(tmp_ret)); } if (OB_TMP_FAIL(compaction::ObCompactionSuggestionMgr::get_instance().analyze_merge_info( - ctx.merge_info_, - *ctx.merge_progress_))) { + ctx->merge_info_, + *ctx->merge_progress_))) { STORAGE_LOG(WARN, "fail to analyze merge info", K(tmp_ret)); } ObSSTableMetaHandle sst_meta_hdl; - if (OB_TMP_FAIL(ctx.merged_sstable_.get_meta(sst_meta_hdl))) { + if (OB_TMP_FAIL(ctx->merged_sstable_.get_meta(sst_meta_hdl))) { STORAGE_LOG(WARN, "fail to get sstable meta handle", K(tmp_ret)); - } else if (OB_TMP_FAIL(ctx.merge_progress_->finish_merge_progress( + } else if (OB_TMP_FAIL(ctx->merge_progress_->finish_merge_progress( sst_meta_hdl.get_sstable_meta().get_total_macro_block_count()))) { STORAGE_LOG(WARN, "fail to update final merge progress", K(tmp_ret)); } @@ -1198,15 +1214,14 @@ int ObTabletMergeFinishTask::process() if (NULL != merge_dag_) { - ObTabletMergeCtx &ctx = merge_dag_->get_ctx(); if (OB_FAIL(ret)) { - FLOG_WARN("sstable merge finish", K(ret), K(ctx), "task", *(static_cast(this))); + FLOG_WARN("sstable merge finish", K(ret), KPC(ctx), "task", *(static_cast(this))); } else { - ctx.time_guard_.click(ObCompactionTimeGuard::DAG_FINISH); - (void)ctx.collect_running_info(); + ctx->time_guard_.click(ObCompactionTimeGuard::DAG_FINISH); + (void)ctx->collect_running_info(); // ATTENTION! Critical diagnostic log, DO NOT CHANGE!!! - FLOG_INFO("sstable merge finish", K(ret), "merge_info", ctx.get_merge_info(), - K(ctx.merged_sstable_), "compat_mode", merge_dag_->get_compat_mode(), K(ctx.time_guard_)); + FLOG_INFO("sstable merge finish", K(ret), "merge_info", ctx->get_merge_info(), + K(ctx->merged_sstable_), "compat_mode", merge_dag_->get_compat_mode(), K(ctx->time_guard_)); } } diff --git a/src/storage/compaction/ob_tablet_merge_task.h b/src/storage/compaction/ob_tablet_merge_task.h index 895dbaa2b9..e65fea389e 100644 --- a/src/storage/compaction/ob_tablet_merge_task.h +++ b/src/storage/compaction/ob_tablet_merge_task.h @@ -204,7 +204,7 @@ class ObBasicTabletMergeDag: public share::ObIDag, public ObMergeDagHash public: ObBasicTabletMergeDag(const share::ObDagType::ObDagTypeEnum type); virtual ~ObBasicTabletMergeDag(); - ObTabletMergeCtx &get_ctx() { return *ctx_; } + ObTabletMergeCtx *get_ctx() { return ctx_; } ObTabletMergeDagParam &get_param() { return param_; } virtual const share::ObLSID & get_ls_id() const { return param_.ls_id_; } virtual bool operator == (const ObIDag &other) const override; @@ -220,6 +220,7 @@ public: || OB_TABLET_NOT_EXIST == dag_ret_ || OB_CANCELED == dag_ret_; } + int prepare_merge_ctx(); int get_tablet_and_compat_mode(); virtual int64_t to_string(char* buf, const int64_t buf_len) const override; virtual lib::Worker::CompatMode get_compat_mode() const override { return compat_mode_; } @@ -229,7 +230,6 @@ public: ObBasicTabletMergeDag &merge_dag, ObTabletMergeCtx &ctx, share::ObITask *prepare_task = nullptr); - protected: int alloc_merge_ctx(); int inner_init(const ObTabletMergeDagParam ¶m); diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp index f326316842..870b6edfb4 100755 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.cpp +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.cpp @@ -708,6 +708,8 @@ int ObProhibitScheduleMediumMap::add_flag(const ObLSID &ls_id, const ProhibitFla if (OB_HASH_NOT_EXIST == ret) { if (OB_FAIL(ls_id_map_.set_refactored(ls_id, input_flag))) { LOG_WARN("failed to stop ls schedule medium", K(ret), K(ls_id), K(input_flag)); + } else if (TRANSFER == input_flag) { + ++transfer_flag_cnt_; } } else { LOG_WARN("failed to get map", K(ret), K(ls_id), K(tmp_flag)); @@ -736,6 +738,8 @@ int ObProhibitScheduleMediumMap::clear_flag(const ObLSID &ls_id, const ProhibitF LOG_TRACE("flag in conflict", K(ret), K(ls_id), K(tmp_flag), K(input_flag)); } else if (OB_FAIL(ls_id_map_.erase_refactored(ls_id))) { LOG_WARN("failed to resume ls schedule medium", K(ret), K(ls_id), K(tmp_flag)); + } else if (TRANSFER == input_flag) { + --transfer_flag_cnt_; } } return ret; @@ -776,10 +780,10 @@ int64_t ObProhibitScheduleMediumMap::to_string(char *buf, const int64_t buf_len) return pos; } -int64_t ObProhibitScheduleMediumMap::get_cnt() const +int64_t ObProhibitScheduleMediumMap::get_transfer_flag_cnt() const { obsys::ObRLockGuard lock_guard(lock_); - return ls_id_map_.size(); + return transfer_flag_cnt_; } int ObTenantTabletScheduler::stop_ls_schedule_medium(const ObLSID &ls_id) @@ -1559,7 +1563,7 @@ int ObTenantTabletScheduler::schedule_all_tablets_medium() "cost_time", current_time - schedule_stats_.start_timestamp_); } - if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL) && prohibit_medium_map_.get_cnt() > 0) { + if (REACH_TENANT_TIME_INTERVAL(PRINT_LOG_INVERVAL) && prohibit_medium_map_.get_transfer_flag_cnt() > 0) { LOG_INFO("tenant is blocking schedule medium", KR(ret), K_(prohibit_medium_map)); } diff --git a/src/storage/compaction/ob_tenant_tablet_scheduler.h b/src/storage/compaction/ob_tenant_tablet_scheduler.h index e4784dc79d..44cd0cc60b 100755 --- a/src/storage/compaction/ob_tenant_tablet_scheduler.h +++ b/src/storage/compaction/ob_tenant_tablet_scheduler.h @@ -153,8 +153,9 @@ public: int clear_flag(const share::ObLSID &ls_id, const ProhibitFlag &input_flag); int add_flag(const share::ObLSID &ls_id, const ProhibitFlag &input_flag); int64_t to_string(char *buf, const int64_t buf_len) const; - int64_t get_cnt() const; + int64_t get_transfer_flag_cnt() const; private: + int64_t transfer_flag_cnt_; mutable obsys::ObRWLock lock_; common::hash::ObHashMap ls_id_map_; }; diff --git a/src/storage/multi_data_source/ob_mds_table_merge_task.cpp b/src/storage/multi_data_source/ob_mds_table_merge_task.cpp index c9148b9228..33801c5b45 100644 --- a/src/storage/multi_data_source/ob_mds_table_merge_task.cpp +++ b/src/storage/multi_data_source/ob_mds_table_merge_task.cpp @@ -72,35 +72,39 @@ int ObMdsTableMergeTask::process() int ret = OB_SUCCESS; DEBUG_SYNC(AFTER_EMPTY_SHELL_TABLET_CREATE); - + ObTabletMergeCtx *ctx = nullptr; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret), K_(is_inited)); + } else if (OB_FAIL(mds_merge_dag_->prepare_merge_ctx())) { + LOG_WARN("failed to alloc merge ctx", K(ret)); + } else if (OB_ISNULL(ctx = mds_merge_dag_->get_ctx())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ctx is unexpected null", K(ret), KP(ctx), KPC(mds_merge_dag_)); } else { int tmp_ret = OB_SUCCESS; ObLS *ls = nullptr; ObTablet *tablet = nullptr; - ObTabletMergeCtx &ctx = mds_merge_dag_->get_ctx(); - const share::ObLSID &ls_id = ctx.param_.ls_id_; - const common::ObTabletID &tablet_id = ctx.param_.tablet_id_; + const share::ObLSID &ls_id = ctx->param_.ls_id_; + const common::ObTabletID &tablet_id = ctx->param_.tablet_id_; const share::SCN &flush_scn = mds_merge_dag_->get_flush_scn(); int64_t ls_rebuild_seq = -1; - if (OB_ISNULL(ls = ctx.ls_handle_.get_ls())) { + if (OB_ISNULL(ls = ctx->ls_handle_.get_ls())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("ls is null", K(ret), K(ls_id), "ls_handle", ctx.ls_handle_); + LOG_WARN("ls is null", K(ret), K(ls_id), "ls_handle", ctx->ls_handle_); } else if (ls->is_offline()) { ret = OB_CANCELED; LOG_INFO("ls offline, skip merge", K(ret), K(ctx)); - } else if (OB_FAIL(ctx.get_merge_info().init(ctx, flush_scn))) { + } else if (OB_FAIL(ctx->get_merge_info().init(*ctx, flush_scn))) { LOG_WARN("failed to init merge info", K(ret), K(ls_id), K(tablet_id), K(flush_scn), K(ctx)); - } else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) { - } else if (MDS_FAIL(ls->get_tablet(tablet_id, ctx.tablet_handle_, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { + } else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::DAG_WAIT_TO_SCHEDULE))) { + } else if (MDS_FAIL(ls->get_tablet(tablet_id, ctx->tablet_handle_, 0, ObMDSGetTabletMode::READ_WITHOUT_CHECK))) { LOG_WARN("failed to get tablet", K(ret), K(ls_id), K(tablet_id)); - } else if (OB_ISNULL(tablet = ctx.tablet_handle_.get_obj())) { + } else if (OB_ISNULL(tablet = ctx->tablet_handle_.get_obj())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_id), "tablet_handle", ctx.tablet_handle_); - } else if (FALSE_IT(ctx.time_guard_.click(ObCompactionTimeGuard::GET_TABLET))) { + LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_id), "tablet_handle", ctx->tablet_handle_); + } else if (FALSE_IT(ctx->time_guard_.click(ObCompactionTimeGuard::GET_TABLET))) { } else if (FALSE_IT(ls_rebuild_seq = ls->get_rebuild_seq())) { } else if (MDS_FAIL(ls->build_new_tablet_from_mds_table(ls_rebuild_seq, tablet_id, @@ -108,7 +112,7 @@ int ObMdsTableMergeTask::process() flush_scn))) { LOG_WARN("failed to build new tablet from mds table", K(ret), K(ls_id), K(tablet_id), K(ls_rebuild_seq), K(flush_scn)); } else { - ctx.time_guard_.click(ObCompactionTimeGuard::EXECUTE); + ctx->time_guard_.click(ObCompactionTimeGuard::EXECUTE); share::dag_yield(); } @@ -120,9 +124,9 @@ int ObMdsTableMergeTask::process() ret = tmp_ret; } } - ctx.time_guard_.click(ObCompactionTimeGuard::DAG_FINISH); - set_merge_finish_time(ctx); - (void)ctx.collect_running_info(); + ctx->time_guard_.click(ObCompactionTimeGuard::DAG_FINISH); + set_merge_finish_time(*ctx); + (void)ctx->collect_running_info(); } return ret;