diff --git a/deps/oblib/src/lib/utility/ob_tracepoint_def.h b/deps/oblib/src/lib/utility/ob_tracepoint_def.h index 24aeaf832..7d6a4f965 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint_def.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint_def.h @@ -420,7 +420,7 @@ GLOBAL_ERRSIM_POINT_DEF(717, EN_RS_CANT_GET_ALL_TABLET_CHECKSUM, ""); GLOBAL_ERRSIM_POINT_DEF(718, EN_SWAP_TABLET_IN_COMPACTION, ""); GLOBAL_ERRSIM_POINT_DEF(719, EN_COMPACTION_CO_MERGE_PREPARE_CTX_FAILED, ""); GLOBAL_ERRSIM_POINT_DEF(720, EN_COMPACTION_CO_MERGE_PREPARE_FAILED, ""); -GLOBAL_ERRSIM_POINT_DEF(721, EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED, ""); +GLOBAL_ERRSIM_POINT_DEF(721, EN_COMPACTION_SCHEDULE_MINOR_FAIL, ""); GLOBAL_ERRSIM_POINT_DEF(722, EN_COMPACTION_CO_MERGE_FINISH_FAILED, ""); GLOBAL_ERRSIM_POINT_DEF(723, EN_COMPACTION_ITER_TABLET_NOT_EXIST, ""); GLOBAL_ERRSIM_POINT_DEF(724, EN_COMPACTION_ITER_LS_NOT_EXIST, ""); diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index 18e0087d7..f9e14df5f 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -89,7 +89,7 @@ int ObCOTabletMergeCtx::schedule_minor_errsim(bool &schedule_minor) const } while(0); SCHEDULE_MINOR_ERRSIM(EN_SWAP_TABLET_IN_COMPACTION); - SCHEDULE_MINOR_ERRSIM(EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED); + SCHEDULE_MINOR_ERRSIM(EN_COMPACTION_SCHEDULE_MINOR_FAIL); SCHEDULE_MINOR_ERRSIM(EN_COMPACTION_CO_MERGE_SCHEDULE_FAILED); #endif return ret; diff --git a/src/storage/column_store/ob_co_merge_dag.cpp b/src/storage/column_store/ob_co_merge_dag.cpp index 7ccea3b42..dade6b256 100644 --- a/src/storage/column_store/ob_co_merge_dag.cpp +++ b/src/storage/column_store/ob_co_merge_dag.cpp @@ -36,7 +36,6 @@ using namespace storage; namespace compaction { ERRSIM_POINT_DEF(EN_COMPACTION_ADD_CO_MREGE_FINISH_DAG_INTO_DAG_NET_FAILED); - ObCOMergeDagParam::ObCOMergeDagParam() : ObTabletMergeDagParam(), start_cg_idx_(0), @@ -187,6 +186,8 @@ int ObCOMergePrepareTask::create_schedule_dag(ObCOTabletMergeCtx &ctx) if (is_convert_co_major_merge(ctx.get_merge_type())) { // convert co major merge only rely on major sstable + } else if (!MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) { + // don't schedule minor dag if enable_adaptive_merge_schedule=false } else if (OB_FAIL(ctx.check_need_schedule_minor(schedule_minor))) { LOG_WARN("failed to check need chedule minor", K(ret), K(schedule_minor)); } else if (schedule_minor) { @@ -267,6 +268,11 @@ int ObCOMergePrepareTask::schedule_minor_exec_dag( // will add ObCOMergeScheduleDag into scheduler, but have minor_exe_dag as parent // alloc schedule_dag will be destroy in create_dag() LOG_WARN("failed to create schedule dag", K(ret)); +#ifdef ERRSIM + } else if (OB_FAIL(ret = OB_E(EventTable::EN_COMPACTION_SCHEDULE_MINOR_FAIL) OB_SUCCESS)) { + FLOG_INFO("ERRSIM EN_COMPACTION_SCHEDULE_MINOR_FAIL", KR(ret), K(ctx)); + SERVER_EVENT_SYNC_ADD("merge_errsim", "schedule_minor_failure", "tablet_id", ctx.get_tablet_id().id()); +#endif } else if (OB_FAIL(MTL(share::ObTenantDagScheduler *)->add_dag(minor_exe_dag, true/*is_emergency*/))) { if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) { LOG_WARN("failed to add dag", K(ret), KPC(minor_exe_dag)); @@ -276,19 +282,16 @@ int ObCOMergePrepareTask::schedule_minor_exec_dag( LOG_INFO("success to add minor dag before schedule dag", K(ret), KP(minor_exe_dag), KP(schedule_dag)); } - if (OB_FAIL(ret) && OB_NOT_NULL(minor_exe_dag)) { - MTL(share::ObTenantDagScheduler *)->free_dag(*minor_exe_dag); - minor_exe_dag = nullptr; - } -#ifdef ERRSIM - if (OB_SUCC(ret)) { - ret = OB_E(EventTable::EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED) OB_SUCCESS; - if (OB_FAIL(ret)) { - SERVER_EVENT_SYNC_ADD("merge_errsim", "prepare_co_minor_failed", "ret_code", ret); - STORAGE_LOG(INFO, "ERRSIM EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED", K(ret)); + if (OB_FAIL(ret)) { + if (OB_NOT_NULL(schedule_dag)) { + MTL(share::ObTenantDagScheduler *)->cancel_dag(schedule_dag); + schedule_dag = nullptr; + } + if (OB_NOT_NULL(minor_exe_dag)) { + MTL(share::ObTenantDagScheduler *)->free_dag(*minor_exe_dag); + minor_exe_dag = nullptr; } } -#endif return ret; } @@ -937,7 +940,7 @@ int ObCOMergeFinishDag::create_first_task() bool ObCOMergeFinishDag::check_can_schedule() { ObCOMergeDagNet *dag_net = static_cast(get_dag_net()); - return dag_net->check_merge_finished(); + return dag_net->is_cancel() || dag_net->check_merge_finished(); } ObCOMergeFinishTask::ObCOMergeFinishTask() @@ -978,6 +981,9 @@ int ObCOMergeFinishTask::process() ret = OB_ERR_UNEXPECTED; LOG_WARN("ctx or dag_net is unexpected null", K(ret), KP(ctx_), KP(dag_net_)); } else if (FALSE_IT(ctx_->time_guard_click(ObStorageCompactionTimeGuard::EXECUTE))) { + } else if (dag_net_->is_cancel()) { + ret = OB_CANCELED; + LOG_INFO("dag net is canceled", K(ret), KP(ctx_), KP(dag_net_)); } else if (FALSE_IT(SET_MEM_CTX(ctx_->mem_ctx_))) { } else if (FALSE_IT(ctx_->mem_ctx_.mem_click())) { } else if (OB_FAIL(ctx_->update_tablet_after_merge())) { @@ -1220,9 +1226,14 @@ int ObCOMergeDagNet::inner_schedule_finish_dag(ObIDag *parent_dag) ATOMIC_SET(&finish_added_, true); } } else if (OB_ISNULL(parent_dag) && check_merge_finished()) { // dag net finish when called by schedule_rest_dag - if (OB_FAIL(create_dag(0, 0, finish_dag_))) { // already add into scheduler + ObCOMergeFinishDag *tmp_finish_dag = nullptr; + if (OB_FAIL(create_dag(0, 0, tmp_finish_dag))) { // already add into scheduler LOG_WARN("failed to create and add finish dag", K(ret)); + } else if (OB_ISNULL(tmp_finish_dag)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tmp_finish_dag is unexpected", KR(ret)); } else { + finish_dag_ = tmp_finish_dag; ATOMIC_SET(&finish_added_, true); } } @@ -1284,9 +1295,15 @@ int ObCOMergeDagNet::inner_create_and_schedule_dags(ObIDag *parent_dag) LOG_INFO("ERRSIM EN_COMPACTION_ADD_CO_MREGE_FINISH_DAG_INTO_DAG_NET_FAILED", K(ret), KPC(parent_dag)); } #endif + ObCOMergeFinishDag *tmp_finish_dag = nullptr; // add into dag_scheduler after parent-child relation generated - if (FAILEDx(create_dag(0, 0, finish_dag_, parent_dag/*parent*/, false/*add_scheduler_flag*/))) { - LOG_WARN("failed to create finish dag", K(ret), K_(finish_dag)); + if (FAILEDx(create_dag(0, 0, tmp_finish_dag, parent_dag/*parent*/, false/*add_scheduler_flag*/))) { + LOG_WARN("failed to create finish dag", K(ret), K(tmp_finish_dag)); + } else if (OB_ISNULL(tmp_finish_dag)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("tmp finish dag is unexpected null", KR(ret)); + } else { + finish_dag_ = tmp_finish_dag; } } // refine merge_batch_size_ with tenant memory diff --git a/src/storage/column_store/ob_co_merge_dag.h b/src/storage/column_store/ob_co_merge_dag.h index 301a73e8e..afc3efb41 100644 --- a/src/storage/column_store/ob_co_merge_dag.h +++ b/src/storage/column_store/ob_co_merge_dag.h @@ -275,14 +275,6 @@ public: { return ATOMIC_LOAD(&finish_added_); } - virtual int deal_with_cancel() override - { - if (!inner_check_finished() && OB_NOT_NULL(finish_dag_)) { - (void)MTL(share::ObTenantDagScheduler*)->free_dag(*finish_dag_); - finish_dag_ = nullptr; - } - return OB_SUCCESS; - } void cancel_dag_net(const int error_code); int create_co_execute_dags(share::ObIDag &schedule_dag); bool check_merge_finished(); diff --git a/src/storage/ob_storage_schema.cpp b/src/storage/ob_storage_schema.cpp index f7422c35f..1d4203597 100644 --- a/src/storage/ob_storage_schema.cpp +++ b/src/storage/ob_storage_schema.cpp @@ -1627,7 +1627,7 @@ int ObStorageSchema::generate_column_array(const ObTableSchema &input_schema) } else { col_schema.default_checksum_ = datum.checksum(0); } - if (OB_FAIL(col_schema.deep_copy_default_val(*allocator_, col->get_orig_default_value()))) { + if (FAILEDx(col_schema.deep_copy_default_val(*allocator_, col->get_orig_default_value()))) { STORAGE_LOG(WARN, "failed to deep copy", K(ret), K(col->get_orig_default_value())); } else if (OB_FAIL(column_array_.push_back(col_schema))) { STORAGE_LOG(WARN, "Fail to push into column array", K(ret), K(col_schema));