fix boundary snapshot version when freeze info is invalid && revert FreezeInfo SCN Modification
This commit is contained in:
@ -389,7 +389,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
|
|||||||
LOG_WARN("failed to add dignose info about freeze_info", K(tmp_ret), K(merged_version));
|
LOG_WARN("failed to add dignose info about freeze_info", K(tmp_ret), K(merged_version));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
compaction_scn = freeze_info.freeze_scn.get_val_for_tx();
|
compaction_scn = freeze_info.freeze_version;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -70,11 +70,9 @@ int ObPartitionMergePolicy::get_neighbour_freeze_info(
|
|||||||
LOG_WARN("Failed to get freeze info, use snapshot_gc_ts instead", K(ret), K(snapshot_version));
|
LOG_WARN("Failed to get freeze info, use snapshot_gc_ts instead", K(ret), K(snapshot_version));
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
freeze_info.reset();
|
freeze_info.reset();
|
||||||
freeze_info.next.freeze_scn.set_max();
|
freeze_info.next.freeze_version = INT64_MAX;
|
||||||
if (OB_NOT_NULL(last_major)) {
|
if (OB_NOT_NULL(last_major)) {
|
||||||
if (OB_FAIL(freeze_info.prev.freeze_scn.convert_for_tx(last_major->get_snapshot_version()))) {
|
freeze_info.prev.freeze_version = last_major->get_snapshot_version();
|
||||||
LOG_WARN("failed to convert scn", K(ret), K(last_major));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("Failed to get neighbour major freeze info", K(ret), K(snapshot_version));
|
LOG_WARN("Failed to get neighbour major freeze info", K(ret), K(snapshot_version));
|
||||||
@ -137,7 +135,7 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
|||||||
result.reset();
|
result.reset();
|
||||||
// TODO: @dengzhi.ldz, remove max_snapshot_version, merge all forzen memtables
|
// TODO: @dengzhi.ldz, remove max_snapshot_version, merge all forzen memtables
|
||||||
// Keep max_snapshot_version currently because major merge must be done step by step
|
// Keep max_snapshot_version currently because major merge must be done step by step
|
||||||
int64_t max_snapshot_version = freeze_info.next.freeze_scn.get_val_for_tx();
|
int64_t max_snapshot_version = freeze_info.next.freeze_version;
|
||||||
const SCN &clog_checkpoint_scn = tablet.get_clog_checkpoint_scn();
|
const SCN &clog_checkpoint_scn = tablet.get_clog_checkpoint_scn();
|
||||||
|
|
||||||
// Freezing in the restart phase may not satisfy end >= last_max_sstable,
|
// Freezing in the restart phase may not satisfy end >= last_max_sstable,
|
||||||
@ -342,8 +340,8 @@ int ObPartitionMergePolicy::get_boundary_snapshot_version(
|
|||||||
min_snapshot = last_major_table->get_snapshot_version();
|
min_snapshot = last_major_table->get_snapshot_version();
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
min_snapshot = freeze_info.prev.freeze_scn.get_val_for_tx();
|
min_snapshot = freeze_info.prev.freeze_version;
|
||||||
max_snapshot = freeze_info.next.freeze_scn.get_val_for_tx();
|
max_snapshot = freeze_info.next.freeze_version;
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -510,7 +508,7 @@ int ObPartitionMergePolicy::get_major_merge_tables(
|
|||||||
LOG_WARN("failed to get freeze info", K(ret), K(base_table->get_snapshot_version()));
|
LOG_WARN("failed to get freeze info", K(ret), K(base_table->get_snapshot_version()));
|
||||||
} else if (OB_FAIL(result.handle_.add_table(base_table))) {
|
} else if (OB_FAIL(result.handle_.add_table(base_table))) {
|
||||||
LOG_WARN("failed to add base_table to result", K(ret));
|
LOG_WARN("failed to add base_table to result", K(ret));
|
||||||
} else if (base_table->get_snapshot_version() >= freeze_info.freeze_scn.get_val_for_tx()) {
|
} else if (base_table->get_snapshot_version() >= freeze_info.freeze_version) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_ERROR("unexpected sstable with snapshot_version bigger than next freeze_scn",
|
LOG_ERROR("unexpected sstable with snapshot_version bigger than next freeze_scn",
|
||||||
K(ret), K(freeze_info), KPC(base_table), K(tablet));
|
K(ret), K(freeze_info), KPC(base_table), K(tablet));
|
||||||
@ -546,7 +544,7 @@ int ObPartitionMergePolicy::get_major_merge_tables(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret) && OB_NOT_NULL(base_table)) {
|
if (OB_SUCC(ret) && OB_NOT_NULL(base_table)) {
|
||||||
const int64_t major_snapshot = MAX(base_table->get_snapshot_version(), freeze_info.freeze_scn.get_val_for_tx());
|
const int64_t major_snapshot = MAX(base_table->get_snapshot_version(), freeze_info.freeze_version);
|
||||||
result.read_base_version_ = base_table->get_snapshot_version();
|
result.read_base_version_ = base_table->get_snapshot_version();
|
||||||
result.version_range_.snapshot_version_ = major_snapshot;
|
result.version_range_.snapshot_version_ = major_snapshot;
|
||||||
result.create_snapshot_version_ = base_table->get_meta().get_basic_meta().create_snapshot_version_;
|
result.create_snapshot_version_ = base_table->get_meta().get_basic_meta().create_snapshot_version_;
|
||||||
@ -799,7 +797,7 @@ int ObPartitionMergePolicy::deal_hist_minor_merge(
|
|||||||
LOG_WARN("failed to get freeze info mgr from MTL", K(ret));
|
LOG_WARN("failed to get freeze info mgr from MTL", K(ret));
|
||||||
} else if (OB_ISNULL(first_major_table = table_store.get_major_sstables().get_boundary_table(false))) {
|
} else if (OB_ISNULL(first_major_table = table_store.get_major_sstables().get_boundary_table(false))) {
|
||||||
// index table during building, need compat with continuous multi version
|
// index table during building, need compat with continuous multi version
|
||||||
if (0 == (max_snapshot_version = freeze_info_mgr->get_latest_frozen_scn().get_val_for_tx())) {
|
if (0 == (max_snapshot_version = freeze_info_mgr->get_latest_frozen_version())) {
|
||||||
// no freeze info found, wait normal mini minor to free sstable
|
// no freeze info found, wait normal mini minor to free sstable
|
||||||
ret = OB_NO_NEED_MERGE;
|
ret = OB_NO_NEED_MERGE;
|
||||||
LOG_WARN("No freeze range to do hist minor merge for buiding index", K(ret), K(table_store));
|
LOG_WARN("No freeze range to do hist minor merge for buiding index", K(ret), K(table_store));
|
||||||
@ -891,7 +889,7 @@ int ObPartitionMergePolicy::check_need_major_merge(
|
|||||||
LOG_INFO("can't get freeze info after snapshot", K(ret), K(merge_version), K(major_sstable_version));
|
LOG_INFO("can't get freeze info after snapshot", K(ret), K(merge_version), K(major_sstable_version));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
can_merge = last_sstable_snapshot >= freeze_info.freeze_scn.get_val_for_tx();
|
can_merge = last_sstable_snapshot >= freeze_info.freeze_version;
|
||||||
if (!can_merge) {
|
if (!can_merge) {
|
||||||
LOG_TRACE("tablet need merge, but cannot merge now", K(tablet_id), K(merge_version), K(last_sstable_snapshot), K(freeze_info));
|
LOG_TRACE("tablet need merge, but cannot merge now", K(tablet_id), K(merge_version), K(last_sstable_snapshot), K(freeze_info));
|
||||||
}
|
}
|
||||||
@ -907,7 +905,7 @@ int ObPartitionMergePolicy::check_need_major_merge(
|
|||||||
// no frozen memtable, need force freeze
|
// no frozen memtable, need force freeze
|
||||||
need_force_freeze = true;
|
need_force_freeze = true;
|
||||||
} else {
|
} else {
|
||||||
need_force_freeze = last_frozen_memtable->get_snapshot_version() < freeze_info.freeze_scn.get_val_for_tx();
|
need_force_freeze = last_frozen_memtable->get_snapshot_version() < freeze_info.freeze_version;
|
||||||
if (!need_force_freeze) {
|
if (!need_force_freeze) {
|
||||||
FLOG_INFO("tablet no need force freeze", K(ret), K(tablet_id), K(merge_version), K(freeze_info), KPC(last_frozen_memtable));
|
FLOG_INFO("tablet no need force freeze", K(ret), K(tablet_id), K(merge_version), K(freeze_info), KPC(last_frozen_memtable));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -130,16 +130,16 @@ void ObTenantFreezeInfoMgr::destroy()
|
|||||||
TG_DESTROY(tg_id_);
|
TG_DESTROY(tg_id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCN ObTenantFreezeInfoMgr::get_latest_frozen_scn()
|
int64_t ObTenantFreezeInfoMgr::get_latest_frozen_version()
|
||||||
{
|
{
|
||||||
SCN frozen_scn = SCN::min_scn();
|
int64_t frozen_version = 0;
|
||||||
RLockGuard lock_guard(lock_);
|
RLockGuard lock_guard(lock_);
|
||||||
ObIArray<FreezeInfo> &info_list = info_list_[cur_idx_];
|
ObIArray<FreezeInfo> &info_list = info_list_[cur_idx_];
|
||||||
|
|
||||||
if (0 != info_list.count()) {
|
if (0 != info_list.count()) {
|
||||||
frozen_scn = info_list.at(info_list.count()-1).freeze_scn;
|
frozen_version = info_list.at(info_list.count()-1).freeze_version;
|
||||||
}
|
}
|
||||||
return frozen_scn;
|
return frozen_version;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantFreezeInfoMgr::get_min_dependent_freeze_info(FreezeInfo &freeze_info)
|
int ObTenantFreezeInfoMgr::get_min_dependent_freeze_info(FreezeInfo &freeze_info)
|
||||||
@ -223,9 +223,9 @@ int64_t ObTenantFreezeInfoMgr::find_pos_in_list_(
|
|||||||
while (l < r && ret_pos < 0) {
|
while (l < r && ret_pos < 0) {
|
||||||
mid = (l + r) >> 1;
|
mid = (l + r) >> 1;
|
||||||
const FreezeInfo &tmp_info = info_list.at(mid);
|
const FreezeInfo &tmp_info = info_list.at(mid);
|
||||||
if (snapshot_version < tmp_info.freeze_scn.get_val_for_tx()) {
|
if (snapshot_version < tmp_info.freeze_version) {
|
||||||
r = mid;
|
r = mid;
|
||||||
} else if (snapshot_version > tmp_info.freeze_scn.get_val_for_tx()) {
|
} else if (snapshot_version > tmp_info.freeze_version) {
|
||||||
l = mid + 1;
|
l = mid + 1;
|
||||||
} else {
|
} else {
|
||||||
ret_pos = mid;
|
ret_pos = mid;
|
||||||
@ -293,7 +293,7 @@ int ObTenantFreezeInfoMgr::get_freeze_info_behind_snapshot_version_(
|
|||||||
bool found = false;
|
bool found = false;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && !found && i < info_list.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && !found && i < info_list.count(); ++i) {
|
||||||
FreezeInfo &tmp_info = info_list.at(i);
|
FreezeInfo &tmp_info = info_list.at(i);
|
||||||
if (snapshot_version < tmp_info.freeze_scn.get_val_for_tx()) {
|
if (snapshot_version < tmp_info.freeze_version) {
|
||||||
freeze_info = tmp_info;
|
freeze_info = tmp_info;
|
||||||
found = true;
|
found = true;
|
||||||
}
|
}
|
||||||
@ -330,7 +330,7 @@ int ObTenantFreezeInfoMgr::inner_get_neighbour_major_freeze(
|
|||||||
bool found = false;
|
bool found = false;
|
||||||
for (int64_t i = 0; i < info_list.count() && OB_SUCC(ret) && !found; ++i) {
|
for (int64_t i = 0; i < info_list.count() && OB_SUCC(ret) && !found; ++i) {
|
||||||
FreezeInfo &next_info = info_list.at(i);
|
FreezeInfo &next_info = info_list.at(i);
|
||||||
if (snapshot_version < next_info.freeze_scn.get_val_for_tx()) {
|
if (snapshot_version < next_info.freeze_version) {
|
||||||
found = true;
|
found = true;
|
||||||
if (0 == i) {
|
if (0 == i) {
|
||||||
ret = OB_ENTRY_NOT_EXIST;
|
ret = OB_ENTRY_NOT_EXIST;
|
||||||
@ -344,7 +344,7 @@ int ObTenantFreezeInfoMgr::inner_get_neighbour_major_freeze(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret) && !found) {
|
if (OB_SUCC(ret) && !found) {
|
||||||
info.next.freeze_scn.set_max();
|
info.next.freeze_version = INT64_MAX;
|
||||||
info.prev = info_list.at(info_list.count() - 1);
|
info.prev = info_list.at(info_list.count() - 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -436,17 +436,17 @@ int ObTenantFreezeInfoMgr::get_min_reserved_snapshot(
|
|||||||
STORAGE_LOG(WARN, "fail to get multi version duration", K(ret), K(tablet_id));
|
STORAGE_LOG(WARN, "fail to get multi version duration", K(ret), K(tablet_id));
|
||||||
} else {
|
} else {
|
||||||
if (merged_version < 1) {
|
if (merged_version < 1) {
|
||||||
freeze_info.freeze_scn.set_min();
|
freeze_info.freeze_version = 0;
|
||||||
} else if (OB_FAIL(get_freeze_info_behind_snapshot_version_(merged_version, freeze_info))) {
|
} else if (OB_FAIL(get_freeze_info_behind_snapshot_version_(merged_version, freeze_info))) {
|
||||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||||
LOG_WARN("failed to get freeze info behind snapshot", K(ret), K(merged_version));
|
LOG_WARN("failed to get freeze info behind snapshot", K(ret), K(merged_version));
|
||||||
} else {
|
} else {
|
||||||
freeze_info.freeze_scn.set_max();
|
freeze_info.freeze_version = INT64_MAX;
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
snapshot_version = std::max(0L, snapshot_gc_ts_ - duration * 1000L * 1000L *1000L);
|
snapshot_version = std::max(0L, snapshot_gc_ts_ - duration * 1000L * 1000L *1000L);
|
||||||
snapshot_version = std::min(snapshot_version, freeze_info.freeze_scn.get_val_for_tx());
|
snapshot_version = std::min(snapshot_version, freeze_info.freeze_version);
|
||||||
for (int64_t i = 0; i < snapshots.count() && OB_SUCC(ret); ++i) {
|
for (int64_t i = 0; i < snapshots.count() && OB_SUCC(ret); ++i) {
|
||||||
bool related = false;
|
bool related = false;
|
||||||
const ObSnapshotInfo &snapshot = snapshots.at(i);
|
const ObSnapshotInfo &snapshot = snapshots.at(i);
|
||||||
@ -481,19 +481,19 @@ int ObTenantFreezeInfoMgr::diagnose_min_reserved_snapshot(
|
|||||||
STORAGE_LOG(WARN, "fail to get multi version duration", K(ret), K(tablet_id));
|
STORAGE_LOG(WARN, "fail to get multi version duration", K(ret), K(tablet_id));
|
||||||
} else {
|
} else {
|
||||||
if (merge_snapshot_version < 1) {
|
if (merge_snapshot_version < 1) {
|
||||||
freeze_info.freeze_scn.set_min();
|
freeze_info.freeze_version = 0;
|
||||||
} else if (OB_FAIL(get_freeze_info_behind_snapshot_version_(merge_snapshot_version, freeze_info))) {
|
} else if (OB_FAIL(get_freeze_info_behind_snapshot_version_(merge_snapshot_version, freeze_info))) {
|
||||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||||
LOG_WARN("failed to get freeze info behind snapshot", K(ret), K(merge_snapshot_version));
|
LOG_WARN("failed to get freeze info behind snapshot", K(ret), K(merge_snapshot_version));
|
||||||
} else {
|
} else {
|
||||||
freeze_info.freeze_scn.set_max();
|
freeze_info.freeze_version = INT64_MAX;
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
snapshot_version = std::max(0L, snapshot_gc_ts_ - duration * 1000L * 1000L * 1000L);
|
snapshot_version = std::max(0L, snapshot_gc_ts_ - duration * 1000L * 1000L * 1000L);
|
||||||
snapshot_from_type = "undo_retention";
|
snapshot_from_type = "undo_retention";
|
||||||
if (freeze_info.freeze_scn.get_val_for_tx() < snapshot_version) {
|
if (freeze_info.freeze_version < snapshot_version) {
|
||||||
snapshot_version = freeze_info.freeze_scn.get_val_for_tx();
|
snapshot_version = freeze_info.freeze_version;
|
||||||
snapshot_from_type = "major_freeze_ts";
|
snapshot_from_type = "major_freeze_ts";
|
||||||
}
|
}
|
||||||
for (int64_t i = 0; i < snapshots.count() && OB_SUCC(ret); ++i) {
|
for (int64_t i = 0; i < snapshots.count() && OB_SUCC(ret); ++i) {
|
||||||
@ -561,7 +561,7 @@ int ObTenantFreezeInfoMgr::get_reserve_points(
|
|||||||
// return ret;
|
// return ret;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int ObTenantFreezeInfoMgr::get_latest_freeze_scn(SCN &freeze_scn)
|
int ObTenantFreezeInfoMgr::get_latest_freeze_version(int64_t &freeze_version)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
@ -572,17 +572,17 @@ int ObTenantFreezeInfoMgr::get_latest_freeze_scn(SCN &freeze_scn)
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
STORAGE_LOG(WARN, "not init", K(ret));
|
STORAGE_LOG(WARN, "not init", K(ret));
|
||||||
} else {
|
} else {
|
||||||
freeze_scn.set_min();
|
freeze_version = 0;
|
||||||
|
|
||||||
if (info_list.count() > 0) {
|
if (info_list.count() > 0) {
|
||||||
freeze_scn = info_list.at(info_list.count() - 1).freeze_scn;
|
freeze_version = info_list.at(info_list.count() - 1).freeze_version;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantFreezeInfoMgr::prepare_new_info_list(const SCN &min_major_snapshot)
|
int ObTenantFreezeInfoMgr::prepare_new_info_list(const int64_t min_major_snapshot)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
@ -591,9 +591,9 @@ int ObTenantFreezeInfoMgr::prepare_new_info_list(const SCN &min_major_snapshot)
|
|||||||
snapshots_[next_idx].reset();
|
snapshots_[next_idx].reset();
|
||||||
|
|
||||||
for (int64_t i = 0; i < info_list_[cur_idx_].count() && OB_SUCC(ret); ++i) {
|
for (int64_t i = 0; i < info_list_[cur_idx_].count() && OB_SUCC(ret); ++i) {
|
||||||
if (SCN::max_scn() == min_major_snapshot || // no garbage collection is necessary
|
if (INT64_MAX == min_major_snapshot || // no garbage collection is necessary
|
||||||
// or version is bigger or equal than the smallest major version currently
|
// or version is bigger or equal than the smallest major version currently
|
||||||
info_list_[cur_idx_].at(i).freeze_scn >= min_major_snapshot) {
|
info_list_[cur_idx_].at(i).freeze_version >= min_major_snapshot) {
|
||||||
if (OB_FAIL(info_list_[next_idx].push_back(info_list_[cur_idx_].at(i)))) {
|
if (OB_FAIL(info_list_[next_idx].push_back(info_list_[cur_idx_].at(i)))) {
|
||||||
STORAGE_LOG(WARN, "fail to push back info", K(ret));
|
STORAGE_LOG(WARN, "fail to push back info", K(ret));
|
||||||
}
|
}
|
||||||
@ -616,7 +616,7 @@ int ObTenantFreezeInfoMgr::update_next_info_list(const ObIArray<FreezeInfo> &inf
|
|||||||
const FreezeInfo &next = info_list.at(i);
|
const FreezeInfo &next = info_list.at(i);
|
||||||
if (next_info_list.count() > 0) {
|
if (next_info_list.count() > 0) {
|
||||||
FreezeInfo &prev = next_info_list.at(next_info_list.count() - 1);
|
FreezeInfo &prev = next_info_list.at(next_info_list.count() - 1);
|
||||||
if (OB_UNLIKELY(prev.freeze_scn > next.freeze_scn)) {
|
if (OB_UNLIKELY(prev.freeze_version > next.freeze_version)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
STORAGE_LOG(ERROR, "freeze version decrease is not allowed", K(ret), K(prev), K(next));
|
STORAGE_LOG(ERROR, "freeze version decrease is not allowed", K(ret), K(prev), K(next));
|
||||||
}
|
}
|
||||||
@ -654,7 +654,7 @@ int ObTenantFreezeInfoMgr::update_info(
|
|||||||
const int64_t snapshot_gc_ts,
|
const int64_t snapshot_gc_ts,
|
||||||
const ObIArray<FreezeInfo> &info_list,
|
const ObIArray<FreezeInfo> &info_list,
|
||||||
const ObIArray<ObSnapshotInfo> &snapshots,
|
const ObIArray<ObSnapshotInfo> &snapshots,
|
||||||
const SCN &min_major_snapshot,
|
const int64_t min_major_snapshot,
|
||||||
bool& gc_snapshot_ts_changed)
|
bool& gc_snapshot_ts_changed)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -735,29 +735,37 @@ int ObTenantFreezeInfoMgr::ReloadTask::get_global_info(int64_t &snapshot_gc_ts)
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObTenantFreezeInfoMgr::ReloadTask::get_freeze_info(
|
int ObTenantFreezeInfoMgr::ReloadTask::get_freeze_info(
|
||||||
SCN &min_major_version,
|
int64_t &min_major_version,
|
||||||
ObIArray<FreezeInfo> &freeze_info)
|
ObIArray<FreezeInfo> &freeze_info)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
int64_t freeze_version = 0;
|
||||||
SCN freeze_scn = SCN::min_scn();
|
SCN freeze_scn = SCN::min_scn();
|
||||||
min_major_version.set_max();
|
SCN min_major_scn = SCN::max_scn();
|
||||||
|
min_major_version = INT64_MAX;
|
||||||
ObSEArray<ObSimpleFrozenStatus, 8> tmp;
|
ObSEArray<ObSimpleFrozenStatus, 8> tmp;
|
||||||
ObFreezeInfoProxy freeze_info_proxy(MTL_ID());
|
ObFreezeInfoProxy freeze_info_proxy(MTL_ID());
|
||||||
|
|
||||||
if (OB_FAIL(mgr_.get_latest_freeze_scn(freeze_scn))) {
|
if (OB_FAIL(mgr_.get_latest_freeze_version(freeze_version))) {
|
||||||
STORAGE_LOG(WARN, "fail to get major version", K(ret));
|
STORAGE_LOG(WARN, "fail to get major version", K(ret));
|
||||||
|
} else if (OB_FAIL(freeze_scn.convert_for_tx(freeze_version))) {
|
||||||
|
STORAGE_LOG(WARN, "fail to convert to scn", K(ret), K(freeze_version));
|
||||||
} else if (OB_FAIL(freeze_info_proxy.get_min_major_available_and_larger_info(
|
} else if (OB_FAIL(freeze_info_proxy.get_min_major_available_and_larger_info(
|
||||||
*sql_proxy_,
|
*sql_proxy_,
|
||||||
freeze_scn,
|
freeze_scn,
|
||||||
min_major_version,
|
min_major_scn,
|
||||||
tmp))) {
|
tmp))) {
|
||||||
STORAGE_LOG(WARN, "fail to get freeze info", K(ret), K(freeze_scn));
|
STORAGE_LOG(WARN, "fail to get freeze info", K(ret), K(freeze_scn));
|
||||||
|
} else if (OB_UNLIKELY(!min_major_scn.is_valid())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
STORAGE_LOG(ERROR, "get unexpected invalid scn", K(ret), K(min_major_scn));
|
||||||
|
} else if (FALSE_IT(min_major_version = min_major_scn.get_val_for_tx())) {
|
||||||
} else {
|
} else {
|
||||||
for (int64_t i = 0; i < tmp.count() && OB_SUCC(ret); ++i) {
|
for (int64_t i = 0; i < tmp.count() && OB_SUCC(ret); ++i) {
|
||||||
ObSimpleFrozenStatus &status = tmp.at(i);
|
ObSimpleFrozenStatus &status = tmp.at(i);
|
||||||
if (OB_FAIL(freeze_info.push_back(
|
if (OB_FAIL(freeze_info.push_back(
|
||||||
FreezeInfo(status.frozen_scn_, status.schema_version_, status.cluster_version_)))) {
|
FreezeInfo(status.frozen_scn_.get_val_for_tx(), status.schema_version_, status.cluster_version_)))) {
|
||||||
STORAGE_LOG(WARN, "fail to push back freeze info", K(ret), K(status));
|
STORAGE_LOG(WARN, "fail to push back freeze info", K(ret), K(status));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -840,7 +848,7 @@ int ObTenantFreezeInfoMgr::ReloadTask::try_update_info()
|
|||||||
ObSEArray<ObSnapshotInfo, 4> snapshots;
|
ObSEArray<ObSnapshotInfo, 4> snapshots;
|
||||||
bool changed = false;
|
bool changed = false;
|
||||||
observer::ObService *ob_service = GCTX.ob_service_;
|
observer::ObService *ob_service = GCTX.ob_service_;
|
||||||
SCN min_major_snapshot = SCN::max_scn();
|
int64_t min_major_snapshot = INT64_MAX;
|
||||||
|
|
||||||
if (OB_FAIL(get_global_info(snapshot_gc_ts))) {
|
if (OB_FAIL(get_global_info(snapshot_gc_ts))) {
|
||||||
if (OB_TENANT_NOT_EXIST != ret) {
|
if (OB_TENANT_NOT_EXIST != ret) {
|
||||||
|
|||||||
@ -42,22 +42,22 @@ class ObTenantFreezeInfoMgr
|
|||||||
public:
|
public:
|
||||||
|
|
||||||
struct FreezeInfo {
|
struct FreezeInfo {
|
||||||
share::SCN freeze_scn;
|
int64_t freeze_version;
|
||||||
int64_t schema_version;
|
int64_t schema_version;
|
||||||
int64_t cluster_version;
|
int64_t cluster_version;
|
||||||
|
|
||||||
FreezeInfo() : freeze_scn(), schema_version(-1), cluster_version(0) {}
|
FreezeInfo() : freeze_version(-1), schema_version(-1), cluster_version(0) {}
|
||||||
FreezeInfo(const share::SCN &scn, const int64_t schema_ver, const int64_t cluster_ver)
|
FreezeInfo(const int64_t &freeze_ver, const int64_t schema_ver, const int64_t cluster_ver)
|
||||||
: freeze_scn(scn), schema_version(schema_ver), cluster_version(cluster_ver) {}
|
: freeze_version(freeze_ver), schema_version(schema_ver), cluster_version(cluster_ver) {}
|
||||||
FreezeInfo &operator =(const FreezeInfo &o)
|
FreezeInfo &operator =(const FreezeInfo &o)
|
||||||
{
|
{
|
||||||
freeze_scn = o.freeze_scn;
|
freeze_version = o.freeze_version;
|
||||||
schema_version = o.schema_version;
|
schema_version = o.schema_version;
|
||||||
cluster_version = o.cluster_version;
|
cluster_version = o.cluster_version;
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
void reset() { freeze_scn.reset(); schema_version = -1; cluster_version = 0; }
|
void reset() { freeze_version = -1; schema_version = -1; cluster_version = 0; }
|
||||||
TO_STRING_KV(K(freeze_scn), K(schema_version), K(cluster_version));
|
TO_STRING_KV(K(freeze_version), K(schema_version), K(cluster_version));
|
||||||
};
|
};
|
||||||
|
|
||||||
struct NeighbourFreezeInfo {
|
struct NeighbourFreezeInfo {
|
||||||
@ -85,7 +85,7 @@ public:
|
|||||||
void stop();
|
void stop();
|
||||||
void destroy();
|
void destroy();
|
||||||
|
|
||||||
share::SCN get_latest_frozen_scn();
|
int64_t get_latest_frozen_version();
|
||||||
|
|
||||||
int get_freeze_info_behind_major_snapshot(const int64_t major_snapshot, common::ObIArray<FreezeInfo> &freeze_infos);
|
int get_freeze_info_behind_major_snapshot(const int64_t major_snapshot, common::ObIArray<FreezeInfo> &freeze_infos);
|
||||||
int get_freeze_info_by_snapshot_version(const int64_t snapshot_version, FreezeInfo &freeze_info);
|
int get_freeze_info_by_snapshot_version(const int64_t snapshot_version, FreezeInfo &freeze_info);
|
||||||
@ -117,7 +117,7 @@ public:
|
|||||||
const int64_t snapshot_gc_ts,
|
const int64_t snapshot_gc_ts,
|
||||||
const common::ObIArray<FreezeInfo> &info_list,
|
const common::ObIArray<FreezeInfo> &info_list,
|
||||||
const common::ObIArray<share::ObSnapshotInfo> &snapshots,
|
const common::ObIArray<share::ObSnapshotInfo> &snapshots,
|
||||||
const share::SCN &min_major_snapshot,
|
const int64_t min_major_snapshot,
|
||||||
bool& changed);
|
bool& changed);
|
||||||
|
|
||||||
int64_t get_snapshot_gc_ts();
|
int64_t get_snapshot_gc_ts();
|
||||||
@ -137,11 +137,11 @@ private:
|
|||||||
common::MODIFY_GC_SNAPSHOT_INTERVAL + 10L * 1000L * 1000L;
|
common::MODIFY_GC_SNAPSHOT_INTERVAL + 10L * 1000L * 1000L;
|
||||||
static const int64_t MIN_DEPENDENT_FREEZE_INFO_GAP = 2;
|
static const int64_t MIN_DEPENDENT_FREEZE_INFO_GAP = 2;
|
||||||
|
|
||||||
int get_latest_freeze_scn(share::SCN &freeze_scn);
|
int get_latest_freeze_version(int64_t &freeze_version);
|
||||||
int64_t get_next_idx() { return 1L - cur_idx_; }
|
int64_t get_next_idx() { return 1L - cur_idx_; }
|
||||||
void switch_info() { cur_idx_ = get_next_idx(); }
|
void switch_info() { cur_idx_ = get_next_idx(); }
|
||||||
int get_info_nolock(const int64_t idx, FreezeInfo &freeze_info);
|
int get_info_nolock(const int64_t idx, FreezeInfo &freeze_info);
|
||||||
int prepare_new_info_list(const share::SCN &min_major_snapshot);
|
int prepare_new_info_list(const int64_t min_major_snapshot);
|
||||||
virtual int get_multi_version_duration(int64_t &duration) const;
|
virtual int get_multi_version_duration(int64_t &duration) const;
|
||||||
int inner_get_neighbour_major_freeze(const int64_t snapshot_version,
|
int inner_get_neighbour_major_freeze(const int64_t snapshot_version,
|
||||||
NeighbourFreezeInfo &info);
|
NeighbourFreezeInfo &info);
|
||||||
@ -166,7 +166,7 @@ private:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
int get_global_info(int64_t &snapshot_gc_ts);
|
int get_global_info(int64_t &snapshot_gc_ts);
|
||||||
int get_freeze_info(share::SCN &min_major_snapshot,
|
int get_freeze_info(int64_t &min_major_snapshot,
|
||||||
common::ObIArray<FreezeInfo> &freeze_info);
|
common::ObIArray<FreezeInfo> &freeze_info);
|
||||||
|
|
||||||
bool inited_;
|
bool inited_;
|
||||||
|
|||||||
@ -536,7 +536,7 @@ int TestCompactionPolicy::prepare_freeze_info(
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *);
|
ObTenantFreezeInfoMgr *mgr = MTL(ObTenantFreezeInfoMgr *);
|
||||||
bool changed = false;
|
bool changed = false;
|
||||||
SCN min_major_snapshot = SCN::max_scn();
|
int64_t min_major_snapshot = INT64_MAX;
|
||||||
|
|
||||||
if (OB_ISNULL(mgr)) {
|
if (OB_ISNULL(mgr)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -738,9 +738,7 @@ TEST_F(TestCompactionPolicy, check_minor_merge_basic)
|
|||||||
|
|
||||||
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
||||||
common::ObArray<share::ObSnapshotInfo> snapshots;
|
common::ObArray<share::ObSnapshotInfo> snapshots;
|
||||||
SCN scn;
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(1, 1, 0)));
|
||||||
scn.convert_for_tx(1);
|
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
|
||||||
|
|
||||||
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
@ -773,13 +771,9 @@ TEST_F(TestCompactionPolicy, check_no_need_minor_merge)
|
|||||||
|
|
||||||
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
||||||
common::ObArray<share::ObSnapshotInfo> snapshots;
|
common::ObArray<share::ObSnapshotInfo> snapshots;
|
||||||
SCN scn;
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(1, 1, 0)));
|
||||||
scn.convert_for_tx(1);
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(320, 1, 0)));
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(400, 1, 0)));
|
||||||
scn.convert_for_tx(320);
|
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
|
||||||
scn.convert_for_tx(400);
|
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
|
||||||
|
|
||||||
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
@ -812,11 +806,8 @@ TEST_F(TestCompactionPolicy, check_major_merge_basic)
|
|||||||
|
|
||||||
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
||||||
common::ObArray<share::ObSnapshotInfo> snapshots;
|
common::ObArray<share::ObSnapshotInfo> snapshots;
|
||||||
SCN scn;
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(1, 1, 0)));
|
||||||
scn.convert_for_tx(1);
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(340, 1, 0)));
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
|
||||||
scn.convert_for_tx(340);
|
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
|
||||||
|
|
||||||
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
@ -850,11 +841,8 @@ TEST_F(TestCompactionPolicy, check_no_need_major_merge)
|
|||||||
|
|
||||||
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
common::ObArray<ObTenantFreezeInfoMgr::FreezeInfo> freeze_info;
|
||||||
common::ObArray<share::ObSnapshotInfo> snapshots;
|
common::ObArray<share::ObSnapshotInfo> snapshots;
|
||||||
SCN scn;
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(1, 1, 0)));
|
||||||
scn.convert_for_tx(1);
|
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(340, 1, 0)));
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
|
||||||
scn.convert_for_tx(340);
|
|
||||||
ASSERT_EQ(OB_SUCCESS, freeze_info.push_back(ObTenantFreezeInfoMgr::FreezeInfo(scn, 1, 0)));
|
|
||||||
|
|
||||||
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
ret = TestCompactionPolicy::prepare_freeze_info(500, freeze_info, snapshots);
|
||||||
ASSERT_EQ(OB_SUCCESS, ret);
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|||||||
Reference in New Issue
Block a user