clean some TODOs about ObScnRange && FreezeInfo
This commit is contained in:
parent
97a2a94d7c
commit
89c7502360
@ -339,7 +339,7 @@ int ObAllVirtualTabletSSTableMacroInfo::gen_row(
|
||||
break;
|
||||
case END_LOG_SCN:
|
||||
//end_log_scn
|
||||
cur_row_.cells_[i].set_uint64(table_key.get_end_log_ts() < 0 ? 0 : table_key.get_end_log_ts());
|
||||
cur_row_.cells_[i].set_uint64(!table_key.get_end_scn().is_valid() ? 0 : table_key.get_end_scn().get_val_for_inner_table_field());
|
||||
break;
|
||||
case MACRO_LOGIC_VERSION:
|
||||
//macro_logic_version
|
||||
@ -668,7 +668,7 @@ bool ObAllVirtualTabletSSTableMacroInfo::check_sstable_need_ignore(const ObITabl
|
||||
objs_[index++].set_int(MTL_ID()); // tenant_id
|
||||
objs_[index++].set_int(ls_id_); // ls_id
|
||||
objs_[index++].set_int(table_key.tablet_id_.id()); // tablet_id
|
||||
objs_[index++].set_uint64(table_key.get_end_log_ts() < 0 ? 0 : table_key.get_end_log_ts()); // end_log_scn
|
||||
objs_[index++].set_uint64(!table_key.get_end_scn().is_valid() ? 0 : table_key.get_end_scn().get_val_for_inner_table_field());
|
||||
|
||||
ObRowkey rowkey(objs_, index + 1);
|
||||
for (int64_t i = 0; i < key_ranges_.count() && need_ignore; ++i) {
|
||||
|
@ -71,7 +71,7 @@ int ObIndexTreePrefetcher::init(
|
||||
access_ctx_ = &access_ctx;
|
||||
iter_param_ = &iter_param;
|
||||
index_read_info_ = iter_param.get_full_read_info()->get_index_read_info();
|
||||
data_version_ = sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_log_ts();
|
||||
data_version_ = sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_scn().get_val_for_tx();
|
||||
data_block_cache_ = &(OB_STORE_CACHE.get_block_cache());
|
||||
index_block_cache_ = &(OB_STORE_CACHE.get_index_block_cache());
|
||||
is_inited_ = true;
|
||||
@ -98,7 +98,7 @@ int ObIndexTreePrefetcher::switch_context(
|
||||
sstable_ = &sstable;
|
||||
access_ctx_ = &access_ctx;
|
||||
index_read_info_ = &index_read_info;
|
||||
data_version_ = sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_log_ts();
|
||||
data_version_ = sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_scn().get_val_for_tx();
|
||||
if (!is_rescan_) {
|
||||
is_rescan_ = true;
|
||||
for (int64_t i = 0; i < DEFAULT_GET_MICRO_DATA_HANDLE_CNT; ++i) {
|
||||
@ -160,7 +160,7 @@ int ObIndexTreePrefetcher::lookup_in_cache(ObSSTableReadHandle &read_handle)
|
||||
++access_ctx_->table_store_stat_.row_cache_miss_cnt_;
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (OB_UNLIKELY(read_handle.row_handle_.row_value_->get_start_log_ts() != sstable_->get_key().get_start_log_ts())) {
|
||||
} else if (OB_UNLIKELY(read_handle.row_handle_.row_value_->get_start_log_ts() != sstable_->get_key().get_start_scn().get_val_for_tx())) {
|
||||
++access_ctx_->table_store_stat_.row_cache_miss_cnt_;
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
@ -530,7 +530,7 @@ int ObIndexTreeMultiPassPrefetcher::init_basic_info(
|
||||
int32_t range_count = 0;
|
||||
sstable_ = &sstable;
|
||||
access_ctx_ = &access_ctx;
|
||||
data_version_ = sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_log_ts();
|
||||
data_version_ = sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_scn().get_val_for_tx();
|
||||
cur_level_ = 0;
|
||||
iter_type_ = iter_type;
|
||||
index_tree_height_ = sstable_->get_meta().get_index_tree_height();
|
||||
|
@ -1082,7 +1082,7 @@ int ObMultipleMerge::prepare_read_tables(bool refresh)
|
||||
}
|
||||
} else if (OB_FAIL(tablet_handle.get_obj()->get_read_tables(
|
||||
get_table_param_.sample_info_.is_no_sample()
|
||||
? access_ctx_->store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_lsn_allocator()
|
||||
? access_ctx_->store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx()
|
||||
: INT64_MAX,
|
||||
get_table_param_.tablet_iter_,
|
||||
false/*allow_not_ready*/))) {
|
||||
@ -1210,7 +1210,7 @@ int ObMultipleMerge::refresh_tablet_iter()
|
||||
} else if (OB_FAIL(ls_handle.get_ls()->get_tablet_svr()->get_read_tables(
|
||||
tablet_id,
|
||||
get_table_param_.sample_info_.is_no_sample()
|
||||
? access_ctx_->store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_lsn_allocator()
|
||||
? access_ctx_->store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx()
|
||||
: INT64_MAX,
|
||||
get_table_param_.tablet_iter_,
|
||||
false/*allow_not_ready*/))) {
|
||||
|
@ -261,7 +261,7 @@ int ObSingleMerge::inner_get_next_row(ObDatumRow &row)
|
||||
if (table_idx == tables_.count() - 1) {
|
||||
access_ctx_->defensive_check_record_.start_access_table_idx_ = table_idx;
|
||||
access_ctx_->defensive_check_record_.total_table_handle_cnt_ = tables_.count();
|
||||
access_ctx_->defensive_check_record_.fist_access_table_start_log_ts_ = table->get_start_log_ts();
|
||||
access_ctx_->defensive_check_record_.fist_access_table_start_log_ts_ = table->get_start_scn().get_val_for_tx();
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ int ObTableScanIterator::prepare_table_context()
|
||||
ObVersionRange trans_version_range;
|
||||
trans_version_range.multi_version_start_ = 0;
|
||||
trans_version_range.base_version_ = 0;
|
||||
trans_version_range.snapshot_version_ = ctx_guard_.get_store_ctx().mvcc_acc_ctx_.get_snapshot_version().get_val_for_lsn_allocator();
|
||||
trans_version_range.snapshot_version_ = ctx_guard_.get_store_ctx().mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx();
|
||||
if (OB_UNLIKELY(!trans_version_range.is_valid())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "trans version range is not valid", K(ret), K(trans_version_range));
|
||||
@ -377,7 +377,7 @@ int ObTableScanIterator::open_iter()
|
||||
if (nullptr == scan_merge_ && OB_FAIL(init_scan_iter(scan_merge_))) {
|
||||
STORAGE_LOG(WARN, "Failed to init scanmerge", K(ret));
|
||||
} else if (OB_FAIL(get_table_param_.tablet_iter_.tablet_handle_.get_obj()->get_read_tables(
|
||||
main_table_ctx_.store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_lsn_allocator(),
|
||||
main_table_ctx_.store_ctx_->mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(),
|
||||
get_table_param_.tablet_iter_,
|
||||
false /*allow_not_ready*/ ))) {
|
||||
STORAGE_LOG(WARN, "Fail to read tables", K(ret));
|
||||
|
@ -288,7 +288,7 @@ int ObBackupUtils::check_tablet_ddl_sstable_validity_(const storage::ObTabletHan
|
||||
LOG_WARN("table ptr not correct", K(ret), KPC(last_table_ptr));
|
||||
} else {
|
||||
const ObITable::TableKey &table_key = last_table_ptr->get_key();
|
||||
if (table_key.get_end_log_ts() != tablet_ddl_checkpoint_scn.get_val_for_inner_table_field()) {
|
||||
if (table_key.get_end_scn() != tablet_ddl_checkpoint_scn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("tablet meta is not valid", K(ret), K(table_key), K(tablet_ddl_checkpoint_scn));
|
||||
}
|
||||
|
@ -365,7 +365,7 @@ int ObMicroBlockRowGetter::inner_get_row(
|
||||
// fuse row cache bypass the row cache
|
||||
} else if (context_->enable_put_row_cache() && param_->read_with_same_schema()) {
|
||||
ObRowCacheValue row_cache_value;
|
||||
if (OB_FAIL(row_cache_value.init(sstable_->get_key().get_start_log_ts(), row_))) {
|
||||
if (OB_FAIL(row_cache_value.init(sstable_->get_key().get_start_scn().get_val_for_tx(), row_))) {
|
||||
LOG_WARN("fail to init row cache value", K(ret), K(row_));
|
||||
} else {
|
||||
//put row cache, ignore fail
|
||||
@ -374,7 +374,7 @@ int ObMicroBlockRowGetter::inner_get_row(
|
||||
param_->tablet_id_,
|
||||
rowkey,
|
||||
read_info_->get_datum_utils(),
|
||||
sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_log_ts(),
|
||||
sstable_->is_major_sstable() ? sstable_->get_snapshot_version() : sstable_->get_key().get_end_scn().get_val_for_tx(),
|
||||
sstable_->get_key().table_type_);
|
||||
if (OB_SUCCESS == OB_STORE_CACHE.get_row_cache().put_row(row_cache_key, row_cache_value)) {
|
||||
context_->table_store_stat_.row_cache_put_cnt_++;
|
||||
|
@ -373,7 +373,7 @@ int ObCompactionDiagnoseMgr::diagnose_tenant_tablet()
|
||||
LOG_WARN("failed to add dignose info about freeze_info", K(tmp_ret), K(merged_version));
|
||||
}
|
||||
} else {
|
||||
compaction_scn = freeze_info.freeze_scn.get_val_for_inner_table_field();
|
||||
compaction_scn = freeze_info.freeze_scn.get_val_for_tx();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -105,9 +105,9 @@ public:
|
||||
public:
|
||||
// TODO(scn): change scn of int64_t type to palf::SCN
|
||||
int64_t get_max_filtered_end_scn_v0() { return max_filtered_end_scn_; }
|
||||
palf::SCN get_max_filtered_end_scn() { palf::SCN tmp_scn; tmp_scn.convert_for_lsn_allocator(max_filtered_end_scn_); return tmp_scn; }
|
||||
palf::SCN get_max_filtered_end_scn() { palf::SCN tmp_scn; tmp_scn.convert_for_tx(max_filtered_end_scn_); return tmp_scn; }
|
||||
int64_t get_recycle_scn_v0() { return filter_val_; }
|
||||
palf::SCN get_recycle_scn() { palf::SCN tmp_scn; tmp_scn.convert_for_lsn_allocator(filter_val_); return tmp_scn; }
|
||||
palf::SCN get_recycle_scn() { palf::SCN tmp_scn; tmp_scn.convert_for_tx(filter_val_); return tmp_scn; }
|
||||
|
||||
private:
|
||||
bool is_inited_;
|
||||
|
@ -112,7 +112,7 @@ int ObPartitionMergeIter::init_query_base_params(const ObMergeParameter &merge_p
|
||||
} else {
|
||||
if (merge_param.version_range_.snapshot_version_ >= palf::OB_MAX_SCN_TS_NS) {
|
||||
tmp_scn.set_max();
|
||||
} else if (OB_FAIL(tmp_scn.convert_for_lsn_allocator(merge_param.version_range_.snapshot_version_))) {
|
||||
} else if (OB_FAIL(tmp_scn.convert_for_tx(merge_param.version_range_.snapshot_version_))) {
|
||||
LOG_WARN("Failed to convert", K(ret), K_(merge_param.version_range_.snapshot_version));
|
||||
}
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ int ObPartitionMergePolicy::get_neighbour_freeze_info(
|
||||
freeze_info.reset();
|
||||
freeze_info.next.freeze_scn.set_max();
|
||||
if (OB_NOT_NULL(last_major)) {
|
||||
freeze_info.prev.freeze_scn.convert_for_gts(last_major->get_snapshot_version());
|
||||
freeze_info.prev.freeze_scn.convert_for_tx(last_major->get_snapshot_version());
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("Failed to get neighbour major freeze info", K(ret), K(snapshot_version));
|
||||
@ -132,7 +132,7 @@ int ObPartitionMergePolicy::find_mini_merge_tables(
|
||||
result.reset();
|
||||
// TODO: @dengzhi.ldz, remove max_snapshot_version, merge all forzen memtables
|
||||
// Keep max_snapshot_version currently because major merge must be done step by step
|
||||
int64_t max_snapshot_version = freeze_info.next.freeze_scn.get_val_for_inner_table_field();
|
||||
int64_t max_snapshot_version = freeze_info.next.freeze_scn.get_val_for_tx();
|
||||
ObITable *last_table = tablet.get_table_store().get_minor_sstables().get_boundary_table(true/*last*/);
|
||||
const palf::SCN last_minor_scn = nullptr == last_table ? tablet.get_clog_checkpoint_scn() : last_table->get_end_scn();
|
||||
|
||||
@ -323,8 +323,8 @@ int ObPartitionMergePolicy::get_boundary_snapshot_version(
|
||||
min_snapshot = last_major_table->get_snapshot_version();
|
||||
}
|
||||
} else {
|
||||
min_snapshot = freeze_info.prev.freeze_scn.get_val_for_inner_table_field();
|
||||
max_snapshot = freeze_info.next.freeze_scn.get_val_for_inner_table_field();
|
||||
min_snapshot = freeze_info.prev.freeze_scn.get_val_for_tx();
|
||||
max_snapshot = freeze_info.next.freeze_scn.get_val_for_tx();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -527,7 +527,7 @@ int ObPartitionMergePolicy::get_major_merge_tables(
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(base_table)) {
|
||||
const int64_t major_snapshot = MAX(base_table->get_snapshot_version(), freeze_info.freeze_scn.get_val_for_inner_table_field());
|
||||
const int64_t major_snapshot = MAX(base_table->get_snapshot_version(), freeze_info.freeze_scn.get_val_for_tx());
|
||||
result.read_base_version_ = base_table->get_snapshot_version();
|
||||
result.version_range_.snapshot_version_ = major_snapshot;
|
||||
result.create_snapshot_version_ = base_table->get_meta().get_basic_meta().create_snapshot_version_;
|
||||
@ -778,7 +778,7 @@ int ObPartitionMergePolicy::deal_hist_minor_merge(
|
||||
LOG_WARN("failed to get freeze info mgr from MTL", K(ret));
|
||||
} else if (OB_ISNULL(first_major_table = table_store.get_major_sstables().get_boundary_table(false))) {
|
||||
// index table during building, need compat with continuous multi version
|
||||
if (0 == (max_snapshot_version = freeze_info_mgr->get_latest_frozen_scn().get_val_for_inner_table_field())) {
|
||||
if (0 == (max_snapshot_version = freeze_info_mgr->get_latest_frozen_scn().get_val_for_tx())) {
|
||||
// no freeze info found, wait normal mini minor to free sstable
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("No freeze range to do hist minor merge for buiding index", K(ret), K(table_store));
|
||||
@ -870,7 +870,7 @@ int ObPartitionMergePolicy::check_need_major_merge(
|
||||
LOG_INFO("can't get freeze info after snapshot", K(ret), K(merge_version), K(major_sstable_version));
|
||||
}
|
||||
} else {
|
||||
can_merge = last_sstable_snapshot >= freeze_info.freeze_scn.get_val_for_inner_table_field();
|
||||
can_merge = last_sstable_snapshot >= freeze_info.freeze_scn.get_val_for_tx();
|
||||
if (!can_merge) {
|
||||
LOG_TRACE("tablet need merge, but cannot merge now", K(tablet_id), K(merge_version), K(last_sstable_snapshot), K(freeze_info));
|
||||
}
|
||||
@ -886,7 +886,7 @@ int ObPartitionMergePolicy::check_need_major_merge(
|
||||
// no frozen memtable, need force freeze
|
||||
need_force_freeze = true;
|
||||
} else {
|
||||
need_force_freeze = last_frozen_memtable->get_snapshot_version() < freeze_info.freeze_scn.get_val_for_inner_table_field();
|
||||
need_force_freeze = last_frozen_memtable->get_snapshot_version() < freeze_info.freeze_scn.get_val_for_tx();
|
||||
if (!need_force_freeze) {
|
||||
FLOG_INFO("tablet no need force freeze", K(ret), K(tablet_id), K(merge_version), K(freeze_info), KPC(last_frozen_memtable));
|
||||
}
|
||||
|
@ -657,10 +657,10 @@ int ObTabletMergeCtx::inner_init_for_major()
|
||||
} else if (get_merge_table_result.handle_.get_count() > 1
|
||||
&& !ObTenantTabletScheduler::check_tx_table_ready(
|
||||
*ls_handle_.get_ls(),
|
||||
get_merge_table_result.scn_range_.end_scn_.get_val_for_inner_table_field())) {
|
||||
get_merge_table_result.scn_range_.end_scn_.get_val_for_tx())) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_INFO("tx table is not ready. waiting for max_decided_log_ts ...",
|
||||
KR(ret), "merge_log_ts", get_merge_table_result.scn_range_.end_scn_.get_val_for_inner_table_field());
|
||||
KR(ret), "merge_scn", get_merge_table_result.scn_range_.end_scn_);
|
||||
} else if (OB_FAIL(get_basic_info_from_result(get_merge_table_result))) {
|
||||
LOG_WARN("failed to set basic info to ctx", K(ret), K(get_merge_table_result), KPC(this));
|
||||
} else if (OB_FAIL(get_table_schema_to_merge())) {
|
||||
@ -718,10 +718,10 @@ int ObTabletMergeCtx::inner_init_for_minor(bool &skip_rest_operation)
|
||||
}
|
||||
} else if (!ObTenantTabletScheduler::check_tx_table_ready(
|
||||
*ls_handle_.get_ls(),
|
||||
get_merge_table_result.scn_range_.end_scn_.get_val_for_inner_table_field())) {
|
||||
get_merge_table_result.scn_range_.end_scn_.get_val_for_tx())) {
|
||||
ret = OB_EAGAIN;
|
||||
LOG_INFO("tx table is not ready. waiting for max_decided_log_ts ...",
|
||||
KR(ret), "merge_log_ts", get_merge_table_result.scn_range_.end_scn_.get_val_for_inner_table_field());
|
||||
KR(ret), "merge_scn", get_merge_table_result.scn_range_.end_scn_);
|
||||
} else if (OB_FAIL(get_storage_schema_to_merge(get_merge_table_result.handle_, true/*get_schema_on_memtable*/))) {
|
||||
LOG_ERROR("Fail to get storage schema", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(get_basic_info_from_result(get_merge_table_result))) {
|
||||
@ -778,7 +778,7 @@ int ObTabletMergeCtx::update_tablet_directly(const ObGetMergeTablesResult &get_m
|
||||
schema_ctx_.storage_schema_,
|
||||
rebuild_seq,
|
||||
param_.is_major_merge(),
|
||||
0/*clog_checkpoint_ts*/);
|
||||
palf::SCN::min_scn()/*clog_checkpoint_scn*/);
|
||||
ObTabletHandle new_tablet_handle;
|
||||
if (OB_FAIL(ls_handle_.get_ls()->update_tablet_table_store(
|
||||
param_.tablet_id_, param, new_tablet_handle))) {
|
||||
@ -1127,15 +1127,15 @@ int ObTabletMergeCtx::generate_participant_table_info(char *buf, const int64_t b
|
||||
"[MAJOR]scn", tables_handle_.get_table(0)->get_snapshot_version());
|
||||
if (tables_handle_.get_count() > 1) {
|
||||
ADD_COMPACTION_INFO_PARAM(buf, buf_len,
|
||||
"[MINI]start_scn", tables_handle_.get_table(1)->get_start_scn().get_val_for_inner_table_field(),
|
||||
"end_scn", tables_handle_.get_table(tables_handle_.get_count() - 1)->get_end_scn().get_val_for_inner_table_field());
|
||||
"[MINI]start_scn", tables_handle_.get_table(1)->get_start_scn().get_val_for_tx(),
|
||||
"end_scn", tables_handle_.get_table(tables_handle_.get_count() - 1)->get_end_scn().get_val_for_tx());
|
||||
}
|
||||
} else {
|
||||
if (tables_handle_.get_count() > 0) {
|
||||
ADD_COMPACTION_INFO_PARAM(buf, buf_len,
|
||||
"table_cnt", tables_handle_.get_count(),
|
||||
"start_scn", tables_handle_.get_table(0)->get_start_scn().get_val_for_inner_table_field(),
|
||||
"end_scn", tables_handle_.get_table(tables_handle_.get_count() - 1)->get_end_scn().get_val_for_inner_table_field());
|
||||
"start_scn", tables_handle_.get_table(0)->get_start_scn().get_val_for_tx(),
|
||||
"end_scn", tables_handle_.get_table(tables_handle_.get_count() - 1)->get_end_scn().get_val_for_tx());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -970,7 +970,7 @@ int ObTabletMergeFinishTask::add_sstable_for_merge(ObTabletMergeCtx &ctx)
|
||||
update_storage_schema,
|
||||
ctx.rebuild_seq_,
|
||||
ctx.param_.is_major_merge(),
|
||||
clog_checkpoint_scn.get_val_for_inner_table_field(),
|
||||
clog_checkpoint_scn,
|
||||
ctx.param_.is_mini_minor_merge());
|
||||
ObTablet *old_tablet = ctx.tablet_handle_.get_obj();
|
||||
ObTabletHandle new_tablet_handle;
|
||||
|
@ -223,9 +223,9 @@ int64_t ObTenantFreezeInfoMgr::find_pos_in_list_(
|
||||
while (l < r && ret_pos < 0) {
|
||||
mid = (l + r) >> 1;
|
||||
const FreezeInfo &tmp_info = info_list.at(mid);
|
||||
if (snapshot_version < tmp_info.freeze_scn.get_val_for_inner_table_field()) {
|
||||
if (snapshot_version < tmp_info.freeze_scn.get_val_for_tx()) {
|
||||
r = mid;
|
||||
} else if (snapshot_version > tmp_info.freeze_scn.get_val_for_inner_table_field()) {
|
||||
} else if (snapshot_version > tmp_info.freeze_scn.get_val_for_tx()) {
|
||||
l = mid + 1;
|
||||
} else {
|
||||
ret_pos = mid;
|
||||
@ -293,7 +293,7 @@ int ObTenantFreezeInfoMgr::get_freeze_info_behind_snapshot_version_(
|
||||
bool found = false;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && !found && i < info_list.count(); ++i) {
|
||||
FreezeInfo &tmp_info = info_list.at(i);
|
||||
if (snapshot_version < tmp_info.freeze_scn.get_val_for_inner_table_field()) {
|
||||
if (snapshot_version < tmp_info.freeze_scn.get_val_for_tx()) {
|
||||
freeze_info = tmp_info;
|
||||
found = true;
|
||||
}
|
||||
@ -330,7 +330,7 @@ int ObTenantFreezeInfoMgr::inner_get_neighbour_major_freeze(
|
||||
bool found = false;
|
||||
for (int64_t i = 0; i < info_list.count() && OB_SUCC(ret) && !found; ++i) {
|
||||
FreezeInfo &next_info = info_list.at(i);
|
||||
if (snapshot_version < next_info.freeze_scn.get_val_for_inner_table_field()) {
|
||||
if (snapshot_version < next_info.freeze_scn.get_val_for_tx()) {
|
||||
found = true;
|
||||
if (0 == i) {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
@ -446,14 +446,14 @@ int ObTenantFreezeInfoMgr::get_min_reserved_snapshot(
|
||||
}
|
||||
}
|
||||
snapshot_version = std::max(0L, snapshot_gc_ts_ - duration * 1000L * 1000L *1000L);
|
||||
snapshot_version = std::min(snapshot_version, static_cast<int64_t>(freeze_info.freeze_scn.get_val_for_inner_table_field()));
|
||||
snapshot_version = std::min(snapshot_version, static_cast<int64_t>(freeze_info.freeze_scn.get_val_for_tx()));
|
||||
for (int64_t i = 0; i < snapshots.count() && OB_SUCC(ret); ++i) {
|
||||
bool related = false;
|
||||
const ObSnapshotInfo &snapshot = snapshots.at(i);
|
||||
if (OB_FAIL(is_snapshot_related_to_tablet(tablet_id, snapshot, related))) {
|
||||
STORAGE_LOG(WARN, "fail to check snapshot relation", K(ret), K(tablet_id), K(snapshot));
|
||||
} else if (related) {
|
||||
snapshot_version = std::min(snapshot_version, (int64_t)snapshot.snapshot_scn_.get_val_for_lsn_allocator());
|
||||
snapshot_version = std::min(snapshot_version, (int64_t)snapshot.snapshot_scn_.get_val_for_tx());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -492,8 +492,8 @@ int ObTenantFreezeInfoMgr::diagnose_min_reserved_snapshot(
|
||||
}
|
||||
snapshot_version = std::max(0L, snapshot_gc_ts_ - duration * 1000L * 1000L);
|
||||
snapshot_from_type = "undo_retention";
|
||||
if (freeze_info.freeze_scn.get_val_for_inner_table_field() < snapshot_version) {
|
||||
snapshot_version = freeze_info.freeze_scn.get_val_for_inner_table_field();
|
||||
if (freeze_info.freeze_scn.get_val_for_tx() < snapshot_version) {
|
||||
snapshot_version = freeze_info.freeze_scn.get_val_for_tx();
|
||||
snapshot_from_type = "major_freeze_ts";
|
||||
}
|
||||
for (int64_t i = 0; i < snapshots.count() && OB_SUCC(ret); ++i) {
|
||||
@ -501,8 +501,8 @@ int ObTenantFreezeInfoMgr::diagnose_min_reserved_snapshot(
|
||||
const ObSnapshotInfo &snapshot = snapshots.at(i);
|
||||
if (OB_FAIL(is_snapshot_related_to_tablet(tablet_id, snapshot, related))) {
|
||||
STORAGE_LOG(WARN, "fail to check snapshot relation", K(ret), K(tablet_id), K(snapshot));
|
||||
} else if (related && snapshot.snapshot_scn_.get_val_for_lsn_allocator() < snapshot_version) {
|
||||
snapshot_version = snapshot.snapshot_scn_.get_val_for_lsn_allocator();
|
||||
} else if (related && snapshot.snapshot_scn_.get_val_for_tx() < snapshot_version) {
|
||||
snapshot_version = snapshot.snapshot_scn_.get_val_for_tx();
|
||||
snapshot_from_type = snapshot.get_snapshot_type_str();
|
||||
}
|
||||
}
|
||||
@ -728,7 +728,7 @@ int ObTenantFreezeInfoMgr::ReloadTask::get_global_info(int64_t &snapshot_gc_ts)
|
||||
}
|
||||
} else {
|
||||
// TODO SCN
|
||||
snapshot_gc_ts = snapshot_gc_scn.get_val_for_inner_table_field();
|
||||
snapshot_gc_ts = snapshot_gc_scn.get_val_for_tx();
|
||||
}
|
||||
|
||||
return ret;
|
||||
@ -756,9 +756,8 @@ int ObTenantFreezeInfoMgr::ReloadTask::get_freeze_info(
|
||||
} else {
|
||||
for (int64_t i = 0; i < tmp.count() && OB_SUCC(ret); ++i) {
|
||||
ObSimpleFrozenStatus &status = tmp.at(i);
|
||||
const uint64_t frozen_scn_val = status.frozen_scn_.get_val_for_inner_table_field();
|
||||
if (OB_FAIL(freeze_info.push_back(
|
||||
FreezeInfo(frozen_scn_val, status.schema_version_, status.cluster_version_)))) {
|
||||
FreezeInfo(status.frozen_scn_, status.schema_version_, status.cluster_version_)))) {
|
||||
STORAGE_LOG(WARN, "fail to push back freeze info", K(ret), K(status));
|
||||
}
|
||||
}
|
||||
@ -766,7 +765,7 @@ int ObTenantFreezeInfoMgr::ReloadTask::get_freeze_info(
|
||||
compaction::ADD_COMPACTION_EVENT(
|
||||
MTL_ID(),
|
||||
MAJOR_MERGE,
|
||||
tmp.at(tmp.count() - 1).frozen_scn_.get_val_for_inner_table_field(),
|
||||
tmp.at(tmp.count() - 1).frozen_scn_.get_val_for_tx(),
|
||||
compaction::ObServerCompactionEvent::GET_FREEZE_INFO,
|
||||
ObTimeUtility::fast_current_time(),
|
||||
"new_freeze_info_cnt",
|
||||
|
@ -54,7 +54,7 @@ public:
|
||||
FreezeInfo() : freeze_scn(), schema_version(-1), cluster_version(0) {}
|
||||
FreezeInfo(const int64_t scn, const int64_t schema_ver, const int64_t cluster_ver)
|
||||
: freeze_scn(), schema_version(schema_ver), cluster_version(cluster_ver) {
|
||||
freeze_scn.convert_for_gts(scn);
|
||||
freeze_scn.convert_for_tx(scn);
|
||||
}
|
||||
FreezeInfo(const palf::SCN &scn, const int64_t schema_ver, const int64_t cluster_ver)
|
||||
: freeze_scn(scn), schema_version(schema_ver), cluster_version(cluster_ver) {}
|
||||
|
@ -451,12 +451,12 @@ bool ObTenantTabletScheduler::check_weak_read_ts_ready(
|
||||
ObLS &ls)
|
||||
{
|
||||
bool is_ready_for_compaction = false;
|
||||
palf::SCN weak_read_ts;
|
||||
palf::SCN weak_read_scn;
|
||||
|
||||
if (FALSE_IT(weak_read_ts = ls.get_ls_wrs_handler()->get_ls_weak_read_ts())) {
|
||||
} else if (weak_read_ts.get_val_for_lsn_allocator() < merge_version) {
|
||||
if (FALSE_IT(weak_read_scn = ls.get_ls_wrs_handler()->get_ls_weak_read_ts())) {
|
||||
} else if (weak_read_scn.get_val_for_tx() < merge_version) {
|
||||
FLOG_INFO("current slave_read_ts is smaller than freeze_ts, try later",
|
||||
"ls_id", ls.get_ls_id(), K(merge_version), K(weak_read_ts));
|
||||
"ls_id", ls.get_ls_id(), K(merge_version), K(weak_read_scn));
|
||||
} else {
|
||||
is_ready_for_compaction = true;
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ int ObIMemtableMgr::release_memtables(const int64_t log_ts)
|
||||
&& memtable->is_empty()
|
||||
&& !memtable->get_is_force_freeze()) {
|
||||
break;
|
||||
} else if (memtable->get_end_log_ts() <= log_ts
|
||||
} else if (memtable->get_end_scn().get_val_for_tx() <= log_ts
|
||||
&& memtable->can_be_minor_merged()) {
|
||||
if (OB_FAIL(release_head_memtable_(memtable))) {
|
||||
STORAGE_LOG(WARN, "fail to release memtable", K(ret), KPC(memtable));
|
||||
|
@ -129,8 +129,6 @@ public:
|
||||
OB_INLINE bool is_remote_logical_minor_sstable() const { return ObITable::is_remote_logical_minor_sstable(table_type_); }
|
||||
|
||||
OB_INLINE const common::ObTabletID &get_tablet_id() const { return tablet_id_; }
|
||||
OB_INLINE int64_t get_start_log_ts() const { return get_start_scn().get_val_for_inner_table_field(); }
|
||||
OB_INLINE int64_t get_end_log_ts() const { return get_end_scn().get_val_for_inner_table_field(); }
|
||||
OB_INLINE palf::SCN get_start_scn() const { return scn_range_.start_scn_; }
|
||||
OB_INLINE palf::SCN get_end_scn() const { return scn_range_.end_scn_; }
|
||||
OB_INLINE int64_t get_snapshot_version() const
|
||||
@ -205,8 +203,6 @@ public:
|
||||
const common::ObIArray<blocksstable::ObDatumRange> &ranges,
|
||||
ObStoreRowIterator *&row_iter) = 0;
|
||||
|
||||
virtual OB_INLINE int64_t get_start_log_ts() const;
|
||||
virtual OB_INLINE int64_t get_end_log_ts() const;
|
||||
virtual OB_INLINE palf::SCN get_start_scn() const;
|
||||
virtual OB_INLINE palf::SCN get_end_scn() const;
|
||||
virtual OB_INLINE share::ObScnRange &get_scn_range() { return key_.scn_range_; }
|
||||
@ -467,15 +463,6 @@ OB_INLINE bool ObITable::TableKey::is_valid() const
|
||||
return valid;
|
||||
}
|
||||
|
||||
OB_INLINE int64_t ObITable::get_start_log_ts() const
|
||||
{
|
||||
return key_.get_start_log_ts();
|
||||
}
|
||||
|
||||
OB_INLINE int64_t ObITable::get_end_log_ts() const
|
||||
{
|
||||
return key_.get_end_log_ts();
|
||||
}
|
||||
|
||||
OB_INLINE palf::SCN ObITable::get_start_scn() const
|
||||
{
|
||||
|
@ -211,7 +211,7 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
const int64_t rebuild_seq)
|
||||
: table_handle_(),
|
||||
snapshot_version_(snapshot_version),
|
||||
clog_checkpoint_ts_(0),
|
||||
clog_checkpoint_scn_(),
|
||||
multi_version_start_(multi_version_start),
|
||||
keep_old_ddl_sstable_(true),
|
||||
need_report_(false),
|
||||
@ -223,6 +223,7 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
ddl_start_scn_(),
|
||||
ddl_snapshot_version_(0)
|
||||
{
|
||||
clog_checkpoint_scn_.set_min();
|
||||
}
|
||||
|
||||
ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
@ -232,11 +233,11 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
const ObStorageSchema *storage_schema,
|
||||
const int64_t rebuild_seq,
|
||||
const bool need_report,
|
||||
const int64_t clog_checkpoint_ts,
|
||||
const palf::SCN clog_checkpoint_scn,
|
||||
const bool need_check_sstable)
|
||||
: table_handle_(table_handle),
|
||||
snapshot_version_(snapshot_version),
|
||||
clog_checkpoint_ts_(clog_checkpoint_ts),
|
||||
clog_checkpoint_scn_(),
|
||||
multi_version_start_(multi_version_start),
|
||||
keep_old_ddl_sstable_(true),
|
||||
need_report_(need_report),
|
||||
@ -248,6 +249,7 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
ddl_start_scn_(),
|
||||
ddl_snapshot_version_(0)
|
||||
{
|
||||
clog_checkpoint_scn_ = clog_checkpoint_scn;
|
||||
}
|
||||
|
||||
ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
@ -260,7 +262,7 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
const bool need_report)
|
||||
: table_handle_(table_handle),
|
||||
snapshot_version_(snapshot_version),
|
||||
clog_checkpoint_ts_(0),
|
||||
clog_checkpoint_scn_(),
|
||||
multi_version_start_(0),
|
||||
keep_old_ddl_sstable_(keep_old_ddl_sstable),
|
||||
need_report_(need_report),
|
||||
@ -272,13 +274,14 @@ ObUpdateTableStoreParam::ObUpdateTableStoreParam(
|
||||
ddl_start_scn_(),
|
||||
ddl_snapshot_version_(0)
|
||||
{
|
||||
clog_checkpoint_scn_.set_min();
|
||||
}
|
||||
|
||||
bool ObUpdateTableStoreParam::is_valid() const
|
||||
{
|
||||
return multi_version_start_ >= ObVersionRange::MIN_VERSION
|
||||
&& snapshot_version_ >= ObVersionRange::MIN_VERSION
|
||||
&& clog_checkpoint_ts_ >= 0
|
||||
&& clog_checkpoint_scn_.is_valid()
|
||||
&& nullptr != storage_schema_
|
||||
&& storage_schema_->is_valid()
|
||||
&& rebuild_seq_ >= 0;
|
||||
@ -353,7 +356,7 @@ int ObBatchUpdateTableStoreParam::get_max_clog_checkpoint_ts(int64_t &clog_check
|
||||
} else if (!table->is_multi_version_minor_sstable()) {
|
||||
//do nothing
|
||||
} else {
|
||||
clog_checkpoint_ts = std::max(clog_checkpoint_ts, table->get_end_log_ts());
|
||||
clog_checkpoint_ts = std::max(clog_checkpoint_ts, table->get_end_scn().get_val_for_tx());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -299,7 +299,7 @@ struct ObUpdateTableStoreParam
|
||||
const ObStorageSchema *storage_schema,
|
||||
const int64_t rebuild_seq,
|
||||
const bool need_report = false,
|
||||
const int64_t clog_checkpoint_ts = 0,
|
||||
const palf::SCN clog_checkpoint_scn = palf::SCN::min_scn(),
|
||||
const bool need_check_sstable = false);
|
||||
|
||||
ObUpdateTableStoreParam( // for ddl merge task only
|
||||
@ -312,13 +312,13 @@ struct ObUpdateTableStoreParam
|
||||
const bool need_report = false);
|
||||
|
||||
bool is_valid() const;
|
||||
TO_STRING_KV(K_(table_handle), K_(snapshot_version), K_(clog_checkpoint_ts), K_(multi_version_start),
|
||||
TO_STRING_KV(K_(table_handle), K_(snapshot_version), K_(clog_checkpoint_scn), K_(multi_version_start),
|
||||
K_(keep_old_ddl_sstable), K_(need_report), KPC_(storage_schema), K_(rebuild_seq), K_(update_with_major_flag),
|
||||
K_(need_check_sstable), K_(ddl_checkpoint_scn), K_(ddl_start_scn), K_(ddl_snapshot_version));
|
||||
|
||||
ObTableHandleV2 table_handle_;
|
||||
int64_t snapshot_version_;
|
||||
int64_t clog_checkpoint_ts_;
|
||||
palf::SCN clog_checkpoint_scn_;
|
||||
int64_t multi_version_start_;
|
||||
bool keep_old_ddl_sstable_;
|
||||
bool need_report_;
|
||||
|
@ -117,7 +117,7 @@ int ObStorageTableGuard::refresh_and_protect_table(ObRelativeTable &relative_tab
|
||||
// the last ret code will be passed to upper levels
|
||||
if (OB_FAIL(store_ctx_.ls_->get_tablet_svr()->get_read_tables(
|
||||
tablet_id,
|
||||
store_ctx_.mvcc_acc_ctx_.get_snapshot_version().get_val_for_lsn_allocator(),
|
||||
store_ctx_.mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(),
|
||||
iter,
|
||||
relative_table.allow_not_ready()))) {
|
||||
LOG_WARN("fail to get read tables", K(ret), K(ls_id), K(tablet_id),
|
||||
|
@ -106,10 +106,7 @@ int ObLockTable::restore_lock_table_(ObITable &sstable)
|
||||
} else if (OB_FAIL(handle.get_lock_memtable(memtable))) {
|
||||
LOG_WARN("get_lock_memtable_ fail.", KR(ret));
|
||||
} else {
|
||||
// TODO: cxf remove this
|
||||
palf::SCN tmp;
|
||||
tmp.convert_for_lsn_allocator(sstable.get_end_log_ts());
|
||||
memtable->set_flushed_scn(tmp);
|
||||
memtable->set_flushed_scn(sstable.get_end_scn());
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(row_iter->get_next_row(row))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
|
@ -587,7 +587,7 @@ int ObMemtableArray::rebuild(common::ObIArray<ObTableHandleV2> &handle_array)
|
||||
LOG_ERROR("ObMemtableArray not inited", K(ret), KPC(this), K(handle_array));
|
||||
} else {
|
||||
ObITable *last_memtable = get_table(count() - 1);
|
||||
int64_t end_log_ts = (NULL == last_memtable) ? 0 : last_memtable->get_end_log_ts();
|
||||
palf::SCN end_scn = (NULL == last_memtable) ? palf::SCN::min_scn() : last_memtable->get_end_scn();
|
||||
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < handle_array.count(); ++i) {
|
||||
memtable::ObMemtable *memtable = nullptr;
|
||||
@ -598,8 +598,8 @@ int ObMemtableArray::rebuild(common::ObIArray<ObTableHandleV2> &handle_array)
|
||||
} else if (FALSE_IT(memtable = reinterpret_cast<memtable::ObMemtable *>(table))) {
|
||||
} else if (memtable->is_empty()) {
|
||||
FLOG_INFO("Empty memtable discarded", KPC(memtable));
|
||||
} else if (table->get_end_log_ts() < end_log_ts) {
|
||||
} else if (table->get_end_log_ts() == end_log_ts && table == last_memtable) { //fix issue 41996395
|
||||
} else if (table->get_end_scn() < end_scn) {
|
||||
} else if (table->get_end_scn() == end_scn && table == last_memtable) { //fix issue 41996395
|
||||
} else if (OB_FAIL(add_table(table))) {
|
||||
LOG_WARN("failed to add memtable to curr memtables", K(ret), KPC(this));
|
||||
}
|
||||
@ -941,13 +941,13 @@ void ObPrintTableStoreIterator::table_to_string(
|
||||
? (static_cast<ObSSTable *>(table)->get_meta().get_basic_meta().contain_uncommitted_row_ ? "true" : "false")
|
||||
: "unused";
|
||||
|
||||
BUF_PRINTF(" %-10s %-19p %-19lu %-19lu %-19lu %-19lu %-4ld %-16s ",
|
||||
BUF_PRINTF(" %-10s %-19p %-19lu %-19lu %-19s %-19s %-4ld %-16s ",
|
||||
table_name,
|
||||
reinterpret_cast<const void *>(table),
|
||||
table->get_upper_trans_version(),
|
||||
table->get_max_merged_trans_version(),
|
||||
table->get_start_scn().get_val_for_inner_table_field(),
|
||||
table->get_end_scn().get_val_for_inner_table_field(),
|
||||
to_cstring(table->get_start_scn()),
|
||||
to_cstring(table->get_end_scn()),
|
||||
table->get_ref(),
|
||||
uncommit_row);
|
||||
} else {
|
||||
|
@ -221,7 +221,6 @@ int ObTablet::init(
|
||||
int64_t max_sync_schema_version = 0;
|
||||
int64_t input_max_sync_schema_version = 0;
|
||||
allocator_ = &(MTL(ObTenantMetaMemMgr*)->get_tenant_allocator());
|
||||
palf::SCN tmp_clog_checkpoint_scn;
|
||||
if (OB_UNLIKELY(is_inited_)) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("init twice", K(ret), K(is_inited_));
|
||||
@ -239,12 +238,10 @@ int ObTablet::init(
|
||||
} else if (FALSE_IT(input_max_sync_schema_version = MIN(MAX(param.storage_schema_->schema_version_,
|
||||
old_tablet.storage_schema_.schema_version_), max_sync_schema_version))) {
|
||||
// use min schema version to avoid lose storage_schema in replay/reboot
|
||||
} else if (OB_FAIL(tmp_clog_checkpoint_scn.convert_for_tx(param.clog_checkpoint_ts_))) {
|
||||
LOG_WARN("convert for tx failed", K(ret));
|
||||
} else if (OB_FAIL(tablet_meta_.init(*allocator_, old_tablet.tablet_meta_,
|
||||
param.snapshot_version_, param.multi_version_start_,
|
||||
tx_data, ddl_data, autoinc_seq, input_max_sync_schema_version,
|
||||
tmp_clog_checkpoint_scn, param.ddl_checkpoint_scn_, param.ddl_start_scn_, param.ddl_snapshot_version_))) {
|
||||
param.clog_checkpoint_scn_, param.ddl_checkpoint_scn_, param.ddl_start_scn_, param.ddl_snapshot_version_))) {
|
||||
LOG_WARN("failed to init tablet meta", K(ret), K(old_tablet), K(param),
|
||||
K(tx_data), K(ddl_data), K(autoinc_seq), K(input_max_sync_schema_version));
|
||||
} else if (OB_FAIL(table_store_.init(*allocator_, this, param, old_tablet.table_store_))) {
|
||||
@ -1131,7 +1128,7 @@ int ObTablet::update_upper_trans_version(ObLS &ls, bool &is_updated)
|
||||
if (OB_FAIL(ls.get_upper_trans_version_before_given_scn(
|
||||
sstable->get_end_scn(), tmp_scn))) {
|
||||
LOG_WARN("failed to get upper trans version before given log ts", K(ret), KPC(sstable));
|
||||
} else if (FALSE_IT(max_trans_version = tmp_scn.is_max() ? INT64_MAX : tmp_scn.get_val_for_lsn_allocator())) {
|
||||
} else if (FALSE_IT(max_trans_version = tmp_scn.get_val_for_tx())) {
|
||||
} else if (0 == max_trans_version) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("max trans version should not be 0", KPC(sstable));
|
||||
@ -1308,7 +1305,7 @@ int ObTablet::do_rowkey_exists(
|
||||
LOG_WARN("invalid argument", K(ret), K(store_ctx), K(rowkey), K(query_flag));
|
||||
} else if (OB_FAIL(allow_to_read_())) {
|
||||
LOG_WARN("not allowed to read", K(ret), K(tablet_meta_));
|
||||
} else if (OB_FAIL(get_read_tables(store_ctx.mvcc_acc_ctx_.get_snapshot_version().get_val_for_lsn_allocator(),
|
||||
} else if (OB_FAIL(get_read_tables(store_ctx.mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(),
|
||||
table_iter,
|
||||
query_flag.index_invalid_))) {
|
||||
LOG_WARN("get read iterator fail", K(ret));
|
||||
@ -1442,7 +1439,7 @@ int ObTablet::rowkeys_exists(
|
||||
LOG_WARN("tablet id doesn't match", K(ret), K(relative_table.get_tablet_id()), K(tablet_meta_.tablet_id_));
|
||||
} else if (OB_FAIL(allow_to_read_())) {
|
||||
LOG_WARN("not allowed to read", K(ret), K(tablet_meta_));
|
||||
} else if (OB_FAIL(get_read_tables(store_ctx.mvcc_acc_ctx_.get_snapshot_version().get_val_for_lsn_allocator(),
|
||||
} else if (OB_FAIL(get_read_tables(store_ctx.mvcc_acc_ctx_.get_snapshot_version().get_val_for_tx(),
|
||||
tables_iter,
|
||||
relative_table.allow_not_ready()))) {
|
||||
LOG_WARN("get read iterator fail", K(ret));
|
||||
@ -1672,7 +1669,7 @@ int ObTablet::release_memtables(const palf::SCN scn)
|
||||
LOG_WARN("not inited", K(ret), K_(is_inited));
|
||||
} else if (OB_FAIL(get_memtable_mgr(memtable_mgr))) {
|
||||
LOG_WARN("failed to get memtable mgr", K(ret));
|
||||
} else if (OB_FAIL(memtable_mgr->release_memtables(scn.get_val_for_gts()))) {
|
||||
} else if (OB_FAIL(memtable_mgr->release_memtables(scn.get_val_for_tx()))) {
|
||||
LOG_WARN("failed to release memtables", K(ret), K(scn));
|
||||
}
|
||||
|
||||
|
@ -481,9 +481,7 @@ int ObTabletMemtableMgr::get_memtable_for_replay(palf::SCN replay_scn,
|
||||
handle.reset();
|
||||
LOG_WARN("fail to get data memtable", K(ret));
|
||||
} else {
|
||||
int64_t start_log_ts = memtable->get_start_log_ts();
|
||||
int64_t end_log_ts = memtable->get_end_log_ts();
|
||||
if (replay_scn.get_val_for_lsn_allocator() > start_log_ts && replay_scn.get_val_for_lsn_allocator() <= end_log_ts) {
|
||||
if (replay_scn > memtable->get_start_scn() && replay_scn <= memtable->get_end_scn()) {
|
||||
break;
|
||||
} else {
|
||||
handle.reset();
|
||||
@ -820,12 +818,12 @@ int ObTabletMemtableMgr::find_start_pos_(const int64_t start_log_ts,
|
||||
if (OB_ISNULL(memtable)) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_ERROR("memtable must not null", K(ret));
|
||||
} else if (memtable->get_end_log_ts() == start_log_ts) {
|
||||
} else if (memtable->get_end_scn().get_val_for_tx() == start_log_ts) {
|
||||
if (memtable->get_snapshot_version() > start_snapshot_version) {
|
||||
start_pos = i;
|
||||
break;
|
||||
}
|
||||
} else if (memtable->get_end_log_ts() > start_log_ts) {
|
||||
} else if (memtable->get_end_scn().get_val_for_tx() > start_log_ts) {
|
||||
start_pos = i;
|
||||
break;
|
||||
}
|
||||
|
@ -778,8 +778,8 @@ int ObTabletTableStore::build_minor_tables(
|
||||
// no minor tables to override new_table, skip to add new_table
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("No minor tables in old store, cannot add a minor sstable", K(ret), K(param), KPC(new_table), K(old_store));
|
||||
} else if (new_table->get_end_log_ts() < old_minor_tables.get_boundary_table(false/*first*/)->get_start_log_ts()
|
||||
|| new_table->get_start_log_ts() > old_minor_tables.get_boundary_table(true/*last*/)->get_end_log_ts()) {
|
||||
} else if (new_table->get_end_scn() < old_minor_tables.get_boundary_table(false/*first*/)->get_start_scn()
|
||||
|| new_table->get_start_scn() > old_minor_tables.get_boundary_table(true/*last*/)->get_end_scn()) {
|
||||
ret = OB_NO_NEED_MERGE;
|
||||
LOG_WARN("No minor tables covered by new minor table in old store, cannot add the new minor table",
|
||||
K(ret), K(param), KPC(new_table), K(old_store));
|
||||
@ -1289,7 +1289,7 @@ int ObTabletTableStore::build_ha_ddl_tables_(
|
||||
break;
|
||||
} else if (!new_table->is_ddl_sstable()) {
|
||||
//do nothing
|
||||
} else if (OB_NOT_NULL(last_ddl_table) && new_table->get_start_log_ts() != last_ddl_table->get_end_log_ts()) {
|
||||
} else if (OB_NOT_NULL(last_ddl_table) && new_table->get_start_scn() != last_ddl_table->get_end_scn()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ddl table is not continue", K(ret), K(param), K(old_store));
|
||||
} else if (OB_FAIL(ddl_tables.push_back(new_table))) {
|
||||
@ -1304,7 +1304,7 @@ int ObTabletTableStore::build_ha_ddl_tables_(
|
||||
if (OB_ISNULL(new_table) || !new_table->is_ddl_sstable()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("new table is null or table type is unexpected", K(ret), KPC(new_table));
|
||||
} else if (OB_NOT_NULL(last_ddl_table) && new_table->get_start_log_ts() != last_ddl_table->get_end_log_ts()) {
|
||||
} else if (OB_NOT_NULL(last_ddl_table) && new_table->get_start_scn() != last_ddl_table->get_end_scn()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("ddl table is not continue", K(ret), K(param), K(old_store));
|
||||
} else if (OB_FAIL(ddl_tables.push_back(new_table))) {
|
||||
|
@ -226,8 +226,8 @@ void TestCompactionPolicy::generate_table_key(
|
||||
table_key.version_range_.base_version_ = start_scn;
|
||||
table_key.version_range_.snapshot_version_ = end_scn;
|
||||
} else {
|
||||
table_key.scn_range_.start_scn_.convert_for_gts(start_scn);
|
||||
table_key.scn_range_.end_scn_.convert_for_gts(end_scn);
|
||||
table_key.scn_range_.start_scn_.convert_for_tx(start_scn);
|
||||
table_key.scn_range_.end_scn_.convert_for_tx(end_scn);
|
||||
}
|
||||
}
|
||||
|
||||
@ -306,7 +306,7 @@ int TestCompactionPolicy::mock_memtable(
|
||||
mt_mgr->clean_tail_memtable_();
|
||||
} else if (palf::OB_MAX_SCN_TS_NS != end_border) { // frozen memtable
|
||||
palf::SCN snapshot_scn;
|
||||
snapshot_scn.convert_for_lsn_allocator(snapshot_version);
|
||||
snapshot_scn.convert_for_tx(snapshot_version);
|
||||
memtable->snapshot_version_ = snapshot_scn;
|
||||
memtable->write_ref_cnt_ = 0;
|
||||
memtable->unsynced_cnt_ = 0;
|
||||
@ -733,13 +733,13 @@ TEST_F(TestCompactionPolicy, check_mini_merge_basic)
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(result.update_tablet_directly_, true);
|
||||
|
||||
tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_scn_.convert_for_lsn_allocator(280);
|
||||
tablet_handle_.get_obj()->tablet_meta_.clog_checkpoint_scn_.convert_for_tx(280);
|
||||
tablet_handle_.get_obj()->tablet_meta_.snapshot_version_ = 280;
|
||||
result.reset();
|
||||
ret = ObPartitionMergePolicy::get_mini_merge_tables(param, 0, *tablet_handle_.get_obj(), result);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(3, result.handle_.get_count());
|
||||
ASSERT_EQ(300, result.scn_range_.end_scn_.get_val_for_lsn_allocator());
|
||||
ASSERT_EQ(300, result.scn_range_.end_scn_.get_val_for_tx());
|
||||
ASSERT_EQ(result.update_tablet_directly_, true);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user