fix scn shortage about major_freeze
This commit is contained in:
@ -405,7 +405,6 @@ int ObFreezeInfoManager::generate_frozen_scn(
|
||||
}
|
||||
}
|
||||
|
||||
int64_t new_gts = 0;
|
||||
SCN tmp_frozen_scn;
|
||||
SCN local_max_frozen_scn;
|
||||
|
||||
@ -418,17 +417,15 @@ int ObFreezeInfoManager::generate_frozen_scn(
|
||||
} else if (max_frozen_status.frozen_scn_ != local_max_frozen_scn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("max frozen_scn not same in cache and table", KR(ret), K(max_frozen_status), K(local_max_frozen_scn));
|
||||
} else if (OB_FAIL(get_gts(new_gts))) {
|
||||
} else if (OB_FAIL(get_gts(tmp_frozen_scn))) {
|
||||
LOG_WARN("fail to get gts", KR(ret));
|
||||
} else if (OB_FAIL(tmp_frozen_scn.convert_for_gts(new_gts))) {
|
||||
LOG_WARN("fail to convert for gts", KR(ret), K(new_gts));
|
||||
} else if ((tmp_frozen_scn <= snapshot_gc_scn)
|
||||
|| (tmp_frozen_scn <= local_max_frozen_scn)
|
||||
|| (tmp_frozen_scn <= snapshot_info.snapshot_scn_)) {
|
||||
// current time from gts must be greater than old ts
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("get invalid frozen_timestmap", KR(ret), K(snapshot_gc_scn),
|
||||
K(new_gts), K(tmp_frozen_scn), K(local_max_frozen_scn), K(snapshot_info));
|
||||
K(tmp_frozen_scn), K(local_max_frozen_scn), K(snapshot_info));
|
||||
} else {
|
||||
new_frozen_scn = tmp_frozen_scn;
|
||||
}
|
||||
@ -472,8 +469,9 @@ int ObFreezeInfoManager::renew_snapshot_gc_scn()
|
||||
|
||||
SCN cur_snapshot_gc_scn;
|
||||
SCN new_snapshot_gc_scn;
|
||||
uint64_t new_snapshot_gc_ts = 0;
|
||||
int64_t cur_gts = 0;
|
||||
SCN cur_gts_scn;
|
||||
const int64_t default_weak_ts = transaction::ObWeakReadUtil::default_max_stale_time_for_weak_consistency();
|
||||
const uint64_t WEAK_TS_NS = (uint64_t)(default_weak_ts * 1000L);
|
||||
int64_t affected_rows = 0;
|
||||
ObMySQLTransaction trans;
|
||||
ObRecursiveMutexGuard guard(lock_);
|
||||
@ -485,13 +483,9 @@ int ObFreezeInfoManager::renew_snapshot_gc_scn()
|
||||
} else if (OB_FAIL(ObGlobalStatProxy::select_snapshot_gc_scn_for_update(trans, tenant_id_,
|
||||
cur_snapshot_gc_scn))) {
|
||||
LOG_WARN("fail to select snapshot_gc_scn for update", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(get_gts(cur_gts))) {
|
||||
} else if (OB_FAIL(get_gts(cur_gts_scn))) {
|
||||
LOG_WARN("fail to get_gts", KR(ret));
|
||||
} else if (FALSE_IT(new_snapshot_gc_ts =
|
||||
(cur_gts - transaction::ObWeakReadUtil::default_max_stale_time_for_weak_consistency()))) {
|
||||
LOG_WARN("fail to calc new snapshot_gc_scn", KR(ret), K(cur_gts));
|
||||
} else if (OB_FAIL(new_snapshot_gc_scn.convert_for_inner_table_field(new_snapshot_gc_ts))) {
|
||||
LOG_WARN("fail to convert val to SCN", KR(ret), K(new_snapshot_gc_ts));
|
||||
} else if (FALSE_IT(new_snapshot_gc_scn = SCN::minus(cur_gts_scn, WEAK_TS_NS))) {
|
||||
} else if ((new_snapshot_gc_scn <= freeze_info_.latest_snapshot_gc_scn_)
|
||||
|| (cur_snapshot_gc_scn >= new_snapshot_gc_scn)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
@ -524,19 +518,16 @@ int ObFreezeInfoManager::renew_snapshot_gc_scn()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObFreezeInfoManager::get_gts(int64_t &ts) const
|
||||
int ObFreezeInfoManager::get_gts(palf::SCN >s_scn) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_external_consistent = true;
|
||||
const int64_t timeout_us = 10 * 1000 * 1000;
|
||||
palf::SCN tmp_scn;
|
||||
if (OB_FAIL(OB_TS_MGR.get_ts_sync(tenant_id_, timeout_us, tmp_scn, is_external_consistent))) {
|
||||
if (OB_FAIL(OB_TS_MGR.get_ts_sync(tenant_id_, timeout_us, gts_scn, is_external_consistent))) {
|
||||
LOG_WARN("fail to get ts sync", K(ret), K_(tenant_id), K(timeout_us));
|
||||
} else if (!is_external_consistent) { // only suppport gts in 4.0
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_ERROR("cannot freeze without gts", KR(ret), K_(tenant_id));
|
||||
} else {
|
||||
ts = tmp_scn.get_val_for_lsn_allocator();
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -560,11 +551,10 @@ int ObFreezeInfoManager::try_gc_freeze_info()
|
||||
ObRecursiveMutexGuard guard(lock_);
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
const int64_t MAX_KEEP_INTERVAL_US = 30 * 24 * 60 * 60 * 1000L * 1000L; // 30 day
|
||||
const uint64_t MAX_KEEP_INTERVAL_NS = 30 * 24 * 60 * 60 * 1000L * 1000L * 1000L; // 30 day
|
||||
const int64_t MIN_REMAINED_VERSION_COUNT = 32;
|
||||
const int64_t now = ObTimeUtility::current_time();
|
||||
const uint64_t min_frozen_ts = (now - MAX_KEEP_INTERVAL_US) * 1000L; // frozen_scn is nano_second
|
||||
SCN min_frozen_scn;
|
||||
SCN cur_gts_scn;
|
||||
SCN cur_snapshot_gc_scn;
|
||||
|
||||
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
||||
@ -579,8 +569,9 @@ int ObFreezeInfoManager::try_gc_freeze_info()
|
||||
LOG_WARN("fail to select snapshot_gc_scn for update", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(freeze_info_proxy.get_all_freeze_info(trans, all_frozen_status))) {
|
||||
LOG_WARN("fail to get all freeze info", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(min_frozen_scn.convert_for_inner_table_field(min_frozen_ts))) {
|
||||
LOG_WARN("fail to convert val to SCN", KR(ret), K(min_frozen_ts));
|
||||
} else if (OB_FAIL(get_gts(cur_gts_scn))) {
|
||||
LOG_WARN("fail to get gts", KR(ret));
|
||||
} else if (FALSE_IT(min_frozen_scn = SCN::minus(cur_gts_scn, MAX_KEEP_INTERVAL_NS))) {
|
||||
} else {
|
||||
const int64_t frozen_status_cnt = all_frozen_status.count();
|
||||
if (frozen_status_cnt > MIN_REMAINED_VERSION_COUNT) {
|
||||
|
||||
@ -118,7 +118,7 @@ private:
|
||||
|
||||
int set_local_snapshot_gc_scn(const palf::SCN &new_scn);
|
||||
|
||||
int get_gts(int64_t &ts) const;
|
||||
int get_gts(palf::SCN >s_scn) const;
|
||||
int get_schema_version(const palf::SCN &frozen_scn, int64_t &schema_version) const;
|
||||
|
||||
int get_min_freeze_info(share::ObSimpleFrozenStatus &frozen_status);
|
||||
|
||||
@ -231,7 +231,7 @@ int ObMajorMergeProgressChecker::check_tablet_data_version(
|
||||
// logonly replica no need check
|
||||
} else {
|
||||
palf::SCN rep_snapshot_scn;
|
||||
if (OB_FAIL(rep_snapshot_scn.convert_for_inner_table_field((uint64_t)(r->get_snapshot_version())))) {
|
||||
if (OB_FAIL(rep_snapshot_scn.convert_for_tx(r->get_snapshot_version()))) {
|
||||
LOG_WARN("fail to convert val to SCN", KR(ret), "snapshot_version", r->get_snapshot_version());
|
||||
} else {
|
||||
if ((p->smallest_snapshot_scn_ <= palf::SCN::min_scn())
|
||||
|
||||
@ -669,7 +669,7 @@ int ObMajorMergeScheduler::try_update_global_merged_scn(const int64_t expected_e
|
||||
LOG_WARN("try update global last_merged_scn failed", KR(ret), K(expected_epoch));
|
||||
} else {
|
||||
ROOTSERVICE_EVENT_ADD("daily_merge", "global_merged", K_(tenant_id),
|
||||
"global_broadcast_scn", global_info.global_broadcast_scn_.get_scn_val());
|
||||
"global_broadcast_scn", global_info.global_broadcast_scn_);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -532,7 +532,7 @@ int ObSchemaHistoryRecycler::get_recycle_schema_version_by_global_stat(
|
||||
if (OB_FAIL(check_inner_stat())) {
|
||||
LOG_WARN("schema history recycler is stopped", KR(ret));
|
||||
} else {
|
||||
int64_t specific_merged_version = -1;
|
||||
palf::SCN spec_frozen_scn;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); i++) {
|
||||
const uint64_t tenant_id = tenant_ids.at(i);
|
||||
ObFreezeInfoProxy freeze_info_proxy(tenant_id);
|
||||
@ -551,13 +551,13 @@ int ObSchemaHistoryRecycler::get_recycle_schema_version_by_global_stat(
|
||||
} else if (frozen_status_arr.count() < reserved_num + 1) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_WARN("not exist enough frozen_scn to reserve", KR(ret), K(reserved_num), K(frozen_status_arr));
|
||||
} else if (FALSE_IT(specific_merged_version = frozen_status_arr.at(reserved_num).frozen_scn_.get_val_for_inner_table_field())) {
|
||||
} else if (FALSE_IT(spec_frozen_scn = frozen_status_arr.at(reserved_num).frozen_scn_)) {
|
||||
} else if (OB_FAIL(freeze_info_proxy.get_freeze_schema_info(*sql_proxy_, tenant_id,
|
||||
specific_merged_version, schema_version))) {
|
||||
LOG_WARN("fail to get specific_merged_version", KR(ret), K(tenant_id), K(specific_merged_version));
|
||||
spec_frozen_scn, schema_version))) {
|
||||
LOG_WARN("fail to get freeze schema info", KR(ret), K(tenant_id), K(spec_frozen_scn));
|
||||
} else if (!schema_version.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema version is invalid", KR(ret), K(tenant_id), K(specific_merged_version));
|
||||
LOG_WARN("schema version is invalid", KR(ret), K(tenant_id), K(spec_frozen_scn));
|
||||
} else {
|
||||
int64_t specific_schema_version = schema_version.schema_version_;
|
||||
if (OB_FAIL(fill_recycle_schema_versions(
|
||||
@ -566,7 +566,7 @@ int ObSchemaHistoryRecycler::get_recycle_schema_version_by_global_stat(
|
||||
KR(ret), K(tenant_id), K(specific_schema_version));
|
||||
}
|
||||
LOG_INFO("[SCHEMA_RECYCLE] get recycle schema version by major version",
|
||||
KR(ret), K(tenant_id), K(specific_merged_version), K(specific_schema_version));
|
||||
KR(ret), K(tenant_id), K(spec_frozen_scn), K(specific_schema_version));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -401,21 +401,21 @@ int ObFreezeInfoProxy::construct_frozen_status_(
|
||||
int ObFreezeInfoProxy::get_freeze_schema_info(
|
||||
ObISQLClient &sql_proxy,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t major_version,
|
||||
const palf::SCN &frozen_scn,
|
||||
TenantIdAndSchemaVersion &schema_version_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSqlString sql;
|
||||
ObTimeoutCtx ctx;
|
||||
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|
||||
|| major_version <= 0)) {
|
||||
|| (!frozen_scn.is_valid()))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(major_version));
|
||||
LOG_WARN("invalid arg", KR(ret), K(tenant_id), K(frozen_scn));
|
||||
} else if (OB_FAIL(rootserver::ObRootUtils::get_rs_default_timeout_ctx(ctx))) {
|
||||
LOG_WARN("fail to get timeout ctx", KR(ret), K(tenant_id), K(ctx));
|
||||
} else if (OB_FAIL(sql.assign_fmt("SELECT * FROM %s WHERE frozen_scn = %ld",
|
||||
OB_ALL_FREEZE_INFO_TNAME, major_version))) {
|
||||
LOG_WARN("fail to assign fmt", KR(ret), K(tenant_id), K(major_version));
|
||||
OB_ALL_FREEZE_INFO_TNAME, frozen_scn.get_val_for_inner_table_field()))) {
|
||||
LOG_WARN("fail to assign fmt", KR(ret), K(tenant_id), K(frozen_scn));
|
||||
}
|
||||
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
@ -430,7 +430,7 @@ int ObFreezeInfoProxy::get_freeze_schema_info(
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
}
|
||||
LOG_WARN("fail to get result", KR(ret), K(tenant_id), K(major_version));
|
||||
LOG_WARN("fail to get result", KR(ret), K(tenant_id), K(frozen_scn));
|
||||
} else {
|
||||
schema_version_info.tenant_id_ = tenant_id;
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "schema_version", schema_version_info.schema_version_, int64_t);
|
||||
|
||||
@ -167,7 +167,7 @@ public:
|
||||
|
||||
int get_freeze_schema_info(common::ObISQLClient &sql_proxy,
|
||||
const uint64_t tenant_id,
|
||||
const int64_t major_version,
|
||||
const palf::SCN &frozen_scn,
|
||||
TenantIdAndSchemaVersion &schema_version_info);
|
||||
|
||||
private:
|
||||
|
||||
@ -229,12 +229,6 @@ ObZoneMergeInfo::MergeStatus ObZoneMergeInfo::get_merge_status(const char* merge
|
||||
return status;
|
||||
}
|
||||
|
||||
bool ObZoneMergeInfo::is_merged(const int64_t broadcast_version) const
|
||||
{
|
||||
return (broadcast_version == broadcast_scn_.get_scn_val())
|
||||
&& (broadcast_scn_.get_scn_val() == last_merged_scn_.get_scn_val());
|
||||
}
|
||||
|
||||
bool ObZoneMergeInfo::is_valid() const
|
||||
{
|
||||
bool is_valid = true;
|
||||
|
||||
@ -86,7 +86,6 @@ public:
|
||||
static const char *get_merge_status_str(const MergeStatus status);
|
||||
static MergeStatus get_merge_status(const char* merge_status_str);
|
||||
|
||||
bool is_merged(const int64_t broadcast_version) const;
|
||||
bool is_in_merge() const;
|
||||
bool need_merge(const int64_t broadcast_version) const;
|
||||
|
||||
|
||||
@ -1085,7 +1085,7 @@ int ObTenantFreezer::get_global_frozen_scn_(int64_t &frozen_scn)
|
||||
if (OB_FAIL(rootserver::ObMajorFreezeHelper::get_frozen_scn(tenant_id, tmp_frozen_scn))) {
|
||||
LOG_WARN("get_frozen_scn failed", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
frozen_scn = tmp_frozen_scn.get_val_for_inner_table_field();
|
||||
frozen_scn = tmp_frozen_scn.get_val_for_tx();
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
@ -126,7 +126,7 @@ int ObTenantFreezerP::do_major_freeze_()
|
||||
if (OB_FAIL(ObMajorFreezeHelper::get_frozen_scn(tenant_id, frozen_scn))) {
|
||||
LOG_WARN("get_frozen_scn failed", KR(ret));
|
||||
} else {
|
||||
int64_t frozen_scn_val = frozen_scn.get_val_for_inner_table_field();
|
||||
int64_t frozen_scn_val = frozen_scn.get_val_for_tx();
|
||||
bool need_major = true;
|
||||
ObTenantFreezer *freezer = MTL(ObTenantFreezer *);
|
||||
ObRetryMajorInfo retry_major_info = freezer->get_retry_major_info();
|
||||
|
||||
Reference in New Issue
Block a user