add dag scheduled count & fix diagnose
This commit is contained in:
@ -1531,6 +1531,7 @@ ObTenantDagScheduler::ObTenantDagScheduler()
|
|||||||
work_thread_num_(0),
|
work_thread_num_(0),
|
||||||
default_work_thread_num_(0),
|
default_work_thread_num_(0),
|
||||||
total_running_task_cnt_(0),
|
total_running_task_cnt_(0),
|
||||||
|
scheduled_task_cnt_(0),
|
||||||
tg_id_(-1)
|
tg_id_(-1)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -1607,6 +1608,7 @@ int ObTenantDagScheduler::init(
|
|||||||
check_period_ = check_period;
|
check_period_ = check_period;
|
||||||
loop_waiting_dag_list_period_ = loop_waiting_list_period;
|
loop_waiting_dag_list_period_ = loop_waiting_list_period;
|
||||||
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
|
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
|
||||||
|
MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_));
|
||||||
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
|
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
|
||||||
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
|
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
|
||||||
|
|
||||||
@ -1705,8 +1707,10 @@ void ObTenantDagScheduler::destroy()
|
|||||||
total_worker_cnt_ = 0;
|
total_worker_cnt_ = 0;
|
||||||
work_thread_num_ = 0;
|
work_thread_num_ = 0;
|
||||||
total_running_task_cnt_ = 0;
|
total_running_task_cnt_ = 0;
|
||||||
|
scheduled_task_cnt_ = 0;
|
||||||
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
|
MEMSET(running_task_cnts_, 0, sizeof(running_task_cnts_));
|
||||||
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
|
MEMSET(dag_cnts_, 0, sizeof(dag_cnts_));
|
||||||
|
MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_));
|
||||||
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
|
MEMSET(dag_net_cnts_, 0, sizeof(dag_net_cnts_));
|
||||||
waiting_workers_.reset();
|
waiting_workers_.reset();
|
||||||
running_workers_.reset();
|
running_workers_.reset();
|
||||||
@ -1896,15 +1900,19 @@ int ObTenantDagScheduler::add_dag(
|
|||||||
void ObTenantDagScheduler::dump_dag_status()
|
void ObTenantDagScheduler::dump_dag_status()
|
||||||
{
|
{
|
||||||
if (REACH_TENANT_TIME_INTERVAL(DUMP_DAG_STATUS_INTERVAL)) {
|
if (REACH_TENANT_TIME_INTERVAL(DUMP_DAG_STATUS_INTERVAL)) {
|
||||||
|
int64_t scheduled_task_cnt = 0;
|
||||||
int64_t running_task[ObDagPrio::DAG_PRIO_MAX];
|
int64_t running_task[ObDagPrio::DAG_PRIO_MAX];
|
||||||
int64_t low_limits[ObDagPrio::DAG_PRIO_MAX];
|
int64_t low_limits[ObDagPrio::DAG_PRIO_MAX];
|
||||||
int64_t up_limits[ObDagPrio::DAG_PRIO_MAX];
|
int64_t up_limits[ObDagPrio::DAG_PRIO_MAX];
|
||||||
int64_t dag_count[ObDagType::DAG_TYPE_MAX];
|
int64_t dag_count[ObDagType::DAG_TYPE_MAX];
|
||||||
|
int64_t scheduled_task_count[ObDagType::DAG_TYPE_MAX];
|
||||||
int64_t dag_net_count[ObDagNetType::DAG_NET_TYPE_MAX];
|
int64_t dag_net_count[ObDagNetType::DAG_NET_TYPE_MAX];
|
||||||
int64_t ready_dag_count[ObDagPrio::DAG_PRIO_MAX];
|
int64_t ready_dag_count[ObDagPrio::DAG_PRIO_MAX];
|
||||||
int64_t waiting_dag_count[ObDagPrio::DAG_PRIO_MAX];
|
int64_t waiting_dag_count[ObDagPrio::DAG_PRIO_MAX];
|
||||||
{
|
{
|
||||||
ObThreadCondGuard guard(scheduler_sync_);
|
ObThreadCondGuard guard(scheduler_sync_);
|
||||||
|
scheduled_task_cnt = scheduled_task_cnt_;
|
||||||
|
scheduled_task_cnt_ = 0;
|
||||||
for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) {
|
for (int64_t i = 0; i < ObDagPrio::DAG_PRIO_MAX; ++i) {
|
||||||
running_task[i] = running_task_cnts_[i];
|
running_task[i] = running_task_cnts_[i];
|
||||||
low_limits[i] = low_limits_[i];
|
low_limits[i] = low_limits_[i];
|
||||||
@ -1914,7 +1922,9 @@ void ObTenantDagScheduler::dump_dag_status()
|
|||||||
}
|
}
|
||||||
for (int64_t i = 0; i < ObDagType::DAG_TYPE_MAX; ++i) {
|
for (int64_t i = 0; i < ObDagType::DAG_TYPE_MAX; ++i) {
|
||||||
dag_count[i] = dag_cnts_[i];
|
dag_count[i] = dag_cnts_[i];
|
||||||
|
scheduled_task_count[i] = scheduled_task_cnts_[i];
|
||||||
}
|
}
|
||||||
|
MEMSET(scheduled_task_cnts_, 0, sizeof(scheduled_task_cnts_));
|
||||||
COMMON_LOG(INFO, "dump_dag_status", K_(dag_cnt), "map_size", dag_map_.size());
|
COMMON_LOG(INFO, "dump_dag_status", K_(dag_cnt), "map_size", dag_map_.size());
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
@ -1938,13 +1948,14 @@ void ObTenantDagScheduler::dump_dag_status()
|
|||||||
}
|
}
|
||||||
for (int64_t i = 0; i < ObDagType::DAG_TYPE_MAX; ++i) {
|
for (int64_t i = 0; i < ObDagType::DAG_TYPE_MAX; ++i) {
|
||||||
COMMON_LOG(INFO, "dump_dag_status", "type", OB_DAG_TYPES[i], "dag_count", dag_count[i]);
|
COMMON_LOG(INFO, "dump_dag_status", "type", OB_DAG_TYPES[i], "dag_count", dag_count[i]);
|
||||||
|
COMMON_LOG(INFO, "dump_dag_status", "type", OB_DAG_TYPES[i], "scheduled_task_count", scheduled_task_count[i]);
|
||||||
}
|
}
|
||||||
for (int64_t i = 0; i < ObDagNetType::DAG_NET_TYPE_MAX; ++i) {
|
for (int64_t i = 0; i < ObDagNetType::DAG_NET_TYPE_MAX; ++i) {
|
||||||
COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", "type", OB_DAG_NET_TYPES[i].dag_net_type_str_,
|
COMMON_LOG(INFO, "dump_dag_status[DAG_NET]", "type", OB_DAG_NET_TYPES[i].dag_net_type_str_,
|
||||||
"dag_count", dag_net_count[i]);
|
"dag_net_count", dag_net_count[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
COMMON_LOG(INFO, "dump_dag_status", K_(total_worker_cnt), K_(total_running_task_cnt), K_(work_thread_num));
|
COMMON_LOG(INFO, "dump_dag_status", K_(total_worker_cnt), K_(total_running_task_cnt), K_(work_thread_num), K(scheduled_task_cnt));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -2977,6 +2988,8 @@ int ObTenantDagScheduler::schedule_one(const int64_t priority)
|
|||||||
if (OB_SUCC(ret) && NULL != worker) {
|
if (OB_SUCC(ret) && NULL != worker) {
|
||||||
++running_task_cnts_[priority];
|
++running_task_cnts_[priority];
|
||||||
++total_running_task_cnt_;
|
++total_running_task_cnt_;
|
||||||
|
++scheduled_task_cnt_;
|
||||||
|
++scheduled_task_cnts_[worker->get_task()->get_dag()->get_type()];
|
||||||
running_workers_.add_last(worker, priority);
|
running_workers_.add_last(worker, priority);
|
||||||
if (task != NULL) {
|
if (task != NULL) {
|
||||||
COMMON_LOG(INFO, "schedule one task", KP(task), "priority", OB_DAG_PRIOS[priority].dag_prio_str_,
|
COMMON_LOG(INFO, "schedule one task", KP(task), "priority", OB_DAG_PRIOS[priority].dag_prio_str_,
|
||||||
|
|||||||
@ -945,9 +945,11 @@ private:
|
|||||||
int64_t work_thread_num_;
|
int64_t work_thread_num_;
|
||||||
int64_t default_work_thread_num_;
|
int64_t default_work_thread_num_;
|
||||||
int64_t total_running_task_cnt_;
|
int64_t total_running_task_cnt_;
|
||||||
|
int64_t scheduled_task_cnt_; // interval scheduled task count
|
||||||
int64_t running_task_cnts_[ObDagPrio::DAG_PRIO_MAX];
|
int64_t running_task_cnts_[ObDagPrio::DAG_PRIO_MAX];
|
||||||
int64_t low_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete
|
int64_t low_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete
|
||||||
int64_t up_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete
|
int64_t up_limits_[ObDagPrio::DAG_PRIO_MAX]; // wait to delete
|
||||||
|
int64_t scheduled_task_cnts_[ObDagType::DAG_TYPE_MAX]; // interval scheduled dag count
|
||||||
int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX];
|
int64_t dag_cnts_[ObDagType::DAG_TYPE_MAX];
|
||||||
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX];
|
int64_t dag_net_cnts_[ObDagNetType::DAG_NET_TYPE_MAX];
|
||||||
common::ObFIFOAllocator allocator_;
|
common::ObFIFOAllocator allocator_;
|
||||||
|
|||||||
@ -1122,7 +1122,6 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium(
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int tmp_ret = OB_SUCCESS;
|
int tmp_ret = OB_SUCCESS;
|
||||||
tablet_major_finish = false;
|
tablet_major_finish = false;
|
||||||
const storage::ObMergeType merge_type = MEDIUM_MERGE;
|
|
||||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||||
ObITable *last_major_sstable = nullptr;
|
ObITable *last_major_sstable = nullptr;
|
||||||
int64_t max_sync_medium_scn = 0;
|
int64_t max_sync_medium_scn = 0;
|
||||||
@ -1141,34 +1140,36 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium(
|
|||||||
} else {
|
} else {
|
||||||
// diagnose medium
|
// diagnose medium
|
||||||
LOG_TRACE("diagnose tablet medium merge", K(max_sync_medium_scn));
|
LOG_TRACE("diagnose tablet medium merge", K(max_sync_medium_scn));
|
||||||
if (max_sync_medium_scn > last_major_sstable->get_snapshot_version()) {
|
if (!diagnose_major_flag || (diagnose_major_flag && max_sync_medium_scn < compaction_scn)) {
|
||||||
if (tablet.get_snapshot_version() < max_sync_medium_scn) { // wait mini compaction or tablet freeze
|
if (max_sync_medium_scn > last_major_sstable->get_snapshot_version()) {
|
||||||
if (ObTimeUtility::current_time_ns() > max_sync_medium_scn + WAIT_MEDIUM_SCHEDULE_INTERVAL) {
|
if (tablet.get_snapshot_version() < max_sync_medium_scn) { // wait mini compaction or tablet freeze
|
||||||
if (DIAGNOSE_TABELT_MAX_COUNT > medium_not_schedule_count_ && can_add_diagnose_info()) {
|
if (ObTimeUtility::current_time_ns() > max_sync_medium_scn + WAIT_MEDIUM_SCHEDULE_INTERVAL) {
|
||||||
SET_DIAGNOSE_INFO(
|
if (DIAGNOSE_TABELT_MAX_COUNT > medium_not_schedule_count_ && can_add_diagnose_info()) {
|
||||||
info_array_[idx_++],
|
SET_DIAGNOSE_INFO(
|
||||||
merge_type,
|
info_array_[idx_++],
|
||||||
MTL_ID(),
|
MEDIUM_MERGE,
|
||||||
ls_id,
|
MTL_ID(),
|
||||||
tablet_id,
|
ls_id,
|
||||||
gen_diagnose_status(max_sync_medium_scn),
|
tablet_id,
|
||||||
ObTimeUtility::fast_current_time(),
|
gen_diagnose_status(max_sync_medium_scn),
|
||||||
"medium wait for freeze, interval", static_cast<int64_t>(WAIT_MEDIUM_SCHEDULE_INTERVAL),
|
ObTimeUtility::fast_current_time(),
|
||||||
"max_receive_medium_scn", max_sync_medium_scn,
|
"medium wait for freeze, interval", static_cast<int64_t>(WAIT_MEDIUM_SCHEDULE_INTERVAL / NS_TIME),
|
||||||
"tablet_snapshot", tablet.get_snapshot_version());
|
"max_receive_medium_scn", max_sync_medium_scn,
|
||||||
|
"tablet_snapshot", tablet.get_snapshot_version());
|
||||||
|
}
|
||||||
|
++medium_not_schedule_count_;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// last medium not finish or schedule
|
||||||
|
ObTabletMajorMergeDag dag;
|
||||||
|
if (OB_TMP_FAIL(diagnose_tablet_merge(
|
||||||
|
dag,
|
||||||
|
MEDIUM_MERGE,
|
||||||
|
ls_id,
|
||||||
|
tablet_id,
|
||||||
|
max_sync_medium_scn))) {
|
||||||
|
LOG_WARN("diagnose failed", K(tmp_ret), K(ls_id), K(tablet_id), KPC(last_major_sstable));
|
||||||
}
|
}
|
||||||
++medium_not_schedule_count_;
|
|
||||||
}
|
|
||||||
} else if (max_sync_medium_scn != compaction_scn) {
|
|
||||||
// last medium not finish or schedule
|
|
||||||
ObTabletMajorMergeDag dag;
|
|
||||||
if (OB_TMP_FAIL(diagnose_tablet_merge(
|
|
||||||
dag,
|
|
||||||
merge_type,
|
|
||||||
ls_id,
|
|
||||||
tablet_id,
|
|
||||||
max_sync_medium_scn))) {
|
|
||||||
LOG_WARN("diagnose failed", K(tmp_ret), K(ls_id), K(tablet_id), KPC(last_major_sstable));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1185,7 +1186,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium(
|
|||||||
if (max_sync_medium_scn < compaction_scn
|
if (max_sync_medium_scn < compaction_scn
|
||||||
&& max_sync_medium_scn == last_major_sstable->get_snapshot_version()) {
|
&& max_sync_medium_scn == last_major_sstable->get_snapshot_version()) {
|
||||||
// last compaction finish
|
// last compaction finish
|
||||||
if (OB_TMP_FAIL(get_suspect_info_and_print(merge_type, ls_id, tablet_id))) {
|
if (OB_TMP_FAIL(get_suspect_info_and_print(MEDIUM_MERGE, ls_id, tablet_id))) {
|
||||||
if (OB_HASH_NOT_EXIST != tmp_ret) {
|
if (OB_HASH_NOT_EXIST != tmp_ret) {
|
||||||
LOG_WARN("failed get major merge suspect info", K(ret), K(ls_id));
|
LOG_WARN("failed get major merge suspect info", K(ret), K(ls_id));
|
||||||
}
|
}
|
||||||
@ -1202,7 +1203,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium(
|
|||||||
tablet_id,
|
tablet_id,
|
||||||
gen_diagnose_status(compaction_scn),
|
gen_diagnose_status(compaction_scn),
|
||||||
ObTimeUtility::fast_current_time(),
|
ObTimeUtility::fast_current_time(),
|
||||||
"major not schedule for long time, interval", static_cast<int64_t>(WAIT_MEDIUM_SCHEDULE_INTERVAL * 2),
|
"major not schedule for long time, interval", static_cast<int64_t>(WAIT_MEDIUM_SCHEDULE_INTERVAL * 2 / NS_TIME),
|
||||||
"max_receive_medium_snapshot", max_sync_medium_scn,
|
"max_receive_medium_snapshot", max_sync_medium_scn,
|
||||||
"compaction_scn", compaction_scn))) {
|
"compaction_scn", compaction_scn))) {
|
||||||
LOG_WARN("failed to add diagnose info", K(ret), K(ls_id), K(tablet_id));
|
LOG_WARN("failed to add diagnose info", K(ret), K(ls_id), K(tablet_id));
|
||||||
@ -1215,13 +1216,13 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium(
|
|||||||
if (DIAGNOSE_TABELT_MAX_COUNT > major_not_schedule_count_ && can_add_diagnose_info()) {
|
if (DIAGNOSE_TABELT_MAX_COUNT > major_not_schedule_count_ && can_add_diagnose_info()) {
|
||||||
SET_DIAGNOSE_INFO(
|
SET_DIAGNOSE_INFO(
|
||||||
info_array_[idx_++],
|
info_array_[idx_++],
|
||||||
merge_type,
|
MAJOR_MERGE,
|
||||||
MTL_ID(),
|
MTL_ID(),
|
||||||
ls_id,
|
ls_id,
|
||||||
tablet_id,
|
tablet_id,
|
||||||
gen_diagnose_status(compaction_scn),
|
gen_diagnose_status(compaction_scn),
|
||||||
ObTimeUtility::fast_current_time(),
|
ObTimeUtility::fast_current_time(),
|
||||||
"major wait for freeze, interval", static_cast<int64_t>(WAIT_MEDIUM_SCHEDULE_INTERVAL),
|
"major wait for freeze, interval", static_cast<int64_t>(WAIT_MEDIUM_SCHEDULE_INTERVAL / NS_TIME),
|
||||||
"compaction_scn", compaction_scn,
|
"compaction_scn", compaction_scn,
|
||||||
"tablet_snapshot", tablet.get_snapshot_version());
|
"tablet_snapshot", tablet.get_snapshot_version());
|
||||||
}
|
}
|
||||||
@ -1231,7 +1232,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_major_and_medium(
|
|||||||
ObTabletMajorMergeDag dag;
|
ObTabletMajorMergeDag dag;
|
||||||
if (OB_TMP_FAIL(diagnose_tablet_merge(
|
if (OB_TMP_FAIL(diagnose_tablet_merge(
|
||||||
dag,
|
dag,
|
||||||
merge_type,
|
MEDIUM_MERGE,
|
||||||
ls_id,
|
ls_id,
|
||||||
tablet.get_tablet_meta().tablet_id_,
|
tablet.get_tablet_meta().tablet_id_,
|
||||||
compaction_scn))) {
|
compaction_scn))) {
|
||||||
|
|||||||
@ -486,8 +486,9 @@ private:
|
|||||||
public:
|
public:
|
||||||
typedef common::hash::ObHashMap<ObLSID, ObLSCheckStatus> LSStatusMap;
|
typedef common::hash::ObHashMap<ObLSID, ObLSCheckStatus> LSStatusMap;
|
||||||
private:
|
private:
|
||||||
static const int64_t WAIT_MEDIUM_SCHEDULE_INTERVAL = 1000L * 1000L * 1000L * 60L * 5; // 5 min // ns
|
static const int64_t NS_TIME = 1000L * 1000L * 1000L;
|
||||||
static const int64_t TOLERATE_MEDIUM_SCHEDULE_INTERVAL = 1000L * 1000L * 1000L * 60L * 60L * 5; // 5 hour
|
static const int64_t WAIT_MEDIUM_SCHEDULE_INTERVAL = NS_TIME * 60L * 5; // 5 min // ns
|
||||||
|
static const int64_t TOLERATE_MEDIUM_SCHEDULE_INTERVAL = NS_TIME * 60L * 60L * 36; // 36 hour
|
||||||
static const int64_t MAX_LS_TABLET_CNT = 10 * 10000; // TODO(@jingshui): tmp solution
|
static const int64_t MAX_LS_TABLET_CNT = 10 * 10000; // TODO(@jingshui): tmp solution
|
||||||
static const int64_t DIAGNOSE_TABELT_MAX_COUNT = 10; // same type diagnose info max count
|
static const int64_t DIAGNOSE_TABELT_MAX_COUNT = 10; // same type diagnose info max count
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
|
|||||||
Reference in New Issue
Block a user