[MDS] fix clog may full because MDS dag not scheduled
This commit is contained in:
@ -2106,7 +2106,7 @@ int ObTenantDagScheduler::get_all_compaction_dag_info(
|
|||||||
ObIDag *cur = head->get_next();
|
ObIDag *cur = head->get_next();
|
||||||
while (head != cur && idx < total_dag_cnt && prio_cnt < MAX_SHOW_DAG_CNT_PER_PRIO) {
|
while (head != cur && idx < total_dag_cnt && prio_cnt < MAX_SHOW_DAG_CNT_PER_PRIO) {
|
||||||
if (OB_UNLIKELY(OB_TMP_FAIL(cur->gene_compaction_info(progress[idx])))) {
|
if (OB_UNLIKELY(OB_TMP_FAIL(cur->gene_compaction_info(progress[idx])))) {
|
||||||
if (OB_EAGAIN != tmp_ret) {
|
if (OB_EAGAIN != tmp_ret && OB_NOT_IMPLEMENT != tmp_ret) {
|
||||||
COMMON_LOG(WARN, "failed to generate compaction dag info", K(tmp_ret), KPC(cur));
|
COMMON_LOG(WARN, "failed to generate compaction dag info", K(tmp_ret), KPC(cur));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -2249,7 +2249,11 @@ int ObTenantDagScheduler::diagnose_minor_exe_dag(
|
|||||||
compaction::ObTabletMergeExecuteDag *exe_dag = static_cast<compaction::ObTabletMergeExecuteDag *>(cur);
|
compaction::ObTabletMergeExecuteDag *exe_dag = static_cast<compaction::ObTabletMergeExecuteDag *>(cur);
|
||||||
if (exe_dag->belong_to_same_tablet(merge_dag_info)) {
|
if (exe_dag->belong_to_same_tablet(merge_dag_info)) {
|
||||||
if (OB_FAIL(exe_dag->diagnose_compaction_info(progress))) {
|
if (OB_FAIL(exe_dag->diagnose_compaction_info(progress))) {
|
||||||
LOG_WARN("failed to diagnose compaction dag", K(ret), K(exe_dag));
|
if (OB_NOT_IMPLEMENT != ret) {
|
||||||
|
LOG_WARN("failed to diagnose compaction dag", K(ret), K(exe_dag));
|
||||||
|
} else {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
find = true;
|
find = true;
|
||||||
break;
|
break;
|
||||||
@ -2323,7 +2327,11 @@ int ObTenantDagScheduler::diagnose_dag(
|
|||||||
ret = OB_ERR_SYS;
|
ret = OB_ERR_SYS;
|
||||||
LOG_WARN("dag is null", K(ret));
|
LOG_WARN("dag is null", K(ret));
|
||||||
} else if (OB_FAIL(stored_dag->diagnose_compaction_info(progress))) {
|
} else if (OB_FAIL(stored_dag->diagnose_compaction_info(progress))) {
|
||||||
LOG_WARN("failed to generate compaction info", K(ret));
|
if (OB_NOT_IMPLEMENT != ret) {
|
||||||
|
LOG_WARN("failed to generate compaction info", K(ret));
|
||||||
|
} else {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -43,7 +43,7 @@ DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MERGE_EXECUTE, ObDagPrio::DAG_PRIO_COMPACTIO
|
|||||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MAJOR_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::SSTABLE_MAJOR_MERGE_TASK, "MAJOR_MERGE", "COMPACTION")
|
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MAJOR_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::SSTABLE_MAJOR_MERGE_TASK, "MAJOR_MERGE", "COMPACTION")
|
||||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TX_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::SPECIAL_TABLE_MERGE_TASK, "TX_TABLE_MERGE", "COMPACTION")
|
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_TX_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::SPECIAL_TABLE_MERGE_TASK, "TX_TABLE_MERGE", "COMPACTION")
|
||||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_WRITE_CKPT, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::WRITE_CKPT_TASK, "WRITE_CKPT", "COMPACTION")
|
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_WRITE_CKPT, ObDagPrio::DAG_PRIO_COMPACTION_LOW, ObSysTaskType::WRITE_CKPT_TASK, "WRITE_CKPT", "COMPACTION")
|
||||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_MID, ObSysTaskType::MDS_TABLE_MERGE_TASK, "MDS_TABLE_MERGE", "COMPACTION")
|
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_MDS_TABLE_MERGE, ObDagPrio::DAG_PRIO_COMPACTION_HIGH, ObSysTaskType::MDS_TABLE_MERGE_TASK, "MDS_TABLE_MERGE", "COMPACTION")
|
||||||
|
|
||||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_DDL, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "DDL", "DDL")
|
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_DDL, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "DDL", "DDL")
|
||||||
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_UNIQUE_CHECKING, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "UNIQUE_CHECK", "DDL")
|
DAG_SCHEDULER_DAG_TYPE_DEF(DAG_TYPE_UNIQUE_CHECKING, ObDagPrio::DAG_PRIO_DDL, ObSysTaskType::DDL_TASK, "UNIQUE_CHECK", "DDL")
|
||||||
|
|||||||
@ -152,6 +152,7 @@ int MdsTableBase::merge(const share::SCN &flushing_scn)
|
|||||||
param.ls_id_ = ls_id_;
|
param.ls_id_ = ls_id_;
|
||||||
param.tablet_id_ = tablet_id_;
|
param.tablet_id_ = tablet_id_;
|
||||||
param.flush_scn_ = flushing_scn;
|
param.flush_scn_ = flushing_scn;
|
||||||
|
param.generate_ts_ = ObClockGenerator::getCurrentTime();
|
||||||
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_mds_table_merge_dag(param))) {
|
if (OB_FAIL(compaction::ObScheduleDagFunc::schedule_mds_table_merge_dag(param))) {
|
||||||
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
|
if (OB_EAGAIN != ret && OB_SIZE_OVERFLOW != ret) {
|
||||||
MDS_LOG(WARN, "failed to schedule mds table merge dag", K(ret), K(param));
|
MDS_LOG(WARN, "failed to schedule mds table merge dag", K(ret), K(param));
|
||||||
|
|||||||
@ -969,20 +969,24 @@ int MdsTableImpl<MdsTableType>::flush(share::SCN recycle_scn, bool need_freeze)
|
|||||||
if (OB_SUCC(ret) &&
|
if (OB_SUCC(ret) &&
|
||||||
(flushing_scn_.is_valid() || // need_freeze is false, not calculate temp_flushing_scn, but need generate dag
|
(flushing_scn_.is_valid() || // need_freeze is false, not calculate temp_flushing_scn, but need generate dag
|
||||||
temp_flushing_scn.is_valid())) {// need_freeze is true, calculated a temp flushing scn
|
temp_flushing_scn.is_valid())) {// need_freeze is true, calculated a temp flushing scn
|
||||||
|
if (flushing_scn_.is_valid() && temp_flushing_scn.is_valid()) {// can not both be valid
|
||||||
|
MDS_LOG_FLUSH(ERROR, "both flushing_scn_ and temp_flushing_scn is valid");
|
||||||
|
} else {
|
||||||
// if we get a valid flushing_scn, schedule mini merge
|
// if we get a valid flushing_scn, schedule mini merge
|
||||||
#ifndef UNITTEST_DEBUG
|
#ifndef UNITTEST_DEBUG
|
||||||
share::SCN do_merge_scn = temp_flushing_scn.is_valid() ? temp_flushing_scn : flushing_scn_;
|
share::SCN do_merge_scn = temp_flushing_scn.is_valid() ? temp_flushing_scn : flushing_scn_;
|
||||||
if (MDS_FAIL(merge(do_merge_scn))) {
|
if (MDS_FAIL(merge(do_merge_scn))) {
|
||||||
MDS_LOG_FLUSH(WARN, "failed to merge mds table");
|
MDS_LOG_FLUSH(WARN, "failed to merge mds table");
|
||||||
} else {
|
} else {
|
||||||
if (temp_flushing_scn.is_valid()) {
|
if (temp_flushing_scn.is_valid()) {
|
||||||
flushing_scn_ = temp_flushing_scn;// if and only if calculated valid temp_flushing_scn(need_freeze is true) and generate dag success, set flushing_scn
|
flushing_scn_ = temp_flushing_scn;// if and only if calculated valid temp_flushing_scn(need_freeze is true) and generate dag success, set flushing_scn
|
||||||
|
}
|
||||||
|
report_flush_event_("DO_FLUSH", flushing_scn_, need_freeze);
|
||||||
}
|
}
|
||||||
report_flush_event_("DO_FLUSH", flushing_scn_, need_freeze);
|
|
||||||
}
|
|
||||||
#else
|
#else
|
||||||
flushing_scn_ = temp_flushing_scn;
|
flushing_scn_ = temp_flushing_scn;
|
||||||
#endif
|
#endif
|
||||||
|
}
|
||||||
}
|
}
|
||||||
MDS_LOG_FLUSH(DEBUG, "call flush mds_table");
|
MDS_LOG_FLUSH(DEBUG, "call flush mds_table");
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -23,7 +23,8 @@ namespace mds
|
|||||||
ObMdsTableMergeDagParam::ObMdsTableMergeDagParam()
|
ObMdsTableMergeDagParam::ObMdsTableMergeDagParam()
|
||||||
: ls_id_(),
|
: ls_id_(),
|
||||||
tablet_id_(),
|
tablet_id_(),
|
||||||
flush_scn_(share::SCN::invalid_scn())
|
flush_scn_(share::SCN::invalid_scn()),
|
||||||
|
generate_ts_(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
} // namespace mds
|
} // namespace mds
|
||||||
|
|||||||
@ -33,11 +33,12 @@ public:
|
|||||||
virtual bool is_valid() const override;
|
virtual bool is_valid() const override;
|
||||||
bool operator==(const ObMdsTableMergeDagParam &other) const;
|
bool operator==(const ObMdsTableMergeDagParam &other) const;
|
||||||
|
|
||||||
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(flush_scn));
|
TO_STRING_KV(K_(ls_id), K_(tablet_id), K_(flush_scn), KTIME_(generate_ts));
|
||||||
public:
|
public:
|
||||||
share::ObLSID ls_id_;
|
share::ObLSID ls_id_;
|
||||||
common::ObTabletID tablet_id_;
|
common::ObTabletID tablet_id_;
|
||||||
share::SCN flush_scn_;
|
share::SCN flush_scn_;
|
||||||
|
int64_t generate_ts_;
|
||||||
};
|
};
|
||||||
|
|
||||||
inline bool ObMdsTableMergeDagParam::is_valid() const
|
inline bool ObMdsTableMergeDagParam::is_valid() const
|
||||||
|
|||||||
@ -41,6 +41,7 @@ int ObMdsTableMergeTask::init(const ObMdsTableMergeDagParam ¶m)
|
|||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
} else {
|
} else {
|
||||||
param_ = param;
|
param_ = param;
|
||||||
|
param_.generate_ts_ = ObClockGenerator::getClock();
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,6 +78,8 @@ int ObMdsTableMergeTask::process()
|
|||||||
LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_handle));
|
LOG_WARN("tablet is null", K(ret), K(ls_id), K(tablet_handle));
|
||||||
} else if (OB_FAIL(ls->get_tablet_svr()->build_new_tablet_from_mds_table(tablet_id, flush_scn))) {
|
} else if (OB_FAIL(ls->get_tablet_svr()->build_new_tablet_from_mds_table(tablet_id, flush_scn))) {
|
||||||
LOG_WARN("failed to build new tablet from mds table", K(ret), K(ls_id), K(tablet_id), K(flush_scn));
|
LOG_WARN("failed to build new tablet from mds table", K(ret), K(ls_id), K(tablet_id), K(flush_scn));
|
||||||
|
} else {
|
||||||
|
share::dag_yield();
|
||||||
}
|
}
|
||||||
|
|
||||||
// always notify flush ret
|
// always notify flush ret
|
||||||
|
|||||||
Reference in New Issue
Block a user