optimize lock in FreezeInfoMgr
This commit is contained in:
@ -151,44 +151,68 @@ int ObFreezeInfoManager::init(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// reload will acquire latest freeze info from __all_freeze_info.
|
// reload will acquire latest freeze info from __all_freeze_info.
|
||||||
int ObFreezeInfoManager::reload(
|
int ObFreezeInfoManager::reload(const share::SCN &min_frozen_scn)
|
||||||
const share::SCN &min_frozen_scn,
|
|
||||||
const bool reset_on_fail)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
ObSEArray<ObFreezeInfo, 8> freeze_infos;
|
||||||
share::SCN latest_snapshot_gc_scn;
|
share::SCN latest_snapshot_gc_scn;
|
||||||
ObSEArray<ObFreezeInfo, 4> freeze_infos;
|
|
||||||
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
if (OB_FAIL(fetch_new_freeze_info(tenant_id_, min_frozen_scn, *sql_proxy_, freeze_infos, latest_snapshot_gc_scn))) {
|
||||||
|
LOG_WARN("failed to load updated info", K(ret));
|
||||||
|
} else if (OB_FAIL(update_freeze_info(freeze_infos, latest_snapshot_gc_scn))) {
|
||||||
|
LOG_WARN("failed to update freeze info", K(ret));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
reset_freeze_info();
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObFreezeInfoManager::fetch_new_freeze_info(
|
||||||
|
const int64_t tenant_id,
|
||||||
|
const share::SCN &min_frozen_scn,
|
||||||
|
common::ObMySQLProxy &sql_proxy,
|
||||||
|
common::ObIArray<ObFreezeInfo> &freeze_infos,
|
||||||
|
share::SCN &latest_snapshot_gc_scn)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObFreezeInfoProxy freeze_info_proxy(tenant_id);
|
||||||
|
|
||||||
// 1. get snapshot_gc_scn
|
// 1. get snapshot_gc_scn
|
||||||
if (OB_FAIL(ObGlobalStatProxy::get_snapshot_gc_scn(
|
if (OB_FAIL(ObGlobalStatProxy::get_snapshot_gc_scn(
|
||||||
*sql_proxy_, tenant_id_, latest_snapshot_gc_scn))) {
|
sql_proxy, tenant_id, latest_snapshot_gc_scn))) {
|
||||||
LOG_WARN("fail to select for update snapshot_gc_scn", KR(ret), K_(tenant_id));
|
LOG_WARN("fail to select for update snapshot_gc_scn", KR(ret), K(tenant_id));
|
||||||
// 2. acquire freeze info in same trans, ensure we can get the latest freeze info
|
// 2. acquire freeze info in same trans, ensure we can get the latest freeze info
|
||||||
} else if (OB_FAIL(freeze_info_proxy.get_freeze_info_larger_or_equal_than(
|
} else if (OB_FAIL(freeze_info_proxy.get_freeze_info_larger_or_equal_than(
|
||||||
*sql_proxy_, min_frozen_scn, freeze_infos))) {
|
sql_proxy, min_frozen_scn, freeze_infos))) {
|
||||||
LOG_WARN("fail to get freeze info", KR(ret), K(min_frozen_scn));
|
LOG_WARN("fail to get freeze info", KR(ret), K(min_frozen_scn));
|
||||||
} else if (OB_UNLIKELY(freeze_infos.empty())) {
|
} else if (OB_UNLIKELY(freeze_infos.empty())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("get invalid frozen status", KR(ret), K(min_frozen_scn));
|
LOG_WARN("get invalid frozen status", KR(ret), K(min_frozen_scn));
|
||||||
} else if (freeze_infos.count() > 1) {
|
}
|
||||||
std::sort(freeze_infos.begin(), freeze_infos.end(),
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObFreezeInfoManager::update_freeze_info(
|
||||||
|
const common::ObIArray<ObFreezeInfo> &freeze_infos,
|
||||||
|
const share::SCN &latest_snapshot_gc_scn)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_FAIL(freeze_info_.frozen_statuses_.prepare_allocate(freeze_infos.count()))) {
|
||||||
|
LOG_WARN("failed to prepare allocate mem for new freeze info", KR(ret), K(freeze_infos), K(freeze_info_));
|
||||||
|
} else if (OB_FAIL(freeze_info_.frozen_statuses_.assign(freeze_infos))) {
|
||||||
|
LOG_WARN("fail to assign", KR(ret), K(freeze_infos));
|
||||||
|
} else if (freeze_info_.frozen_statuses_.count() > 1) {
|
||||||
|
std::sort(freeze_info_.frozen_statuses_.begin(), freeze_info_.frozen_statuses_.end(),
|
||||||
[](const ObFreezeInfo &a, const ObFreezeInfo &b)
|
[](const ObFreezeInfo &a, const ObFreezeInfo &b)
|
||||||
{ return a.frozen_scn_ < b.frozen_scn_; } );
|
{ return a.frozen_scn_ < b.frozen_scn_; } );
|
||||||
}
|
}
|
||||||
|
|
||||||
if (FAILEDx(freeze_info_.frozen_statuses_.prepare_allocate(freeze_infos.count()))) {
|
if (OB_SUCC(ret)) {
|
||||||
LOG_WARN("failed to prepare allocate mem for new freeze info", KR(ret), K(freeze_infos), K(freeze_info_));
|
|
||||||
} else if (OB_FAIL(freeze_info_.frozen_statuses_.assign(freeze_infos))) {
|
|
||||||
LOG_WARN("fail to assign", KR(ret), K(freeze_infos));
|
|
||||||
} else {
|
|
||||||
freeze_info_.latest_snapshot_gc_scn_ = latest_snapshot_gc_scn;
|
freeze_info_.latest_snapshot_gc_scn_ = latest_snapshot_gc_scn;
|
||||||
LOG_INFO("inner load succ", K(freeze_info_));
|
LOG_INFO("inner load succ", K(freeze_info_));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OB_FAIL(ret) && reset_on_fail) {
|
|
||||||
reset_freeze_info();
|
|
||||||
}
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -80,6 +80,13 @@ public:
|
|||||||
class ObFreezeInfoManager
|
class ObFreezeInfoManager
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
static int fetch_new_freeze_info(
|
||||||
|
const int64_t tenant_id,
|
||||||
|
const share::SCN &min_frozen_scn,
|
||||||
|
common::ObMySQLProxy &sql_proxy,
|
||||||
|
common::ObIArray<ObFreezeInfo> &freeze_infos,
|
||||||
|
share::SCN &latest_snapshot_gc_scn);
|
||||||
|
|
||||||
ObFreezeInfoManager()
|
ObFreezeInfoManager()
|
||||||
: is_inited_(false),
|
: is_inited_(false),
|
||||||
tenant_id_(common::OB_INVALID_ID),
|
tenant_id_(common::OB_INVALID_ID),
|
||||||
@ -92,7 +99,10 @@ public:
|
|||||||
share::SCN get_snapshot_gc_scn() const { return freeze_info_.latest_snapshot_gc_scn_; }
|
share::SCN get_snapshot_gc_scn() const { return freeze_info_.latest_snapshot_gc_scn_; }
|
||||||
void reset_freeze_info() { freeze_info_.set_invalid(); }
|
void reset_freeze_info() { freeze_info_.set_invalid(); }
|
||||||
int init(uint64_t tenant_id, common::ObMySQLProxy &proxy);
|
int init(uint64_t tenant_id, common::ObMySQLProxy &proxy);
|
||||||
int reload(const share::SCN &min_frozen_scn, const bool reset_on_fail = true);
|
int reload(const share::SCN &min_frozen_scn);
|
||||||
|
int update_freeze_info(
|
||||||
|
const common::ObIArray<ObFreezeInfo> &freeze_infos,
|
||||||
|
const share::SCN &latest_snapshot_gc_scn);
|
||||||
int add_freeze_info(const share::ObFreezeInfo &frozen_status);
|
int add_freeze_info(const share::ObFreezeInfo &frozen_status);
|
||||||
int update_snapshot_gc_scn(const share::SCN &new_snapshot_gc_scn);
|
int update_snapshot_gc_scn(const share::SCN &new_snapshot_gc_scn);
|
||||||
|
|
||||||
|
|||||||
@ -472,7 +472,7 @@ int ObPartitionMergePolicy::get_boundary_snapshot_version(
|
|||||||
min_snapshot = freeze_info.prev.frozen_scn_.get_val_for_tx();
|
min_snapshot = freeze_info.prev.frozen_scn_.get_val_for_tx();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (INT64_MAX == freeze_info.next.frozen_scn_.is_max() && is_multi_version_merge) {
|
if (freeze_info.next.frozen_scn_.is_max() && is_multi_version_merge) {
|
||||||
max_snapshot = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
max_snapshot = MTL(ObTenantFreezeInfoMgr*)->get_snapshot_gc_ts();
|
||||||
} else {
|
} else {
|
||||||
max_snapshot = freeze_info.next.frozen_scn_.get_val_for_tx();
|
max_snapshot = freeze_info.next.frozen_scn_.get_val_for_tx();
|
||||||
|
|||||||
@ -588,23 +588,45 @@ int ObTenantFreezeInfoMgr::try_update_info()
|
|||||||
|
|
||||||
DEBUG_SYNC(BEFORE_UPDATE_FREEZE_SNAPSHOT_INFO);
|
DEBUG_SYNC(BEFORE_UPDATE_FREEZE_SNAPSHOT_INFO);
|
||||||
ObSEArray<ObSnapshotInfo, 4> snapshots;
|
ObSEArray<ObSnapshotInfo, 4> snapshots;
|
||||||
share::ObFreezeInfo latest_freeze_info;
|
ObSEArray<ObFreezeInfo, 4> freeze_infos;
|
||||||
bool gc_snapshot_ts_changed = false;
|
share::SCN new_snapshot_gc_scn;
|
||||||
share::ObSnapshotTableProxy snapshot_proxy;
|
share::ObSnapshotTableProxy snapshot_proxy;
|
||||||
|
|
||||||
WLockGuard lock_guard(lock_);
|
if (OB_FAIL(ObFreezeInfoManager::fetch_new_freeze_info(
|
||||||
const int64_t old_snapshot_gc_ts = freeze_info_mgr_.get_snapshot_gc_scn().get_val_for_tx();
|
MTL_ID(), share::SCN::base_scn(), *GCTX.sql_proxy_, freeze_infos, new_snapshot_gc_scn))) {
|
||||||
int64_t snapshot_gc_ts = old_snapshot_gc_ts;
|
STORAGE_LOG(WARN, "failed to load updated info", K(ret));
|
||||||
if (OB_FAIL(freeze_info_mgr_.reload(share::SCN::base_scn(), false/*reset_on_fail*/))) {
|
|
||||||
STORAGE_LOG(WARN, "failed to reload freeze info mgr", K(ret));
|
|
||||||
} else if (OB_FAIL(snapshot_proxy.get_all_snapshots(*GCTX.sql_proxy_, MTL_ID(), snapshots))) {
|
} else if (OB_FAIL(snapshot_proxy.get_all_snapshots(*GCTX.sql_proxy_, MTL_ID(), snapshots))) {
|
||||||
STORAGE_LOG(WARN, "failed to get snapshots", K(ret));
|
STORAGE_LOG(WARN, "failed to get snapshots", K(ret));
|
||||||
} else if (OB_FAIL(update_next_snapshots(snapshots))) {
|
} else if (OB_FAIL(inner_update_info(new_snapshot_gc_scn, freeze_infos, snapshots))) {
|
||||||
|
STORAGE_LOG(WARN, "failed to update info", K(ret), K(freeze_infos), K(new_snapshot_gc_scn), K(snapshots));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTenantFreezeInfoMgr::inner_update_info(
|
||||||
|
const share::SCN &new_snapshot_gc_scn,
|
||||||
|
const common::ObIArray<share::ObFreezeInfo> &new_freeze_infos,
|
||||||
|
const common::ObIArray<share::ObSnapshotInfo> &new_snapshots)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
bool gc_snapshot_ts_changed = false;
|
||||||
|
int64_t snapshot_gc_ts = 0;
|
||||||
|
{
|
||||||
|
WLockGuard lock_guard(lock_);
|
||||||
|
const int64_t old_snapshot_gc_ts = freeze_info_mgr_.get_snapshot_gc_scn().get_val_for_tx();
|
||||||
|
snapshot_gc_ts = old_snapshot_gc_ts;
|
||||||
|
if (OB_FAIL(freeze_info_mgr_.update_freeze_info(new_freeze_infos, new_snapshot_gc_scn))) {
|
||||||
|
STORAGE_LOG(WARN, "failed to reload freeze info mgr", K(ret));
|
||||||
|
} else if (OB_FAIL(update_next_snapshots(new_snapshots))) {
|
||||||
STORAGE_LOG(WARN, "fail to update next snapshots", K(ret));
|
STORAGE_LOG(WARN, "fail to update next snapshots", K(ret));
|
||||||
} else if (FALSE_IT(snapshot_gc_ts = freeze_info_mgr_.get_snapshot_gc_scn().get_val_for_tx())) {
|
|
||||||
} else if (FALSE_IT(gc_snapshot_ts_changed = (old_snapshot_gc_ts != snapshot_gc_ts))) {
|
|
||||||
} else {
|
} else {
|
||||||
if (gc_snapshot_ts_changed) {
|
snapshot_gc_ts = freeze_info_mgr_.get_snapshot_gc_scn().get_val_for_tx();
|
||||||
|
gc_snapshot_ts_changed = old_snapshot_gc_ts != snapshot_gc_ts;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (gc_snapshot_ts_changed) {
|
||||||
last_change_ts_ = ObTimeUtility::current_time();
|
last_change_ts_ = ObTimeUtility::current_time();
|
||||||
} else {
|
} else {
|
||||||
const int64_t last_not_change_interval_us = ObTimeUtility::current_time() - last_change_ts_;
|
const int64_t last_not_change_interval_us = ObTimeUtility::current_time() - last_change_ts_;
|
||||||
@ -612,21 +634,21 @@ int ObTenantFreezeInfoMgr::try_update_info()
|
|||||||
(0 != snapshot_gc_ts && 1 != snapshot_gc_ts)) {
|
(0 != snapshot_gc_ts && 1 != snapshot_gc_ts)) {
|
||||||
if (REACH_TENANT_TIME_INTERVAL(60L * 1000L * 1000L)) {
|
if (REACH_TENANT_TIME_INTERVAL(60L * 1000L * 1000L)) {
|
||||||
STORAGE_LOG(WARN, "snapshot_gc_ts not refresh too long",
|
STORAGE_LOG(WARN, "snapshot_gc_ts not refresh too long",
|
||||||
K(snapshot_gc_ts), K(snapshots), K(last_change_ts_),
|
K(snapshot_gc_ts), K(new_snapshots), K(last_change_ts_),
|
||||||
K(last_not_change_interval_us));
|
K(last_not_change_interval_us));
|
||||||
}
|
}
|
||||||
} else if (FLUSH_GC_SNAPSHOT_TS_REFRESH_TS <= last_not_change_interval_us) {
|
} else if (FLUSH_GC_SNAPSHOT_TS_REFRESH_TS <= last_not_change_interval_us) {
|
||||||
STORAGE_LOG(WARN, "snapshot_gc_ts not refresh too long",
|
STORAGE_LOG(WARN, "snapshot_gc_ts not refresh too long",
|
||||||
K(snapshot_gc_ts), K(snapshots), K(last_change_ts_),
|
K(snapshot_gc_ts), K(new_snapshots), K(last_change_ts_),
|
||||||
K(last_not_change_interval_us));
|
K(last_not_change_interval_us));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
STORAGE_LOG(DEBUG, "reload freeze info and snapshots", K(snapshot_gc_ts), K(snapshots));
|
STORAGE_LOG(DEBUG, "reload freeze info and snapshots", K(snapshot_gc_ts), K(new_snapshots));
|
||||||
}
|
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (REACH_TENANT_TIME_INTERVAL(20 * 1000 * 1000 /*20s*/)) {
|
if (REACH_TENANT_TIME_INTERVAL(20 * 1000 * 1000 /*20s*/)) {
|
||||||
STORAGE_LOG(INFO, "ObTenantFreezeInfoMgr success to update infos", K(snapshots), K(freeze_info_mgr_));
|
STORAGE_LOG(INFO, "ObTenantFreezeInfoMgr success to update infos",
|
||||||
|
K(new_snapshot_gc_scn), K(new_freeze_infos), K(new_snapshots), K(freeze_info_mgr_));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|||||||
@ -150,6 +150,10 @@ private:
|
|||||||
share::ObFreezeInfo &freeze_info);
|
share::ObFreezeInfo &freeze_info);
|
||||||
int try_update_reserved_snapshot();
|
int try_update_reserved_snapshot();
|
||||||
int try_update_info();
|
int try_update_info();
|
||||||
|
int inner_update_info(
|
||||||
|
const share::SCN &new_snapshot_gc_scn,
|
||||||
|
const common::ObIArray<share::ObFreezeInfo> &new_freeze_infos,
|
||||||
|
const common::ObIArray<share::ObSnapshotInfo> &new_snapshots);
|
||||||
|
|
||||||
class ReloadTask : public common::ObTimerTask
|
class ReloadTask : public common::ObTimerTask
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user