change barrier type for medium clog
This commit is contained in:
parent
bbf81bb574
commit
90cb52ce38
@ -378,7 +378,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool diagnose_major_flag = false;
|
||||
ObTenantTabletScheduler *scheduler = MTL(ObTenantTabletScheduler*);
|
||||
int64_t compaction_scn = scheduler->get_frozen_version();
|
||||
int64_t compaction_scn = MAX(scheduler->get_frozen_version(), MTL(ObTenantFreezeInfoMgr*)->get_latest_frozen_version());
|
||||
ObTenantFreezeInfoMgr::FreezeInfo freeze_info;
|
||||
|
||||
if (compaction_scn > scheduler->get_merged_version()) { // check major merge
|
||||
@ -422,7 +422,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
|
||||
}
|
||||
}
|
||||
|
||||
while (OB_SUCC(ret)) { // loop all log_stream
|
||||
while (OB_SUCC(ret) && can_add_diagnose_info()) { // loop all log_stream
|
||||
bool need_merge = false;
|
||||
if (OB_FAIL(ls_iter_guard.get_ptr()->get_next(ls))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
@ -470,7 +470,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
|
||||
LOG_WARN("failed to build ls tablet iter", K(ret), K(ls));
|
||||
} else {
|
||||
ObTabletHandle tablet_handle;
|
||||
while (OB_SUCC(ret)) { // loop all tablets in ls
|
||||
while (OB_SUCC(ret) && can_add_diagnose_info()) { // loop all tablets in ls
|
||||
if (OB_FAIL(tablet_iter.get_next_tablet(tablet_handle))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
@ -689,16 +689,15 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_medium_merge(
|
||||
const storage::ObMergeType merge_type = MEDIUM_MERGE;
|
||||
const ObTabletID &tablet_id = tablet.get_tablet_meta().tablet_id_;
|
||||
const int64_t max_serialized_medium_scn = tablet.get_tablet_meta().max_serialized_medium_scn_;
|
||||
const ObMediumCompactionInfoList &medium_list = tablet.get_medium_compaction_info_list();
|
||||
ObITable *table = tablet.get_table_store().get_major_sstables().get_boundary_table(true/*last*/);
|
||||
const int64_t wait_check_medium_scn = medium_list.get_wait_check_medium_scn();
|
||||
ObITable *last_major = tablet.get_table_store().get_major_sstables().get_boundary_table(true/*last*/);
|
||||
int64_t max_sync_medium_scn = 0;
|
||||
|
||||
if (OB_FAIL(tablet.get_max_sync_medium_scn(max_sync_medium_scn))){
|
||||
if (OB_ISNULL(last_major)) {
|
||||
} else if (OB_FAIL(tablet.get_max_sync_medium_scn(max_sync_medium_scn))){
|
||||
LOG_WARN("failed to get max sync medium scn", K(ret), K(ls_id), K(tablet_id));
|
||||
} else if (max_sync_medium_scn > max_serialized_medium_scn) {
|
||||
// wait memtable dump
|
||||
if (ObTimeUtility::fast_current_time() > max_sync_medium_scn + WAIT_MEDIUM_SCHEDULE_INTERVAL * 2
|
||||
} else if (max_sync_medium_scn > last_major->get_snapshot_version()) {
|
||||
if (tablet.get_snapshot_version() < max_sync_medium_scn) { // wait mini compaction or tablet freeze
|
||||
if (ObTimeUtility::fast_current_time() > max_sync_medium_scn + WAIT_MEDIUM_SCHEDULE_INTERVAL * 2
|
||||
&& can_add_diagnose_info()
|
||||
&& OB_FAIL(SET_DIAGNOSE_INFO(
|
||||
info_array_[idx_++],
|
||||
@ -709,20 +708,20 @@ int ObCompactionDiagnoseMgr::diagnose_tablet_medium_merge(
|
||||
ObCompactionDiagnoseInfo::DIA_STATUS_NOT_SCHEDULE,
|
||||
ObTimeUtility::fast_current_time(),
|
||||
"max_receive_medium_scn", max_sync_medium_scn,
|
||||
"max_serialized_medium_scn", max_serialized_medium_scn))) {
|
||||
LOG_WARN("failed to add diagnose info", K(ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
} else if (0 == wait_check_medium_scn) {
|
||||
// do nothing
|
||||
} else if (OB_NOT_NULL(table) && table->get_snapshot_version() < wait_check_medium_scn) {
|
||||
ObTabletMajorMergeDag dag;
|
||||
if (OB_FAIL(diagnose_tablet_merge(
|
||||
dag,
|
||||
merge_type,
|
||||
ls_id,
|
||||
tablet_id,
|
||||
wait_check_medium_scn))) {
|
||||
LOG_WARN("diagnose failed", K(ret), K(ls_id), K(tablet_id), KPC(table));
|
||||
"max_serialized_medium_scn", max_serialized_medium_scn,
|
||||
"tablet_snapshot", tablet.get_snapshot_version()))) {
|
||||
LOG_WARN("failed to add diagnose info", K(ret), K(ls_id), K(tablet_id));
|
||||
}
|
||||
} else {
|
||||
ObTabletMajorMergeDag dag;
|
||||
if (OB_FAIL(diagnose_tablet_merge(
|
||||
dag,
|
||||
merge_type,
|
||||
ls_id,
|
||||
tablet_id,
|
||||
max_sync_medium_scn))) {
|
||||
LOG_WARN("diagnose failed", K(ret), K(ls_id), K(tablet_id), KPC(last_major));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -944,7 +943,7 @@ int ObCompactionDiagnoseMgr::diagnose_medium_scn_table(const int64_t compaction_
|
||||
MTL_ID(),
|
||||
ObLSID(INT64_MAX),
|
||||
ObTabletID(INT64_MAX),
|
||||
ObCompactionDiagnoseInfo::DIA_STATUS_RUNNING,
|
||||
ObCompactionDiagnoseInfo::DIA_STATUS_FAILED,
|
||||
ObTimeUtility::fast_current_time(),
|
||||
"error_tablet_cnt", error_tablet_cnt))) {
|
||||
LOG_WARN("failed to add diagnose info", K(ret));
|
||||
|
@ -323,10 +323,10 @@ int ObMediumCompactionScheduleFunc::decide_medium_snapshot(
|
||||
if (OB_NO_NEED_MERGE != ret) {
|
||||
LOG_WARN("failed to choose medium snapshot", K(ret), KPC(this));
|
||||
}
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (medium_info.medium_snapshot_ <= max_sync_medium_scn) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
} else if (is_major) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(get_max_reserved_snapshot(max_reserved_snapshot))) {
|
||||
LOG_WARN("failed to get multi_version_start", K(ret), KPC(this));
|
||||
} else if (medium_info.medium_snapshot_ < max_reserved_snapshot) {
|
||||
|
@ -236,7 +236,9 @@ int ObTabletMediumCompactionInfoRecorder::prepare_struct_in_lock(
|
||||
clog_len = 0;
|
||||
const logservice::ObLogBaseHeader log_header(
|
||||
logservice::ObLogBaseType::MEDIUM_COMPACTION_LOG_BASE_TYPE,
|
||||
logservice::ObReplayBarrierType::PRE_BARRIER);
|
||||
logservice::ObReplayBarrierType::NO_NEED_BARRIER,
|
||||
tablet_id_.id());
|
||||
// record tablet_id as trans_id to make medium clogs of same tablet replay serially
|
||||
|
||||
int64_t pos = 0;
|
||||
char *buf = nullptr;
|
||||
|
@ -195,7 +195,7 @@ private:
|
||||
class MediumLoopTask : public common::ObTimerTask
|
||||
{
|
||||
public:
|
||||
MediumLoopTask() = default;
|
||||
MediumLoopTask() { disable_timeout_check(); }
|
||||
virtual ~MediumLoopTask() = default;
|
||||
virtual void runTimerTask() override;
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user