clear some TODOs about SCN at storage layer
This commit is contained in:
parent
9842295897
commit
caf3220c8c
@ -66,11 +66,12 @@ ObTableAccessContext::ObTableAccessContext()
|
||||
out_cnt_(0),
|
||||
trans_version_range_(),
|
||||
range_array_pos_(nullptr),
|
||||
merge_log_ts_(INT_MAX),
|
||||
merge_scn_(),
|
||||
lob_locator_helper_(nullptr),
|
||||
iter_pool_(nullptr),
|
||||
block_row_store_(nullptr)
|
||||
{
|
||||
merge_scn_.set_max();
|
||||
}
|
||||
|
||||
ObTableAccessContext::~ObTableAccessContext()
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include "ob_table_access_param.h"
|
||||
#include "storage/lob/ob_lob_locator.h"
|
||||
#include "storage/tx/ob_defensive_check_mgr.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -101,7 +102,7 @@ struct ObTableAccessContext
|
||||
KP_(table_scan_stat),
|
||||
K_(out_cnt),
|
||||
K_(trans_version_range),
|
||||
K_(merge_log_ts),
|
||||
K_(merge_scn),
|
||||
K_(lob_locator_helper),
|
||||
KP_(iter_pool),
|
||||
KP_(block_row_store));
|
||||
@ -132,7 +133,7 @@ public:
|
||||
int64_t out_cnt_;
|
||||
common::ObVersionRange trans_version_range_;
|
||||
const common::ObSEArray<int64_t, 4, common::ModulePageAllocator> *range_array_pos_;
|
||||
int64_t merge_log_ts_;
|
||||
palf::SCN merge_scn_;
|
||||
ObLobLocatorHelper *lob_locator_helper_;
|
||||
ObStoreRowIterPool *iter_pool_;
|
||||
ObBlockRowStore *block_row_store_;
|
||||
|
@ -1995,12 +1995,10 @@ int ObMultiVersionMicroBlockMinorMergeRowScanner::get_trans_state(
|
||||
int ret = OB_SUCCESS;
|
||||
//get trans status & committed_trans_version_
|
||||
SCN scn_commit_trans_version = SCN::max_scn();
|
||||
SCN merge_scn;
|
||||
merge_scn.convert_for_lsn_allocator(context_->merge_log_ts_);
|
||||
auto &tx_table_guard = context_->store_ctx_->mvcc_acc_ctx_.get_tx_table_guard();
|
||||
int64_t read_epoch = tx_table_guard.epoch();;
|
||||
if (OB_FAIL(tx_table_guard.get_tx_table()->get_tx_state_with_scn(
|
||||
trans_id, merge_scn, read_epoch, state, scn_commit_trans_version))) {
|
||||
trans_id, context_->merge_scn_, read_epoch, state, scn_commit_trans_version))) {
|
||||
LOG_WARN("get transaction status failed", K(ret), K(trans_id), K(state));
|
||||
} else {
|
||||
commit_trans_version = scn_commit_trans_version.get_val_for_tx();
|
||||
|
@ -140,7 +140,7 @@ int ObPartitionMergeIter::init_query_base_params(const ObMergeParameter &merge_p
|
||||
LOG_WARN("Failed to init table access context", K(ret), K(query_flag));
|
||||
} else {
|
||||
// always use end_scn for safety
|
||||
access_context_.merge_log_ts_ = merge_param.scn_range_.end_scn_.get_val_for_inner_table_field();
|
||||
access_context_.merge_scn_ = merge_param.scn_range_.end_scn_;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -193,14 +193,7 @@ int ObTabletMergeInfo::build_create_sstable_param(const ObTabletMergeCtx &ctx,
|
||||
table_key.scn_range_ = ctx.scn_range_;
|
||||
}
|
||||
param.table_key_ = table_key;
|
||||
// TODO(scn):
|
||||
if (INT64_MAX == ctx.merge_scn_) {
|
||||
param.filled_tx_scn_.set_max();
|
||||
} else {
|
||||
if (OB_FAIL(param.filled_tx_scn_.convert_for_lsn_allocator(ctx.merge_scn_))) {
|
||||
LOG_WARN("failed to convert for gts", K(ret), K(ctx));
|
||||
}
|
||||
}
|
||||
param.filled_tx_scn_ = ctx.merge_scn_;
|
||||
|
||||
param.table_mode_ = ctx.schema_ctx_.merge_schema_->get_table_mode_struct();
|
||||
param.index_type_ = ctx.schema_ctx_.merge_schema_->get_index_type();
|
||||
@ -489,7 +482,7 @@ ObTabletMergeCtx::ObTabletMergeCtx(
|
||||
allocator_(allocator),
|
||||
sstable_version_range_(),
|
||||
scn_range_(),
|
||||
merge_scn_(INT64_MAX),
|
||||
merge_scn_(),
|
||||
create_snapshot_version_(0),
|
||||
tables_handle_(),
|
||||
merged_table_handle_(),
|
||||
@ -511,6 +504,7 @@ ObTabletMergeCtx::ObTabletMergeCtx(
|
||||
time_guard_(),
|
||||
rebuild_seq_(-1)
|
||||
{
|
||||
merge_scn_.set_max();
|
||||
}
|
||||
|
||||
ObTabletMergeCtx::~ObTabletMergeCtx()
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "storage/compaction/ob_tablet_merge_task.h"
|
||||
#include "storage/tx_storage/ob_ls_map.h"
|
||||
#include "storage/tx_storage/ob_ls_handle.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -204,7 +205,7 @@ struct ObTabletMergeCtx
|
||||
// 2. filled in ObPartitionStore::get_merge_tables
|
||||
ObVersionRange sstable_version_range_;// version range for new sstable
|
||||
share::ObScnRange scn_range_;
|
||||
int64_t merge_scn_; // TODO(scn), TODO(danling): change type from int64_t to palf::SCN
|
||||
palf::SCN merge_scn_;
|
||||
int64_t create_snapshot_version_;
|
||||
|
||||
storage::ObTablesHandleArray tables_handle_;
|
||||
|
@ -617,7 +617,7 @@ int ObTabletMergePrepareTask::process()
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("Unexcepted empty log ts range in minor merge", K(ret), K(ctx->scn_range_));
|
||||
} else {
|
||||
ctx->merge_scn_ = ctx->scn_range_.end_scn_.get_val_for_inner_table_field();
|
||||
ctx->merge_scn_ = ctx->scn_range_.end_scn_;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,7 +145,7 @@ int ObTxTableMergePrepareTask::process()
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("Unexcepted empty log ts range in minor merge", K(ret), K(ctx->scn_range_));
|
||||
} else {
|
||||
ctx->merge_scn_ = ctx->scn_range_.end_scn_.get_val_for_inner_table_field();
|
||||
ctx->merge_scn_ = ctx->scn_range_.end_scn_;
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
|
@ -632,8 +632,8 @@ int ObStorageHATabletsBuilder::get_remote_logical_minor_scn_range_(
|
||||
int ret = OB_SUCCESS;
|
||||
scn_range.reset();
|
||||
ObArray<ObITable *> sstables;
|
||||
scn_range.start_scn_.convert_for_gts(ObTabletMeta::INIT_CLOG_CHECKPOINT_TS);
|
||||
scn_range.end_scn_.convert_for_gts(ObTabletMeta::INIT_CLOG_CHECKPOINT_TS);
|
||||
scn_range.start_scn_ = ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN;
|
||||
scn_range.end_scn_ = ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN;
|
||||
|
||||
if (!is_inited_) {
|
||||
ret = OB_NOT_INIT;
|
||||
|
@ -656,7 +656,7 @@ int ObTabletTableBackfillTXTask::prepare_merge_ctx_()
|
||||
tablet_merge_ctx_.sstable_version_range_.multi_version_start_ = tablet_handle_.get_obj()->get_multi_version_start();
|
||||
tablet_merge_ctx_.sstable_version_range_.snapshot_version_ = tablet_handle_.get_obj()->get_snapshot_version();
|
||||
tablet_merge_ctx_.scn_range_ = table_handle_.get_table()->get_key().scn_range_;
|
||||
tablet_merge_ctx_.merge_scn_ = backfill_tx_ctx_->log_sync_scn_.get_val_for_lsn_allocator();
|
||||
tablet_merge_ctx_.merge_scn_ = backfill_tx_ctx_->log_sync_scn_;
|
||||
tablet_merge_ctx_.create_snapshot_version_ = 0;
|
||||
|
||||
if (OB_FAIL(tablet_merge_ctx_.tables_handle_.add_table(table_handle_))) {
|
||||
|
@ -910,11 +910,7 @@ int ObMemtableMultiVersionScanIterator::init_next_value_iter()
|
||||
ret = (OB_SUCCESS == ret) ? OB_ERR_UNEXPECTED : ret;
|
||||
} else {
|
||||
key_first_row_ = true;
|
||||
// TODO(handora.qc): fix it
|
||||
palf::SCN merge_scn;
|
||||
merge_scn.convert_for_lsn_allocator(context_->merge_log_ts_);
|
||||
|
||||
value_iter_->set_merge_scn(merge_scn);
|
||||
value_iter_->set_merge_scn(context_->merge_scn_);
|
||||
row_checker_.reset();
|
||||
}
|
||||
return ret;
|
||||
|
@ -1632,7 +1632,7 @@ int ObPartitionIncrementalRangeSpliter::ObIncrementalIterator::prepare_table_acc
|
||||
if (OB_FAIL(tbl_xs_ctx_.init(query_flag, store_ctx_, allocator_, allocator_, scan_version_range))) {
|
||||
STORAGE_LOG(WARN, "Failed to init table access context", KR(ret));
|
||||
} else {
|
||||
tbl_xs_ctx_.merge_log_ts_ = merge_ctx_.merge_scn_;
|
||||
tbl_xs_ctx_.merge_scn_ = merge_ctx_.merge_scn_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ ObStorageSchemaRecorder::ObStorageSchemaRecorder()
|
||||
max_saved_table_version_(OB_INVALID_VERSION),
|
||||
clog_buf_(nullptr),
|
||||
clog_len_(0),
|
||||
clog_ts_(-1),
|
||||
clog_scn_(),
|
||||
schema_guard_(nullptr),
|
||||
storage_schema_(nullptr),
|
||||
allocator_(nullptr),
|
||||
@ -120,7 +120,7 @@ void ObStorageSchemaRecorder::reset()
|
||||
ls_id_.reset();
|
||||
tablet_id_.reset();
|
||||
tablet_handle_.reset();
|
||||
clog_ts_ = -1;
|
||||
clog_scn_.reset();
|
||||
clog_len_ = 0;
|
||||
}
|
||||
|
||||
@ -283,7 +283,7 @@ int ObStorageSchemaRecorder::try_update_storage_schema(
|
||||
}
|
||||
} else { // sync schema clog success
|
||||
FLOG_INFO("finish save table schema", K_(ls_id), K_(tablet_id), K(sync_table_version),
|
||||
"schema_version", storage_schema_->get_schema_version(), K_(clog_ts), K(timeout));
|
||||
"schema_version", storage_schema_->get_schema_version(), K_(clog_scn), K(timeout));
|
||||
}
|
||||
}
|
||||
|
||||
@ -342,9 +342,9 @@ void ObStorageSchemaRecorder::update_table_schema_succ(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_ERROR("schema log with smaller table version", K(ret), K_(tablet_id),
|
||||
K(table_version), K(max_saved_table_version_));
|
||||
} else if (OB_UNLIKELY(clog_ts_ <= 0 || nullptr == storage_schema_)) {
|
||||
} else if (OB_UNLIKELY(!clog_scn_.is_valid() || clog_scn_.is_min() || nullptr == storage_schema_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("clog ts or storage schema is invalid", K(ret), K_(clog_ts), KP_(storage_schema));
|
||||
LOG_WARN("clog ts or storage schema is invalid", K(ret), K_(clog_scn), KP_(storage_schema));
|
||||
} else if (storage_schema_->get_schema_version() != table_version) {
|
||||
LOG_ERROR("schema version not match", K(storage_schema_), K(table_version));
|
||||
}
|
||||
@ -369,7 +369,7 @@ int ObStorageSchemaRecorder::dec_ref_on_memtable(const bool sync_finish)
|
||||
KP_(storage_schema), K_(tablet_handle));
|
||||
} else {
|
||||
storage_schema_->set_sync_finish(sync_finish);
|
||||
if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(storage_schema_, clog_ts_,
|
||||
if (OB_FAIL(tablet_handle_.get_obj()->save_multi_source_data_unit(storage_schema_, clog_scn_.get_val_for_tx(),
|
||||
false/*for_replay*/, memtable::MemtableRefOp::DEC_REF, true/*is_callback*/))) {
|
||||
LOG_WARN("failed to save storage schema", K(ret), K_(tablet_id), K(storage_schema_));
|
||||
}
|
||||
@ -471,9 +471,8 @@ int ObStorageSchemaRecorder::submit_schema_log(const int64_t table_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const bool need_nonblock = false;
|
||||
const int64_t ref_ts_ns = 0;
|
||||
palf::LSN lsn;
|
||||
clog_ts_ = -1;
|
||||
clog_scn_.reset();
|
||||
|
||||
if (OB_UNLIKELY(nullptr == log_handler_ || nullptr == storage_schema_
|
||||
|| !tablet_handle_.is_valid()
|
||||
@ -501,7 +500,7 @@ int ObStorageSchemaRecorder::submit_schema_log(const int64_t table_id)
|
||||
if (OB_BLOCK_FROZEN != ret) {
|
||||
LOG_WARN("failed to inc ref for storage schema", K(ret), K_(tablet_id), K(storage_schema_));
|
||||
}
|
||||
} else if (OB_FAIL(log_handler_->append(clog_buf_, clog_len_, ref_ts_ns, need_nonblock, logcb_ptr_, lsn, clog_ts_))) {
|
||||
} else if (OB_FAIL(log_handler_->append(clog_buf_, clog_len_, palf::SCN::min_scn(), need_nonblock, logcb_ptr_, lsn, clog_scn_))) {
|
||||
LOG_WARN("fail to submit log", K(ret), K_(tablet_id));
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(dec_ref_on_memtable(false))) {
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "storage/ob_storage_schema.h"
|
||||
#include "storage/meta_mem/ob_tablet_handle.h"
|
||||
#include "share/schema/ob_multi_version_schema_service.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -127,7 +128,7 @@ private:
|
||||
int64_t max_saved_table_version_;
|
||||
char *clog_buf_;
|
||||
int64_t clog_len_;
|
||||
int64_t clog_ts_;
|
||||
palf::SCN clog_scn_;
|
||||
|
||||
share::schema::ObSchemaGetterGuard *schema_guard_;
|
||||
ObStorageSchema *storage_schema_;
|
||||
|
@ -26,7 +26,7 @@ using namespace blocksstable;
|
||||
using namespace palf;
|
||||
namespace storage
|
||||
{
|
||||
const int64_t ObTabletMeta::INIT_CLOG_CHECKPOINT_TS = 1;
|
||||
const palf::SCN ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN = palf::SCN::base_scn();
|
||||
|
||||
ObTabletMeta::ObTabletMeta()
|
||||
: version_(TABLET_META_VERSION),
|
||||
|
@ -43,7 +43,7 @@ struct ObMigrationTabletParam;
|
||||
class ObTabletMeta final
|
||||
{
|
||||
public:
|
||||
static const int64_t INIT_CLOG_CHECKPOINT_TS;
|
||||
static const palf::SCN INIT_CLOG_CHECKPOINT_SCN;
|
||||
|
||||
public:
|
||||
ObTabletMeta();
|
||||
|
@ -1394,13 +1394,9 @@ int ObTabletTableStore::combin_ha_minor_sstables_(
|
||||
common::ObIArray<ObITable *> &new_minor_sstables)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
//TODO(SCN) TabletMeta::INIT_CLOG_CHECKPOINT_TS should be SCN
|
||||
palf::SCN logical_start_scn;
|
||||
logical_start_scn.convert_for_lsn_allocator(ObTabletMeta::INIT_CLOG_CHECKPOINT_TS);
|
||||
palf::SCN logical_end_scn;
|
||||
logical_end_scn.convert_for_lsn_allocator(ObTabletMeta::INIT_CLOG_CHECKPOINT_TS);
|
||||
palf::SCN max_copy_end_scn;
|
||||
max_copy_end_scn.convert_for_lsn_allocator(ObTabletMeta::INIT_CLOG_CHECKPOINT_TS);
|
||||
palf::SCN logical_start_scn = ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN;
|
||||
palf::SCN logical_end_scn = ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN;
|
||||
palf::SCN max_copy_end_scn = ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN;
|
||||
int64_t old_store_minor_tables_index = 0;
|
||||
|
||||
//get remote logical minor sstable log ts
|
||||
@ -1441,7 +1437,7 @@ int ObTabletTableStore::combin_ha_minor_sstables_(
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && !found && ObTabletMeta::INIT_CLOG_CHECKPOINT_TS != logical_end_scn.get_val_for_inner_table_field()) {
|
||||
if (OB_SUCC(ret) && !found && ObTabletMeta::INIT_CLOG_CHECKPOINT_SCN != logical_end_scn) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("can not find table continue with old table store", K(ret),
|
||||
K(old_store_minor_sstables), K(logical_start_scn), K(logical_end_scn), K(need_add_minor_sstables));
|
||||
|
Loading…
x
Reference in New Issue
Block a user