enhance compaction diagnose
This commit is contained in:
		@ -218,8 +218,7 @@ const char *ObCompactionDiagnoseInfo::ObDiagnoseStatusStr[DIA_STATUS_MAX] = {
 | 
			
		||||
    "RUNNING",
 | 
			
		||||
    "FAILED",
 | 
			
		||||
    "FINISH",
 | 
			
		||||
    "RS_UNCOMPACTED",
 | 
			
		||||
    "DIA_FAILED"
 | 
			
		||||
    "RS_UNCOMPACTED"
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
const char * ObCompactionDiagnoseInfo::get_diagnose_status_str(ObDiagnoseStatus status)
 | 
			
		||||
@ -430,6 +429,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
 | 
			
		||||
      bool tenant_major_finish = true;
 | 
			
		||||
      bool ls_major_finish = true;
 | 
			
		||||
      bool tablet_major_finish = true;
 | 
			
		||||
      ObSEArray<ObLSID, 64> abnormal_ls_id;
 | 
			
		||||
      while (OB_SUCC(ret) && can_add_diagnose_info()) { // loop all log_stream
 | 
			
		||||
        bool need_merge = false;
 | 
			
		||||
        if (OB_FAIL(ls_iter_guard.get_ptr()->get_next(ls))) {
 | 
			
		||||
@ -481,17 +481,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
 | 
			
		||||
          if (OB_FAIL(ls->get_ls_info(ls_info))) {
 | 
			
		||||
            LOG_WARN("failed to get ls info", K(ret), K(ls));
 | 
			
		||||
          } else if (MAX_LS_TABLET_CNT < ls_info.tablet_count_) {
 | 
			
		||||
            if (can_add_diagnose_info()) {
 | 
			
		||||
              SET_DIAGNOSE_INFO(
 | 
			
		||||
                  info_array_[idx_++],
 | 
			
		||||
                  MERGE_TYPE_MAX,
 | 
			
		||||
                  MTL_ID(),
 | 
			
		||||
                  ls_id,
 | 
			
		||||
                  ObTabletID(INT64_MAX),
 | 
			
		||||
                  ObCompactionDiagnoseInfo::DIA_STATUS_DIA_FAILED,
 | 
			
		||||
                  ObTimeUtility::fast_current_time(),
 | 
			
		||||
                  "there is too many tablets. tablet count", ls_info.tablet_count_);
 | 
			
		||||
            }
 | 
			
		||||
            LOG_INFO("there is too many tablets, skip diagnose in this ls", K(ls_id), "tablet_count", ls_info.tablet_count_);
 | 
			
		||||
          } else if (OB_FAIL(ls->build_tablet_iter(tablet_iter))) {
 | 
			
		||||
            LOG_WARN("failed to build ls tablet iter", K(ret), K(ls));
 | 
			
		||||
          } else {
 | 
			
		||||
@ -525,7 +515,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
 | 
			
		||||
                if (OB_TMP_FAIL(diagnose_tablet_minor_merge(ls_id, *tablet_handle.get_obj()))) {
 | 
			
		||||
                  LOG_WARN("failed to get diagnose minor merge", K(tmp_ret));
 | 
			
		||||
                }
 | 
			
		||||
                if (OB_TMP_FAIL(diagnose_tablet_medium_merge(ls_id, *tablet_handle.get_obj()))) {
 | 
			
		||||
                if (OB_TMP_FAIL(diagnose_tablet_medium_merge(compaction_scn, ls_id, *tablet_handle.get_obj()))) {
 | 
			
		||||
                  LOG_WARN("failed to get diagnose medium merge", K(tmp_ret));
 | 
			
		||||
                }
 | 
			
		||||
              }
 | 
			
		||||
@ -533,18 +523,27 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
 | 
			
		||||
            tenant_major_finish &= ls_major_finish;
 | 
			
		||||
            LOG_INFO("finish ls merge diagnose", K(ret), K(ls_id));
 | 
			
		||||
          }
 | 
			
		||||
        } else {
 | 
			
		||||
          (void)abnormal_ls_id.push_back(ls->get_ls_id());
 | 
			
		||||
        }
 | 
			
		||||
      } // end of while
 | 
			
		||||
      if (diagnose_major_flag && tenant_major_finish && can_add_diagnose_info()) {
 | 
			
		||||
        ObCompactionDiagnoseInfo &info = info_array_[idx_++];
 | 
			
		||||
        SET_DIAGNOSE_INFO(
 | 
			
		||||
            info_array_[idx_++],
 | 
			
		||||
            MEDIUM_MERGE,
 | 
			
		||||
            MTL_ID(),
 | 
			
		||||
            share::ObLSID(INT64_MAX),
 | 
			
		||||
            ObTabletID(INT64_MAX),
 | 
			
		||||
            ObCompactionDiagnoseInfo::DIA_STATUS_FINISH,
 | 
			
		||||
            ObTimeUtility::fast_current_time(),
 | 
			
		||||
            "compaction has finished in storage. compaction_scn", compaction_scn);
 | 
			
		||||
          info,
 | 
			
		||||
          MEDIUM_MERGE,
 | 
			
		||||
          MTL_ID(),
 | 
			
		||||
          share::ObLSID(INT64_MAX),
 | 
			
		||||
          ObTabletID(INT64_MAX),
 | 
			
		||||
          ObCompactionDiagnoseInfo::DIA_STATUS_FINISH,
 | 
			
		||||
          ObTimeUtility::fast_current_time(),
 | 
			
		||||
          "test: compaction has finished in storage, please check RS. compaction_scn", compaction_scn);
 | 
			
		||||
        if (!abnormal_ls_id.empty()) {
 | 
			
		||||
          char * buf = info.diagnose_info_;
 | 
			
		||||
          const int64_t buf_len = common::OB_DIAGNOSE_INFO_LENGTH;
 | 
			
		||||
          ADD_COMPACTION_INFO_PARAM(buf, buf_len,
 | 
			
		||||
              "some ls may offline", abnormal_ls_id);
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
@ -733,6 +732,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_minor_merge(const ObLSID &ls_id, Ob
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObCompactionDiagnoseMgr::diagnose_tablet_medium_merge(
 | 
			
		||||
    const int64_t compaction_scn,
 | 
			
		||||
    const ObLSID &ls_id,
 | 
			
		||||
    ObTablet &tablet)
 | 
			
		||||
{
 | 
			
		||||
@ -763,7 +763,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_medium_merge(
 | 
			
		||||
                "tablet_snapshot", tablet.get_snapshot_version()))) {
 | 
			
		||||
        LOG_WARN("failed to add diagnose info", K(ret), K(ls_id), K(tablet_id));
 | 
			
		||||
      }
 | 
			
		||||
    } else {
 | 
			
		||||
    } else if (max_sync_medium_scn != compaction_scn) {
 | 
			
		||||
      ObTabletMajorMergeDag dag;
 | 
			
		||||
      if (OB_FAIL(diagnose_tablet_merge(
 | 
			
		||||
              dag,
 | 
			
		||||
@ -851,6 +851,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_merge(
 | 
			
		||||
  } else if (progress.is_valid()) { // dag exist, means compaction is running
 | 
			
		||||
    // check progress is normal
 | 
			
		||||
    if (progress.is_suspect_abormal_) { // progress is abnomal
 | 
			
		||||
      const char* current_status = progress.is_waiting_schedule_ ? "dag may be waiting for schedule" : "dag may hang";
 | 
			
		||||
      if (can_add_diagnose_info()
 | 
			
		||||
            && OB_FAIL(SET_DIAGNOSE_INFO(
 | 
			
		||||
              info_array_[idx_++],
 | 
			
		||||
@ -860,7 +861,7 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_merge(
 | 
			
		||||
              tablet_id,
 | 
			
		||||
              ObCompactionDiagnoseInfo::DIA_STATUS_RUNNING,
 | 
			
		||||
              ObTimeUtility::fast_current_time(),
 | 
			
		||||
              "current_status", "dag may hang",
 | 
			
		||||
              K(current_status),
 | 
			
		||||
              "merge_progress", progress))) {
 | 
			
		||||
        LOG_WARN("failed to add diagnose info", K(ret), K(ls_id), K(tablet_id), K(progress));
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
@ -94,7 +94,6 @@ struct ObCompactionDiagnoseInfo
 | 
			
		||||
    DIA_STATUS_FAILED = 2,
 | 
			
		||||
    DIA_STATUS_FINISH = 3,
 | 
			
		||||
    DIA_STATUS_RS_UNCOMPACTED = 4, // RS diagnose
 | 
			
		||||
    DIA_STATUS_DIA_FAILED = 5,
 | 
			
		||||
    DIA_STATUS_MAX
 | 
			
		||||
  };
 | 
			
		||||
  const static char *ObDiagnoseStatusStr[DIA_STATUS_MAX];
 | 
			
		||||
@ -134,6 +133,7 @@ private:
 | 
			
		||||
  int diagnose_tablet_mini_merge(const ObLSID &ls_id, ObTablet &tablet);
 | 
			
		||||
  int diagnose_tablet_minor_merge(const ObLSID &ls_id, ObTablet &tablet);
 | 
			
		||||
  int diagnose_tablet_medium_merge(
 | 
			
		||||
      const int64_t compaction_scn,
 | 
			
		||||
      const ObLSID &ls_id,
 | 
			
		||||
      ObTablet &tablet);
 | 
			
		||||
  int diagnose_tablet_major_merge(
 | 
			
		||||
 | 
			
		||||
@ -45,6 +45,7 @@ ObPartitionMergeProgress::ObPartitionMergeProgress(common::ObIAllocator &allocat
 | 
			
		||||
    pre_scanned_row_cnt_(0),
 | 
			
		||||
    pre_output_block_cnt_(0),
 | 
			
		||||
    is_updating_(false),
 | 
			
		||||
    is_waiting_schedule_(true),
 | 
			
		||||
    is_inited_(false)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
@ -72,6 +73,7 @@ void ObPartitionMergeProgress::reset()
 | 
			
		||||
  pre_output_block_cnt_ = 0;
 | 
			
		||||
  concurrent_cnt_ = 0;
 | 
			
		||||
  is_updating_ = false;
 | 
			
		||||
  is_waiting_schedule_ = true;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -112,6 +114,7 @@ int ObPartitionMergeProgress::init(ObTabletMergeCtx *ctx, const ObTableReadInfo
 | 
			
		||||
    MEMSET(buf, 0, sizeof(int64_t) * concurrent_cnt * 2);
 | 
			
		||||
    scanned_row_cnt_arr_ = buf;
 | 
			
		||||
    output_block_cnt_arr_ = buf + concurrent_cnt;
 | 
			
		||||
    is_waiting_schedule_ = false;
 | 
			
		||||
 | 
			
		||||
    concurrent_cnt_ = concurrent_cnt;
 | 
			
		||||
    merge_dag_ = merge_dag;
 | 
			
		||||
@ -224,21 +227,6 @@ int ObPartitionMergeProgress::estimate(ObTabletMergeCtx *ctx)
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObPartitionMergeProgress::update_row_count(const int64_t idx, const int64_t incre_row_cnt)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (IS_NOT_INIT) {
 | 
			
		||||
    ret = OB_NOT_INIT;
 | 
			
		||||
    LOG_WARN("ObPartitionMergeProgress not inited", K(ret));
 | 
			
		||||
  } else if (incre_row_cnt > 0) {
 | 
			
		||||
    scanned_row_cnt_arr_[idx] += incre_row_cnt;
 | 
			
		||||
    if (REACH_TENANT_TIME_INTERVAL(UPDATE_INTERVAL)) {
 | 
			
		||||
      latest_update_ts_ = ObTimeUtility::fast_current_time();
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObPartitionMergeProgress::update_merge_progress(
 | 
			
		||||
    const int64_t idx,
 | 
			
		||||
    const int64_t scanned_row_cnt,
 | 
			
		||||
@ -254,7 +242,6 @@ int ObPartitionMergeProgress::update_merge_progress(
 | 
			
		||||
  } else if (scanned_row_cnt > scanned_row_cnt_arr_[idx] || output_block_cnt > output_block_cnt_arr_[idx]) {
 | 
			
		||||
    scanned_row_cnt_arr_[idx] = MAX(scanned_row_cnt_arr_[idx], scanned_row_cnt);
 | 
			
		||||
    output_block_cnt_arr_[idx] = MAX(output_block_cnt_arr_[idx], output_block_cnt);
 | 
			
		||||
 | 
			
		||||
    if (REACH_TENANT_TIME_INTERVAL(UPDATE_INTERVAL)) {
 | 
			
		||||
      if (!ATOMIC_CAS(&is_updating_, false, true)) {
 | 
			
		||||
        latest_update_ts_ = ObTimeUtility::fast_current_time();
 | 
			
		||||
@ -351,7 +338,9 @@ int ObPartitionMergeProgress::diagnose_progress(ObDiagnoseTabletCompProgress &in
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (ObTimeUtility::fast_current_time() - latest_update_ts_ > UPDATE_INTERVAL * NORMAL_UPDATE_PARAM) {
 | 
			
		||||
    input_progress.is_suspect_abormal_ = true;
 | 
			
		||||
    input_progress.is_waiting_schedule_ = is_waiting_schedule_;
 | 
			
		||||
  }
 | 
			
		||||
  input_progress.latest_update_ts_ = latest_update_ts_;
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -37,7 +37,6 @@ public:
 | 
			
		||||
  void reset();
 | 
			
		||||
  OB_INLINE bool is_inited() const { return is_inited_; }
 | 
			
		||||
  int init(ObTabletMergeCtx *ctx, const storage::ObTableReadInfo &read_info);
 | 
			
		||||
  int update_row_count(const int64_t idx, const int64_t incre_row_cnt);
 | 
			
		||||
  virtual int update_merge_progress(const int64_t idx, const int64_t scanned_row_count, const int64_t output_block_cnt);
 | 
			
		||||
  virtual int finish_merge_progress(const int64_t output_cnt);
 | 
			
		||||
  int update_merge_info(storage::ObSSTableMergeInfo &merge_info);
 | 
			
		||||
@ -73,6 +72,7 @@ protected:
 | 
			
		||||
  int64_t pre_scanned_row_cnt_; // for smooth the progress curve
 | 
			
		||||
  int64_t pre_output_block_cnt_;
 | 
			
		||||
  bool is_updating_; // atomic lock
 | 
			
		||||
  bool is_waiting_schedule_;
 | 
			
		||||
  bool is_inited_;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -251,7 +251,7 @@ int ObBasicTabletMergeDag::get_tablet_and_compat_mode()
 | 
			
		||||
      ++inc_sstable_cnt;
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_SUCC(ret) && inc_sstable_cnt >= MAX_SSTABLE_CNT_IN_STORAGE) {
 | 
			
		||||
      ret = OB_EAGAIN;
 | 
			
		||||
      ret = OB_TOO_MANY_SSTABLE;
 | 
			
		||||
      LOG_WARN("Too many sstables in tablet, cannot schdule mini compaction, retry later",
 | 
			
		||||
          K(ret), K_(ls_id), K_(tablet_id), K(inc_sstable_cnt), K(tmp_tablet_handle.get_obj()));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -120,8 +120,10 @@ void ObDiagnoseTabletCompProgress::reset()
 | 
			
		||||
{
 | 
			
		||||
  ObCompactionProgress::reset();
 | 
			
		||||
  is_suspect_abormal_ = false;
 | 
			
		||||
  is_waiting_schedule_ = false;
 | 
			
		||||
  dag_id_.reset();
 | 
			
		||||
  create_time_ = 0;
 | 
			
		||||
  latest_update_ts_ = 0;
 | 
			
		||||
  base_version_ = 0;
 | 
			
		||||
  snapshot_version_ = 0;
 | 
			
		||||
}
 | 
			
		||||
@ -398,6 +400,9 @@ int ObTenantCompactionProgressMgr::update_progress(
 | 
			
		||||
      } else if (array_[pos].estimated_finish_time_ < estimate_finish_time) {
 | 
			
		||||
        array_[pos].estimated_finish_time_  = estimate_finish_time;
 | 
			
		||||
      }
 | 
			
		||||
      if (ObPartitionMergeProgress::MAX_ESTIMATE_SPEND_TIME < array_[pos].estimated_finish_time_ - array_[pos].start_time_) {
 | 
			
		||||
        array_[pos].estimated_finish_time_ = array_[pos].start_time_ + ObPartitionMergeProgress::MAX_ESTIMATE_SPEND_TIME;
 | 
			
		||||
      }
 | 
			
		||||
      LOG_DEBUG("success to update progress", K(ret), K(total_data_size_delta), K(output_block_cnt_delta),
 | 
			
		||||
          K(scanned_data_size_delta), K(array_[pos]));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -179,6 +179,7 @@ struct ObDiagnoseTabletCompProgress : public ObCompactionProgress
 | 
			
		||||
  ObDiagnoseTabletCompProgress()
 | 
			
		||||
    : ObCompactionProgress(),
 | 
			
		||||
      is_suspect_abormal_(false),
 | 
			
		||||
      is_waiting_schedule_(false),
 | 
			
		||||
      dag_id_(),
 | 
			
		||||
      create_time_(0),
 | 
			
		||||
      latest_update_ts_(0),
 | 
			
		||||
@ -188,10 +189,11 @@ struct ObDiagnoseTabletCompProgress : public ObCompactionProgress
 | 
			
		||||
  }
 | 
			
		||||
  bool is_valid() const;
 | 
			
		||||
  void reset();
 | 
			
		||||
  INHERIT_TO_STRING_KV("ObCompactionProgress", ObCompactionProgress, K_(is_suspect_abormal), K_(create_time),
 | 
			
		||||
      K_(dag_id), K_(base_version), K_(snapshot_version), K_(status));
 | 
			
		||||
  INHERIT_TO_STRING_KV("ObCompactionProgress", ObCompactionProgress, K_(is_suspect_abormal), K_(is_waiting_schedule),
 | 
			
		||||
      K_(create_time), K_(latest_update_ts), K_(dag_id), K_(base_version), K_(snapshot_version), K_(status));
 | 
			
		||||
 | 
			
		||||
  bool is_suspect_abormal_;
 | 
			
		||||
  bool is_waiting_schedule_;
 | 
			
		||||
  share::ObDagId dag_id_;
 | 
			
		||||
  int64_t create_time_;
 | 
			
		||||
  int64_t latest_update_ts_;
 | 
			
		||||
 | 
			
		||||
@ -2089,19 +2089,6 @@ int ObMemtable::flush(share::ObLSID ls_id)
 | 
			
		||||
  if (is_flushed_) {
 | 
			
		||||
    ret = OB_NO_NEED_UPDATE;
 | 
			
		||||
  } else {
 | 
			
		||||
    if (mt_stat_.create_flush_dag_time_ == 0 &&
 | 
			
		||||
        mt_stat_.ready_for_flush_time_ != 0 &&
 | 
			
		||||
        cur_time - mt_stat_.ready_for_flush_time_ > 30 * 1000 * 1000) {
 | 
			
		||||
      STORAGE_LOG(WARN, "memtable can not create dag successfully for long time",
 | 
			
		||||
                K(ls_id), K(*this), K(mt_stat_.ready_for_flush_time_));
 | 
			
		||||
      ADD_SUSPECT_INFO(MINI_MERGE,
 | 
			
		||||
                       ls_id, get_tablet_id(),
 | 
			
		||||
                       "memtable can not create dag successfully",
 | 
			
		||||
                       "has been ready for flush time:",
 | 
			
		||||
                       cur_time - mt_stat_.ready_for_flush_time_,
 | 
			
		||||
                       "ready for flush time:",
 | 
			
		||||
                       mt_stat_.ready_for_flush_time_);
 | 
			
		||||
    }
 | 
			
		||||
    ObTabletMergeDagParam param;
 | 
			
		||||
    param.ls_id_ = ls_id;
 | 
			
		||||
    param.tablet_id_ = key_.tablet_id_;
 | 
			
		||||
@ -2116,6 +2103,23 @@ int ObMemtable::flush(share::ObLSID ls_id)
 | 
			
		||||
      mt_stat_.create_flush_dag_time_ = cur_time;
 | 
			
		||||
      TRANS_LOG(INFO, "schedule tablet merge dag successfully", K(ret), K(param), KPC(this));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (OB_FAIL(ret) && mt_stat_.create_flush_dag_time_ == 0 &&
 | 
			
		||||
        mt_stat_.ready_for_flush_time_ != 0 &&
 | 
			
		||||
        cur_time - mt_stat_.ready_for_flush_time_ > 30 * 1000 * 1000) {
 | 
			
		||||
      STORAGE_LOG(WARN, "memtable can not create dag successfully for long time",
 | 
			
		||||
                K(ls_id), K(*this), K(mt_stat_.ready_for_flush_time_));
 | 
			
		||||
      const char *str_user_error = ob_errpkt_str_user_error(ret, false);
 | 
			
		||||
      ADD_SUSPECT_INFO(MINI_MERGE,
 | 
			
		||||
                       ls_id, get_tablet_id(),
 | 
			
		||||
                       "memtable can not create dag successfully",
 | 
			
		||||
                       "extra info",
 | 
			
		||||
                       str_user_error,
 | 
			
		||||
                       "has been ready for flush time:",
 | 
			
		||||
                       cur_time - mt_stat_.ready_for_flush_time_,
 | 
			
		||||
                       "ready for flush time:",
 | 
			
		||||
                       mt_stat_.ready_for_flush_time_);
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return ret;
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user