fix major_freeze bug: improve last_merged_scn & all_merged_scn logic
This commit is contained in:
@ -572,50 +572,48 @@ int ObMajorMergeScheduler::update_merge_status(const int64_t expected_epoch)
|
|||||||
|
|
||||||
int64_t cur_all_merged_scn = 0;
|
int64_t cur_all_merged_scn = 0;
|
||||||
const uint64_t ori_all_merged_scn = info.all_merged_scn_.value_;
|
const uint64_t ori_all_merged_scn = info.all_merged_scn_.value_;
|
||||||
int64_t last_merged_scn = (merged ? info.broadcast_scn_ : info.last_merged_scn_);
|
int64_t cur_merged_scn = (merged ? info.broadcast_scn_ : info.last_merged_scn_);
|
||||||
|
|
||||||
if (progress->smallest_snapshot_version_ <= 0) {
|
if (progress->smallest_snapshot_version_ <= 0) {
|
||||||
cur_all_merged_scn = info.broadcast_scn_;
|
cur_all_merged_scn = info.broadcast_scn_;
|
||||||
} else {
|
} else {
|
||||||
cur_all_merged_scn = progress->smallest_snapshot_version_;
|
cur_all_merged_scn = progress->smallest_snapshot_version_;
|
||||||
}
|
}
|
||||||
|
LOG_INFO("check updating merge status", KR(ret), K_(tenant_id), K(zone), K(merged), K(cur_all_merged_scn),
|
||||||
|
K(cur_merged_scn), "smallest_snapshot_version", progress->smallest_snapshot_version_, K(info));
|
||||||
|
|
||||||
// cur_all_merged_scn >= last_merged_scn
|
if (OB_SUCC(ret) && merged) {
|
||||||
|
// cur_all_merged_scn >= cur_merged_scn
|
||||||
// 1. Equal: snapshot_version of all tablets change to frozen_scn after major compaction
|
// 1. Equal: snapshot_version of all tablets change to frozen_scn after major compaction
|
||||||
// 2. Greater: In backup-restore situation, tablets may have higher snapshot_version, which
|
// 2. Greater: In backup-restore situation, tablets may have higher snapshot_version, which
|
||||||
// is larger than current frozen_scn. https://work.aone.alibaba-inc.com/issue/45933591
|
// is larger than current frozen_scn. https://work.aone.alibaba-inc.com/issue/45933591
|
||||||
if ((cur_all_merged_scn < last_merged_scn) || (ori_all_merged_scn > cur_all_merged_scn)) {
|
//
|
||||||
|
// cur_all_merged_scn >= ori_all_merged_scn
|
||||||
|
// 1. Greater: all_merged_scn will increase like last_merged_scn after major compaction
|
||||||
|
// 2. Equal: In backup-restore situation, tablets may have higher snapshot_version(eg. version=10).
|
||||||
|
// If major_freeze with version=4, all_merged_scn will be updated to 10; if major_freeze with version=5,
|
||||||
|
// all_merged_scn will still be 10.
|
||||||
|
if ((cur_all_merged_scn < cur_merged_scn) || (cur_all_merged_scn < ori_all_merged_scn)
|
||||||
|
|| (cur_merged_scn < info.last_merged_scn_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_ERROR("unexpect merge scn", KR(ret), K(last_merged_scn), K(cur_all_merged_scn),
|
LOG_ERROR("unexpected merged scn", KR(ret), K(merged), K(cur_merged_scn), K(cur_all_merged_scn),
|
||||||
K(ori_all_merged_scn));
|
K(ori_all_merged_scn), K(info));
|
||||||
} else {
|
} else if (cur_merged_scn > info.last_merged_scn_) {
|
||||||
// TODO 'zone merge finish' & 'zone all tablet merge finish' should be handled in same procedure.
|
LOG_INFO("this zone merge finished", K(zone), K_(tenant_id), K(cur_all_merged_scn), K(ori_all_merged_scn),
|
||||||
if (info.last_merged_scn_ != last_merged_scn) {
|
K(cur_merged_scn), "last_merged_scn", info.last_merged_scn_);
|
||||||
LOG_INFO("this zone merge finished", K(zone), K_(tenant_id), K(cur_all_merged_scn),
|
// update last_merged_scn & all_merged_scn in all_zone_merge_info
|
||||||
K(ori_all_merged_scn), K(last_merged_scn));
|
if (OB_FAIL(zone_merge_mgr_->finish_zone_merge(zone, expected_epoch, cur_merged_scn, cur_all_merged_scn))) {
|
||||||
// update last_merged_scn in all_merge_info table
|
LOG_WARN("fail to finish zone merge", KR(ret), K(zone), K(expected_epoch), K(cur_merged_scn),
|
||||||
if (OB_FAIL(zone_merge_mgr_->finish_zone_merge(zone, expected_epoch, last_merged_scn,
|
K(cur_all_merged_scn), K(info));
|
||||||
ori_all_merged_scn))) {
|
|
||||||
LOG_WARN("fail to finish zone merge", KR(ret), K(zone), K(expected_epoch));
|
|
||||||
} else {
|
} else {
|
||||||
ROOTSERVICE_EVENT_ADD("daily_merge", "zone_merge_finish", K_(tenant_id),
|
ROOTSERVICE_EVENT_ADD("daily_merge", "zone_merge_finish", K_(tenant_id),
|
||||||
"last_merged_scn", info.last_merged_scn_, K(zone));
|
"last_merged_scn", cur_merged_scn,
|
||||||
|
"all_merged_scn", cur_all_merged_scn,
|
||||||
|
K(zone));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret) && (ori_all_merged_scn != cur_all_merged_scn)) {
|
|
||||||
LOG_INFO("this zone all tablet merge finished", K(zone), K_(tenant_id), K(cur_all_merged_scn),
|
|
||||||
K(ori_all_merged_scn), K(last_merged_scn));
|
|
||||||
// update all_merged_scn in all_merge_info table
|
|
||||||
if (OB_FAIL(zone_merge_mgr_->finish_zone_merge(zone, expected_epoch, last_merged_scn,
|
|
||||||
cur_all_merged_scn))) {
|
|
||||||
LOG_WARN("fail to finish zone merge", KR(ret), K(zone), K(expected_epoch));
|
|
||||||
} else {
|
|
||||||
ROOTSERVICE_EVENT_ADD("daily_merge", "zone_all_tablet_merged", K_(tenant_id),
|
|
||||||
"all_merged_scn", cur_all_merged_scn, K(zone));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -361,8 +361,8 @@ int ObZoneMergeManagerBase::start_zone_merge(
|
|||||||
int ObZoneMergeManagerBase::finish_zone_merge(
|
int ObZoneMergeManagerBase::finish_zone_merge(
|
||||||
const ObZone &zone,
|
const ObZone &zone,
|
||||||
const int64_t expected_epoch,
|
const int64_t expected_epoch,
|
||||||
const int64_t last_merged_scn,
|
const int64_t new_last_merged_scn,
|
||||||
const int64_t all_merged_scn)
|
const int64_t new_all_merged_scn)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t idx = OB_INVALID_INDEX;
|
int64_t idx = OB_INVALID_INDEX;
|
||||||
@ -372,15 +372,16 @@ int ObZoneMergeManagerBase::finish_zone_merge(
|
|||||||
|
|
||||||
if (OB_FAIL(check_valid(zone, idx))) {
|
if (OB_FAIL(check_valid(zone, idx))) {
|
||||||
LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id));
|
LOG_WARN("fail to check valid", KR(ret), K(zone), K_(tenant_id));
|
||||||
} else if ((last_merged_scn <= 0) || (all_merged_scn <= 0)) {
|
} else if ((new_last_merged_scn <= 0) || (new_all_merged_scn <= 0)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("invalid argument", KR(ret), K(zone), K_(tenant_id),
|
LOG_WARN("invalid argument", KR(ret), K(zone), K_(tenant_id),
|
||||||
K(last_merged_scn), K(all_merged_scn));
|
K(new_last_merged_scn), K(new_all_merged_scn));
|
||||||
} else if ((last_merged_scn != zone_merge_infos_[idx].broadcast_scn_)) {
|
} else if ((new_last_merged_scn != zone_merge_infos_[idx].broadcast_scn_)
|
||||||
|
|| (new_last_merged_scn <= zone_merge_infos_[idx].last_merged_scn_)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_ERROR("invalid merged_scn", KR(ret), K(zone), K_(tenant_id),
|
LOG_ERROR("invalid merged_scn", KR(ret), K(zone), K_(tenant_id),
|
||||||
K(last_merged_scn), K(all_merged_scn), "zone broadcast_scn",
|
K(new_last_merged_scn), K(new_all_merged_scn),
|
||||||
zone_merge_infos_[idx].broadcast_scn_);
|
"zone_merge_info", zone_merge_infos_[idx]);
|
||||||
} else if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) {
|
} else if (OB_FAIL(trans.start(proxy_, meta_tenant_id))) {
|
||||||
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id), K(meta_tenant_id));
|
LOG_WARN("fail to start transaction", KR(ret), K_(tenant_id), K(meta_tenant_id));
|
||||||
} else if (OB_FAIL(check_freeze_service_epoch(trans, expected_epoch))) {
|
} else if (OB_FAIL(check_freeze_service_epoch(trans, expected_epoch))) {
|
||||||
@ -392,17 +393,15 @@ int ObZoneMergeManagerBase::finish_zone_merge(
|
|||||||
} else {
|
} else {
|
||||||
ObZoneMergeInfo::MergeStatus status = static_cast<ObZoneMergeInfo::MergeStatus>(
|
ObZoneMergeInfo::MergeStatus status = static_cast<ObZoneMergeInfo::MergeStatus>(
|
||||||
zone_merge_infos_[idx].merge_status_.value_);
|
zone_merge_infos_[idx].merge_status_.value_);
|
||||||
if (last_merged_scn > zone_merge_infos_[idx].last_merged_scn_) {
|
|
||||||
const int64_t is_merging = 0;
|
const int64_t is_merging = 0;
|
||||||
tmp_info.is_merging_.set_val(is_merging, true);
|
tmp_info.is_merging_.set_val(is_merging, true);
|
||||||
tmp_info.last_merged_scn_.set_val(last_merged_scn, true);
|
tmp_info.last_merged_scn_.set_val(new_last_merged_scn, true);
|
||||||
tmp_info.last_merged_time_.set_val(cur_time, true);
|
tmp_info.last_merged_time_.set_val(cur_time, true);
|
||||||
status = ObZoneMergeInfo::MERGE_STATUS_IDLE;
|
status = ObZoneMergeInfo::MERGE_STATUS_IDLE;
|
||||||
tmp_info.merge_status_.set_val(status, true);
|
tmp_info.merge_status_.set_val(status, true);
|
||||||
}
|
|
||||||
|
|
||||||
if (all_merged_scn > zone_merge_infos_[idx].all_merged_scn_) {
|
if (new_all_merged_scn > zone_merge_infos_[idx].all_merged_scn_) {
|
||||||
tmp_info.all_merged_scn_.set_val(all_merged_scn, true);
|
tmp_info.all_merged_scn_.set_val(new_all_merged_scn, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) {
|
if (OB_FAIL(ObZoneMergeTableOperator::update_partial_zone_merge_info(trans, tenant_id_, tmp_info))) {
|
||||||
@ -419,7 +418,7 @@ int ObZoneMergeManagerBase::finish_zone_merge(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_INFO("finish zone merge", KR(ret), K_(tenant_id), K(zone), K(last_merged_scn), K(all_merged_scn));
|
LOG_INFO("finish zone merge", KR(ret), K_(tenant_id), K(zone), K(new_last_merged_scn), K(new_all_merged_scn));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1217,8 +1216,8 @@ int ObZoneMergeManager::start_zone_merge(const ObZone &zone, const int64_t expec
|
|||||||
int ObZoneMergeManager::finish_zone_merge(
|
int ObZoneMergeManager::finish_zone_merge(
|
||||||
const ObZone &zone,
|
const ObZone &zone,
|
||||||
const int64_t expected_epoch,
|
const int64_t expected_epoch,
|
||||||
const int64_t last_merged_scn,
|
const int64_t new_last_merged_scn,
|
||||||
const int64_t all_merged_scn)
|
const int64_t new_all_merged_scn)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
SpinWLockGuard guard(write_lock_);
|
SpinWLockGuard guard(write_lock_);
|
||||||
@ -1228,7 +1227,7 @@ int ObZoneMergeManager::finish_zone_merge(
|
|||||||
ObZoneMergeMgrGuard shadow_guard(lock_,
|
ObZoneMergeMgrGuard shadow_guard(lock_,
|
||||||
*(static_cast<ObZoneMergeManagerBase *> (this)), shadow_, ret);
|
*(static_cast<ObZoneMergeManagerBase *> (this)), shadow_, ret);
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ret = shadow_.finish_zone_merge(zone, expected_epoch, last_merged_scn, all_merged_scn);
|
ret = shadow_.finish_zone_merge(zone, expected_epoch, new_last_merged_scn, new_all_merged_scn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -48,8 +48,8 @@ public:
|
|||||||
virtual int start_zone_merge(const common::ObZone &zone, const int64_t expected_epoch);
|
virtual int start_zone_merge(const common::ObZone &zone, const int64_t expected_epoch);
|
||||||
virtual int finish_zone_merge(const common::ObZone &zone,
|
virtual int finish_zone_merge(const common::ObZone &zone,
|
||||||
const int64_t expected_epoch,
|
const int64_t expected_epoch,
|
||||||
const int64_t last_merged_scn,
|
const int64_t new_last_merged_scn,
|
||||||
const int64_t all_merged_scn);
|
const int64_t new_all_merged_scn);
|
||||||
int suspend_merge(const int64_t expected_epoch);
|
int suspend_merge(const int64_t expected_epoch);
|
||||||
int resume_merge(const int64_t expected_epoch);
|
int resume_merge(const int64_t expected_epoch);
|
||||||
int set_merge_error(const int64_t merge_error, const int64_t expected_epoch);
|
int set_merge_error(const int64_t merge_error, const int64_t expected_epoch);
|
||||||
@ -119,8 +119,8 @@ public:
|
|||||||
virtual int start_zone_merge(const common::ObZone &zone, const int64_t expected_epoch);
|
virtual int start_zone_merge(const common::ObZone &zone, const int64_t expected_epoch);
|
||||||
virtual int finish_zone_merge(const common::ObZone &zone,
|
virtual int finish_zone_merge(const common::ObZone &zone,
|
||||||
const int64_t expected_epoch,
|
const int64_t expected_epoch,
|
||||||
const int64_t last_merged_scn,
|
const int64_t new_last_merged_scn,
|
||||||
const int64_t all_merged_scn);
|
const int64_t new_all_merged_scn);
|
||||||
virtual int suspend_merge(const int64_t expected_epoch);
|
virtual int suspend_merge(const int64_t expected_epoch);
|
||||||
virtual int resume_merge(const int64_t expected_epoch);
|
virtual int resume_merge(const int64_t expected_epoch);
|
||||||
virtual int set_merge_error(const int64_t merge_error, const int64_t expected_epoch);
|
virtual int set_merge_error(const int64_t merge_error, const int64_t expected_epoch);
|
||||||
|
|||||||
Reference in New Issue
Block a user