remove zone_merge_info in ObTenantFreezeInfoMgr
This commit is contained in:
@ -73,6 +73,7 @@ int ObMajorMergeInfoDetector::start()
|
||||
return ret;
|
||||
}
|
||||
|
||||
ERRSIM_POINT_DEF(SKIP_REFRESH_ZONE_INFO)
|
||||
void ObMajorMergeInfoDetector::run3()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -89,6 +90,7 @@ void ObMajorMergeInfoDetector::run3()
|
||||
LOG_TRACE("run freeze info detector", K_(tenant_id));
|
||||
|
||||
bool can_work = false;
|
||||
bool skip_refresh_zone_info = false;
|
||||
int64_t proposal_id = 0;
|
||||
ObRole role = ObRole::INVALID_ROLE;
|
||||
|
||||
@ -142,7 +144,14 @@ void ObMajorMergeInfoDetector::run3()
|
||||
}
|
||||
|
||||
ret = OB_SUCCESS;
|
||||
if (OB_FAIL(try_update_zone_info(proposal_id))) {
|
||||
#ifdef ERRSIM
|
||||
if (OB_UNLIKELY(SKIP_REFRESH_ZONE_INFO)) {
|
||||
skip_refresh_zone_info = true;
|
||||
LOG_INFO("ERRSIM SKIP_REFRESH_ZONE_INFO", K(ret));
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
if (OB_FAIL(!skip_refresh_zone_info && try_update_zone_info(proposal_id))) {
|
||||
LOG_WARN("fail to try update zone info", KR(ret), K_(tenant_id), K(proposal_id));
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,7 +14,6 @@
|
||||
#include "ob_tenant_freeze_info_mgr.h"
|
||||
#include "share/ob_zone_merge_info.h"
|
||||
#include "share/ob_global_merge_table_operator.h"
|
||||
#include "share/ob_zone_merge_table_operator.h"
|
||||
#include "storage/compaction/ob_compaction_schedule_util.h"
|
||||
#include "storage/concurrency_control/ob_multi_version_garbage_collector.h"
|
||||
#include "storage/tx_storage/ob_ls_service.h"
|
||||
@ -84,7 +83,6 @@ ObTenantFreezeInfoMgr::ObTenantFreezeInfoMgr()
|
||||
lock_(),
|
||||
cur_idx_(0),
|
||||
last_change_ts_(0),
|
||||
global_broadcast_scn_(),
|
||||
tenant_id_(OB_INVALID_ID),
|
||||
tg_id_(-1),
|
||||
inited_(false)
|
||||
@ -130,7 +128,6 @@ int ObTenantFreezeInfoMgr::init(const uint64_t tenant_id, ObISQLClient &sql_prox
|
||||
} else {
|
||||
tenant_id_ = tenant_id;
|
||||
last_change_ts_ = ObTimeUtility::current_time();
|
||||
global_broadcast_scn_ = share::SCN::min_scn();
|
||||
inited_ = true;
|
||||
}
|
||||
return ret;
|
||||
@ -582,24 +579,21 @@ int ObTenantFreezeInfoMgr::ReloadTask::refresh_merge_info()
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
const uint64_t tenant_id = MTL_ID();
|
||||
ObZoneMergeInfo zone_merge_info;
|
||||
zone_merge_info.tenant_id_ = tenant_id;
|
||||
zone_merge_info.zone_ = GCTX.config_->zone.str();
|
||||
|
||||
// remove zone_merge_info // only use global_merge_info
|
||||
ObGlobalMergeInfo global_merge_info;
|
||||
global_merge_info.tenant_id_ = tenant_id;
|
||||
int64_t cur_broadcast_version = 0;
|
||||
int64_t global_broadcast_version = 0;
|
||||
|
||||
if (OB_FAIL(ObGlobalMergeTableOperator::load_global_merge_info(*GCTX.sql_proxy_, tenant_id, global_merge_info))) {
|
||||
LOG_WARN("failed to load global merge info", KR(ret), K(global_merge_info));
|
||||
} else if (OB_FAIL(ObZoneMergeTableOperator::load_zone_merge_info(*GCTX.sql_proxy_, tenant_id, zone_merge_info))) {
|
||||
LOG_WARN("fail to load zone merge info", KR(ret), K(zone_merge_info));
|
||||
} else {
|
||||
// set merged version
|
||||
MERGE_SCHEDULER_PTR->set_inner_table_merged_scn(global_merge_info.last_merged_scn_.get_scn().get_val_for_tx());
|
||||
mgr_.set_global_broadcast_scn(global_merge_info.global_broadcast_scn_.get_scn());
|
||||
if (global_merge_info.suspend_merging_.get_value()) { // suspend_merge
|
||||
MERGE_SCHEDULER_PTR->stop_major_merge();
|
||||
LOG_INFO("schedule zone to stop major merge", K(tenant_id), K(zone_merge_info), K(global_merge_info));
|
||||
LOG_INFO("schedule zone to stop major merge", K(tenant_id), K(global_merge_info));
|
||||
} else {
|
||||
if (check_tenant_status_) {
|
||||
if (is_sys_tenant(tenant_id) || is_meta_tenant(tenant_id)) {
|
||||
@ -619,14 +613,14 @@ int ObTenantFreezeInfoMgr::ReloadTask::refresh_merge_info()
|
||||
}
|
||||
if (!check_tenant_status_) {
|
||||
MERGE_SCHEDULER_PTR->resume_major_merge();
|
||||
const int64_t cur_broadcast_version = MERGE_SCHEDULER_PTR->get_frozen_version();
|
||||
if (zone_merge_info.broadcast_scn_.get_scn().get_val_for_tx() > cur_broadcast_version) {
|
||||
FLOG_INFO("try to schedule merge", K(tenant_id), "zone", zone_merge_info.zone_, "broadcast_scn",
|
||||
zone_merge_info.broadcast_scn_, K(cur_broadcast_version));
|
||||
if (OB_FAIL(MERGE_SCHEDULER_PTR->schedule_merge(zone_merge_info.broadcast_scn_.get_scn().get_val_for_tx()))) {
|
||||
LOG_WARN("fail to schedule merge", K(ret), K(zone_merge_info));
|
||||
} else if (OB_FAIL(MTL(ObTenantFreezer*)->update_frozen_scn(zone_merge_info.broadcast_scn_.get_scn().get_val_for_tx()))) {
|
||||
LOG_WARN("update frozen scn failed", K(ret), K(zone_merge_info.broadcast_scn_.get_scn()));
|
||||
cur_broadcast_version = MERGE_SCHEDULER_PTR->get_frozen_version();
|
||||
global_broadcast_version = global_merge_info.global_broadcast_scn_.get_scn().get_val_for_tx();
|
||||
if (global_broadcast_version > cur_broadcast_version) {
|
||||
FLOG_INFO("try to schedule merge", K(tenant_id), "zone", GCTX.config_->zone.str(), K(global_broadcast_version), K(cur_broadcast_version));
|
||||
if (OB_FAIL(MERGE_SCHEDULER_PTR->schedule_merge(global_broadcast_version))) {
|
||||
LOG_WARN("fail to schedule merge", K(ret), "zone", GCTX.config_->zone.str(), K(global_broadcast_version));
|
||||
} else if (OB_FAIL(MTL(ObTenantFreezer*)->update_frozen_scn(global_broadcast_version))) {
|
||||
LOG_WARN("update frozen scn failed", K(ret), "zone", GCTX.config_->zone.str(), K(global_broadcast_version));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -634,8 +628,7 @@ int ObTenantFreezeInfoMgr::ReloadTask::refresh_merge_info()
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
LOG_TRACE("refresh merge info", K(tenant_id), "zone", zone_merge_info.zone_, "broadcast_scn",
|
||||
zone_merge_info.broadcast_scn_);
|
||||
LOG_TRACE("refresh merge info", K(tenant_id), "zone", GCTX.config_->zone.str(), K(global_merge_info));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -115,7 +115,6 @@ public:
|
||||
int get_neighbour_major_freeze(const int64_t snapshot_version, NeighbourFreezeInfo &info);
|
||||
|
||||
int64_t get_min_reserved_snapshot_for_tx();
|
||||
void set_global_broadcast_scn(const share::SCN &global_broadcast_scn) { global_broadcast_scn_ = global_broadcast_scn; }
|
||||
int get_min_reserved_snapshot(
|
||||
const ObTabletID &tablet_id,
|
||||
const int64_t merged_version,
|
||||
@ -194,7 +193,6 @@ private:
|
||||
common::RWLock lock_;
|
||||
int64_t cur_idx_;
|
||||
int64_t last_change_ts_;
|
||||
share::SCN global_broadcast_scn_;
|
||||
uint64_t tenant_id_;
|
||||
int tg_id_;
|
||||
bool inited_;
|
||||
|
||||
Reference in New Issue
Block a user