[CP] fix schedule minor dag failed

This commit is contained in:
yangqise7en 2024-10-18 05:43:48 +00:00 committed by ob-robot
parent 80f9aee8fc
commit a6fd265a51
5 changed files with 36 additions and 27 deletions

View File

@ -420,7 +420,7 @@ GLOBAL_ERRSIM_POINT_DEF(717, EN_RS_CANT_GET_ALL_TABLET_CHECKSUM, "");
GLOBAL_ERRSIM_POINT_DEF(718, EN_SWAP_TABLET_IN_COMPACTION, "");
GLOBAL_ERRSIM_POINT_DEF(719, EN_COMPACTION_CO_MERGE_PREPARE_CTX_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(720, EN_COMPACTION_CO_MERGE_PREPARE_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(721, EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(721, EN_COMPACTION_SCHEDULE_MINOR_FAIL, "");
GLOBAL_ERRSIM_POINT_DEF(722, EN_COMPACTION_CO_MERGE_FINISH_FAILED, "");
GLOBAL_ERRSIM_POINT_DEF(723, EN_COMPACTION_ITER_TABLET_NOT_EXIST, "");
GLOBAL_ERRSIM_POINT_DEF(724, EN_COMPACTION_ITER_LS_NOT_EXIST, "");

View File

@ -89,7 +89,7 @@ int ObCOTabletMergeCtx::schedule_minor_errsim(bool &schedule_minor) const
} while(0);
SCHEDULE_MINOR_ERRSIM(EN_SWAP_TABLET_IN_COMPACTION);
SCHEDULE_MINOR_ERRSIM(EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED);
SCHEDULE_MINOR_ERRSIM(EN_COMPACTION_SCHEDULE_MINOR_FAIL);
SCHEDULE_MINOR_ERRSIM(EN_COMPACTION_CO_MERGE_SCHEDULE_FAILED);
#endif
return ret;

View File

@ -36,7 +36,6 @@ using namespace storage;
namespace compaction
{
ERRSIM_POINT_DEF(EN_COMPACTION_ADD_CO_MREGE_FINISH_DAG_INTO_DAG_NET_FAILED);
ObCOMergeDagParam::ObCOMergeDagParam()
: ObTabletMergeDagParam(),
start_cg_idx_(0),
@ -187,6 +186,8 @@ int ObCOMergePrepareTask::create_schedule_dag(ObCOTabletMergeCtx &ctx)
if (is_convert_co_major_merge(ctx.get_merge_type())) {
// convert co major merge only rely on major sstable
} else if (!MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) {
// don't schedule minor dag if enable_adaptive_merge_schedule=false
} else if (OB_FAIL(ctx.check_need_schedule_minor(schedule_minor))) {
LOG_WARN("failed to check need chedule minor", K(ret), K(schedule_minor));
} else if (schedule_minor) {
@ -267,6 +268,11 @@ int ObCOMergePrepareTask::schedule_minor_exec_dag(
// will add ObCOMergeScheduleDag into scheduler, but have minor_exe_dag as parent
// alloc schedule_dag will be destroy in create_dag()
LOG_WARN("failed to create schedule dag", K(ret));
#ifdef ERRSIM
} else if (OB_FAIL(ret = OB_E(EventTable::EN_COMPACTION_SCHEDULE_MINOR_FAIL) OB_SUCCESS)) {
FLOG_INFO("ERRSIM EN_COMPACTION_SCHEDULE_MINOR_FAIL", KR(ret), K(ctx));
SERVER_EVENT_SYNC_ADD("merge_errsim", "schedule_minor_failure", "tablet_id", ctx.get_tablet_id().id());
#endif
} else if (OB_FAIL(MTL(share::ObTenantDagScheduler *)->add_dag(minor_exe_dag, true/*is_emergency*/))) {
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
LOG_WARN("failed to add dag", K(ret), KPC(minor_exe_dag));
@ -276,19 +282,16 @@ int ObCOMergePrepareTask::schedule_minor_exec_dag(
LOG_INFO("success to add minor dag before schedule dag", K(ret), KP(minor_exe_dag), KP(schedule_dag));
}
if (OB_FAIL(ret) && OB_NOT_NULL(minor_exe_dag)) {
MTL(share::ObTenantDagScheduler *)->free_dag(*minor_exe_dag);
minor_exe_dag = nullptr;
}
#ifdef ERRSIM
if (OB_SUCC(ret)) {
ret = OB_E(EventTable::EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED) OB_SUCCESS;
if (OB_FAIL(ret)) {
SERVER_EVENT_SYNC_ADD("merge_errsim", "prepare_co_minor_failed", "ret_code", ret);
STORAGE_LOG(INFO, "ERRSIM EN_COMPACTION_CO_MERGE_PREPARE_MINOR_FAILED", K(ret));
if (OB_FAIL(ret)) {
if (OB_NOT_NULL(schedule_dag)) {
MTL(share::ObTenantDagScheduler *)->cancel_dag(schedule_dag);
schedule_dag = nullptr;
}
if (OB_NOT_NULL(minor_exe_dag)) {
MTL(share::ObTenantDagScheduler *)->free_dag(*minor_exe_dag);
minor_exe_dag = nullptr;
}
}
#endif
return ret;
}
@ -937,7 +940,7 @@ int ObCOMergeFinishDag::create_first_task()
bool ObCOMergeFinishDag::check_can_schedule()
{
ObCOMergeDagNet *dag_net = static_cast<ObCOMergeDagNet*>(get_dag_net());
return dag_net->check_merge_finished();
return dag_net->is_cancel() || dag_net->check_merge_finished();
}
ObCOMergeFinishTask::ObCOMergeFinishTask()
@ -978,6 +981,9 @@ int ObCOMergeFinishTask::process()
ret = OB_ERR_UNEXPECTED;
LOG_WARN("ctx or dag_net is unexpected null", K(ret), KP(ctx_), KP(dag_net_));
} else if (FALSE_IT(ctx_->time_guard_click(ObStorageCompactionTimeGuard::EXECUTE))) {
} else if (dag_net_->is_cancel()) {
ret = OB_CANCELED;
LOG_INFO("dag net is canceled", K(ret), KP(ctx_), KP(dag_net_));
} else if (FALSE_IT(SET_MEM_CTX(ctx_->mem_ctx_))) {
} else if (FALSE_IT(ctx_->mem_ctx_.mem_click())) {
} else if (OB_FAIL(ctx_->update_tablet_after_merge())) {
@ -1220,9 +1226,14 @@ int ObCOMergeDagNet::inner_schedule_finish_dag(ObIDag *parent_dag)
ATOMIC_SET(&finish_added_, true);
}
} else if (OB_ISNULL(parent_dag) && check_merge_finished()) { // dag net finish when called by schedule_rest_dag
if (OB_FAIL(create_dag<ObCOMergeFinishDag>(0, 0, finish_dag_))) { // already add into scheduler
ObCOMergeFinishDag *tmp_finish_dag = nullptr;
if (OB_FAIL(create_dag<ObCOMergeFinishDag>(0, 0, tmp_finish_dag))) { // already add into scheduler
LOG_WARN("failed to create and add finish dag", K(ret));
} else if (OB_ISNULL(tmp_finish_dag)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tmp_finish_dag is unexpected", KR(ret));
} else {
finish_dag_ = tmp_finish_dag;
ATOMIC_SET(&finish_added_, true);
}
}
@ -1284,9 +1295,15 @@ int ObCOMergeDagNet::inner_create_and_schedule_dags(ObIDag *parent_dag)
LOG_INFO("ERRSIM EN_COMPACTION_ADD_CO_MREGE_FINISH_DAG_INTO_DAG_NET_FAILED", K(ret), KPC(parent_dag));
}
#endif
ObCOMergeFinishDag *tmp_finish_dag = nullptr;
// add into dag_scheduler after parent-child relation generated
if (FAILEDx(create_dag<ObCOMergeFinishDag>(0, 0, finish_dag_, parent_dag/*parent*/, false/*add_scheduler_flag*/))) {
LOG_WARN("failed to create finish dag", K(ret), K_(finish_dag));
if (FAILEDx(create_dag<ObCOMergeFinishDag>(0, 0, tmp_finish_dag, parent_dag/*parent*/, false/*add_scheduler_flag*/))) {
LOG_WARN("failed to create finish dag", K(ret), K(tmp_finish_dag));
} else if (OB_ISNULL(tmp_finish_dag)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tmp finish dag is unexpected null", KR(ret));
} else {
finish_dag_ = tmp_finish_dag;
}
}
// refine merge_batch_size_ with tenant memory

View File

@ -275,14 +275,6 @@ public:
{
return ATOMIC_LOAD(&finish_added_);
}
virtual int deal_with_cancel() override
{
if (!inner_check_finished() && OB_NOT_NULL(finish_dag_)) {
(void)MTL(share::ObTenantDagScheduler*)->free_dag(*finish_dag_);
finish_dag_ = nullptr;
}
return OB_SUCCESS;
}
void cancel_dag_net(const int error_code);
int create_co_execute_dags(share::ObIDag &schedule_dag);
bool check_merge_finished();

View File

@ -1627,7 +1627,7 @@ int ObStorageSchema::generate_column_array(const ObTableSchema &input_schema)
} else {
col_schema.default_checksum_ = datum.checksum(0);
}
if (OB_FAIL(col_schema.deep_copy_default_val(*allocator_, col->get_orig_default_value()))) {
if (FAILEDx(col_schema.deep_copy_default_val(*allocator_, col->get_orig_default_value()))) {
STORAGE_LOG(WARN, "failed to deep copy", K(ret), K(col->get_orig_default_value()));
} else if (OB_FAIL(column_array_.push_back(col_schema))) {
STORAGE_LOG(WARN, "Fail to push into column array", K(ret), K(col_schema));