replace log_ts with scn
This commit is contained in:
parent
cdf5734cac
commit
9842295897
@ -792,17 +792,17 @@ int ObRecordHeaderV3::deserialize(const char *buf, int64_t buf_len, int64_t &pos
|
||||
}
|
||||
|
||||
ObDDLMacroBlockRedoInfo::ObDDLMacroBlockRedoInfo()
|
||||
: table_key_(), data_buffer_(), block_type_(ObDDLMacroBlockType::DDL_MB_INVALID_TYPE), start_log_ts_(0)
|
||||
: table_key_(), data_buffer_(), block_type_(ObDDLMacroBlockType::DDL_MB_INVALID_TYPE), start_scn_(palf::SCN::min_scn())
|
||||
{
|
||||
}
|
||||
|
||||
bool ObDDLMacroBlockRedoInfo::is_valid() const
|
||||
{
|
||||
return table_key_.is_valid() && data_buffer_.ptr() != nullptr
|
||||
&& block_type_ != ObDDLMacroBlockType::DDL_MB_INVALID_TYPE && logic_id_.is_valid() && start_log_ts_ > 0;
|
||||
return table_key_.is_valid() && data_buffer_.ptr() != nullptr && block_type_ != ObDDLMacroBlockType::DDL_MB_INVALID_TYPE
|
||||
&& logic_id_.is_valid() && start_scn_.is_valid() && !start_scn_.is_min();
|
||||
}
|
||||
|
||||
OB_SERIALIZE_MEMBER(ObDDLMacroBlockRedoInfo, table_key_, data_buffer_, block_type_, logic_id_, start_log_ts_);
|
||||
OB_SERIALIZE_MEMBER(ObDDLMacroBlockRedoInfo, table_key_, data_buffer_, block_type_, logic_id_, start_scn_);
|
||||
|
||||
constexpr uint8_t ObColClusterInfoMask::BYTES_TYPE_TO_LEN[];
|
||||
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include "storage/ob_i_store.h"
|
||||
#include "storage/ob_i_table.h"
|
||||
#include "storage/blocksstable/ob_logic_macro_id.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -1143,13 +1144,13 @@ public:
|
||||
ObDDLMacroBlockRedoInfo();
|
||||
~ObDDLMacroBlockRedoInfo() = default;
|
||||
bool is_valid() const;
|
||||
TO_STRING_KV(K_(table_key), K_(data_buffer), K_(block_type), K_(logic_id), K_(start_log_ts));
|
||||
TO_STRING_KV(K_(table_key), K_(data_buffer), K_(block_type), K_(logic_id), K_(start_scn));
|
||||
public:
|
||||
storage::ObITable::TableKey table_key_;
|
||||
ObString data_buffer_;
|
||||
ObDDLMacroBlockType block_type_;
|
||||
ObLogicMacroBlockId logic_id_;
|
||||
int64_t start_log_ts_;
|
||||
palf::SCN start_scn_;
|
||||
};
|
||||
|
||||
}//end namespace blocksstable
|
||||
|
@ -1206,6 +1206,8 @@ int ObBaseIndexBlockBuilder::meta_to_row_desc(
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid null data store desc", K(ret));
|
||||
} else if (FALSE_IT(data_desc = const_cast<ObDataStoreDesc *>(row_desc.data_store_desc_))) {
|
||||
} else if (OB_FAIL(data_desc->end_scn_.convert_for_tx(macro_meta.val_.snapshot_version_))) {
|
||||
STORAGE_LOG(WARN, "fail to convert scn", K(ret), K(macro_meta.val_.snapshot_version_));
|
||||
} else {
|
||||
data_desc->row_store_type_ = macro_meta.val_.row_store_type_;
|
||||
data_desc->compressor_type_ = macro_meta.val_.compressor_type_;
|
||||
@ -1213,9 +1215,7 @@ int ObBaseIndexBlockBuilder::meta_to_row_desc(
|
||||
data_desc->encrypt_id_ = macro_meta.val_.encrypt_id_;
|
||||
MEMCPY(data_desc->encrypt_key_, macro_meta.val_.encrypt_key_, sizeof(data_desc->encrypt_key_));
|
||||
data_desc->schema_version_ = macro_meta.val_.schema_version_;
|
||||
// if major merge, set snapshot_version_; otherwise, end_log_ts_
|
||||
data_desc->snapshot_version_ = macro_meta.val_.snapshot_version_;
|
||||
data_desc->end_log_ts_ = macro_meta.val_.snapshot_version_;
|
||||
|
||||
row_desc.is_secondary_meta_ = false;
|
||||
row_desc.is_macro_node_ = true;
|
||||
|
@ -308,7 +308,7 @@ int ObIndexBlockRowBuilder::append_header_and_meta(const ObIndexBlockRowDesc &de
|
||||
if (header_->is_data_index() && !header_->is_major_node()) {
|
||||
ObIndexBlockRowMinorMetaInfo *minor_meta
|
||||
= reinterpret_cast<ObIndexBlockRowMinorMetaInfo *>(data_buf_ + write_pos_);
|
||||
minor_meta->snapshot_version_ = desc.data_store_desc_->end_log_ts_;
|
||||
minor_meta->snapshot_version_ = desc.data_store_desc_->end_scn_.get_val_for_tx();
|
||||
header_->contain_uncommitted_row_ = desc.contain_uncommitted_row_;
|
||||
minor_meta->max_merged_trans_version_ = desc.max_merged_trans_version_;
|
||||
minor_meta->row_count_delta_ = desc.row_count_delta_;
|
||||
|
@ -201,10 +201,11 @@ int ObDataStoreDesc::init(
|
||||
schema_rowkey_col_cnt_ = merge_schema.get_rowkey_column_num();
|
||||
schema_version_ = merge_schema.get_schema_version();
|
||||
snapshot_version_ = snapshot_version;
|
||||
end_log_ts_ = snapshot_version; // will update in ObPartitionMerger::open_macro_writer
|
||||
sstable_index_builder_ = nullptr;
|
||||
|
||||
if (OB_FAIL(merge_schema.get_store_column_count(row_column_count_, true))) {
|
||||
if (OB_FAIL(end_scn_.convert_for_tx(snapshot_version))) { // will update in ObPartitionMerger::open_macro_writer
|
||||
STORAGE_LOG(WARN, "fail to convert scn", K(ret), K(snapshot_version));
|
||||
} else if (OB_FAIL(merge_schema.get_store_column_count(row_column_count_, true))) {
|
||||
STORAGE_LOG(WARN, "failed to get store column count", K(ret), K_(tablet_id));
|
||||
} else {
|
||||
rowkey_column_count_ =
|
||||
@ -304,7 +305,7 @@ void ObDataStoreDesc::reset()
|
||||
merge_type_ = INVALID_MERGE_TYPE;
|
||||
compressor_type_ = ObCompressorType::INVALID_COMPRESSOR;
|
||||
snapshot_version_ = 0;
|
||||
end_log_ts_ = 0;
|
||||
end_scn_.set_min();
|
||||
encrypt_id_ = 0;
|
||||
need_prebuild_bloomfilter_ = false;
|
||||
bloomfilter_rowkey_prefix_ = 0;
|
||||
@ -338,7 +339,7 @@ int ObDataStoreDesc::assign(const ObDataStoreDesc &desc)
|
||||
merge_type_ = desc.merge_type_;
|
||||
compressor_type_ = desc.compressor_type_;
|
||||
snapshot_version_ = desc.snapshot_version_;
|
||||
end_log_ts_ = desc.end_log_ts_;
|
||||
end_scn_ = desc.end_scn_;
|
||||
encrypt_id_ = desc.encrypt_id_;
|
||||
need_prebuild_bloomfilter_ = desc.need_prebuild_bloomfilter_;
|
||||
bloomfilter_rowkey_prefix_ = desc.bloomfilter_rowkey_prefix_;
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "ob_sstable_meta.h"
|
||||
#include "share/ob_encryption_util.h"
|
||||
#include "storage/blocksstable/ob_macro_block_meta.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase {
|
||||
namespace storage {
|
||||
@ -62,7 +63,7 @@ struct ObDataStoreDesc
|
||||
ObSSTableIndexBuilder *sstable_index_builder_;
|
||||
ObCompressorType compressor_type_;
|
||||
int64_t snapshot_version_;
|
||||
int64_t end_log_ts_;
|
||||
palf::SCN end_scn_;
|
||||
int64_t progressive_merge_round_;
|
||||
int64_t encrypt_id_;
|
||||
bool need_prebuild_bloomfilter_;
|
||||
@ -93,7 +94,7 @@ struct ObDataStoreDesc
|
||||
OB_INLINE bool is_major_merge() const { return storage::is_major_merge(merge_type_); }
|
||||
int64_t get_logical_version() const
|
||||
{
|
||||
return is_major_merge() ? snapshot_version_ : end_log_ts_;
|
||||
return is_major_merge() ? snapshot_version_ : end_scn_.get_val_for_tx();
|
||||
}
|
||||
TO_STRING_KV(
|
||||
K_(ls_id),
|
||||
@ -109,7 +110,7 @@ struct ObDataStoreDesc
|
||||
KP_(merge_info),
|
||||
K_(merge_type),
|
||||
K_(snapshot_version),
|
||||
K_(end_log_ts),
|
||||
K_(end_scn),
|
||||
K_(need_prebuild_bloomfilter),
|
||||
K_(bloomfilter_rowkey_prefix),
|
||||
K_(encrypt_id),
|
||||
|
@ -50,7 +50,7 @@ ObSSTableBasicMeta::ObSSTableBasicMeta()
|
||||
upper_trans_version_(0),
|
||||
max_merged_trans_version_(0),
|
||||
recycle_version_(0),
|
||||
ddl_log_ts_(0),
|
||||
ddl_scn_(palf::SCN::min_scn()),
|
||||
filled_tx_scn_(palf::SCN::min_scn()),
|
||||
data_index_tree_height_(0),
|
||||
table_mode_(),
|
||||
@ -90,7 +90,7 @@ bool ObSSTableBasicMeta::operator==(const ObSSTableBasicMeta &other) const
|
||||
&& progressive_merge_step_ == other.progressive_merge_step_
|
||||
&& upper_trans_version_ == other.upper_trans_version_
|
||||
&& max_merged_trans_version_ == other.max_merged_trans_version_
|
||||
&& ddl_log_ts_ == other.ddl_log_ts_
|
||||
&& ddl_scn_ == other.ddl_scn_
|
||||
&& filled_tx_scn_ == other.filled_tx_scn_
|
||||
&& data_index_tree_height_ == other.data_index_tree_height_
|
||||
&& table_mode_ == other.table_mode_
|
||||
@ -123,8 +123,8 @@ bool ObSSTableBasicMeta::is_valid() const
|
||||
&& upper_trans_version_ >= 0
|
||||
&& max_merged_trans_version_ >= 0
|
||||
&& recycle_version_ >= 0
|
||||
&& ddl_log_ts_ >= 0
|
||||
// && filled_tx_scn_.is_valid() // TODO(scn), TODO(yangyi.yyy) wait TODO(danling) modify merge_scn
|
||||
&& ddl_scn_.is_valid()
|
||||
&& filled_tx_scn_.is_valid()
|
||||
&& data_index_tree_height_ >= 0
|
||||
&& row_store_type_ < ObRowStoreType::MAX_ROW_STORE);
|
||||
return ret;
|
||||
@ -153,7 +153,7 @@ void ObSSTableBasicMeta::reset()
|
||||
upper_trans_version_ = 0;
|
||||
max_merged_trans_version_ = 0;
|
||||
recycle_version_ = 0;
|
||||
ddl_log_ts_ = 0;
|
||||
ddl_scn_.set_min();
|
||||
filled_tx_scn_.set_min();
|
||||
data_index_tree_height_ = 0;
|
||||
table_mode_.reset();
|
||||
@ -208,7 +208,7 @@ DEFINE_SERIALIZE(ObSSTableBasicMeta)
|
||||
upper_trans_version_,
|
||||
max_merged_trans_version_,
|
||||
recycle_version_,
|
||||
ddl_log_ts_,
|
||||
ddl_scn_,
|
||||
filled_tx_scn_,
|
||||
data_index_tree_height_,
|
||||
table_mode_,
|
||||
@ -269,7 +269,7 @@ DEFINE_DESERIALIZE(ObSSTableBasicMeta)
|
||||
upper_trans_version_,
|
||||
max_merged_trans_version_,
|
||||
recycle_version_,
|
||||
ddl_log_ts_,
|
||||
ddl_scn_,
|
||||
filled_tx_scn_,
|
||||
data_index_tree_height_,
|
||||
table_mode_,
|
||||
@ -283,6 +283,8 @@ DEFINE_DESERIALIZE(ObSSTableBasicMeta)
|
||||
} else if (OB_UNLIKELY(length_ != pos - start_pos)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, deserialize may has bug", K(ret), K(pos), K(start_pos), KPC(this));
|
||||
} else {
|
||||
filled_tx_scn_.transform_max();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -315,7 +317,7 @@ DEFINE_GET_SERIALIZE_SIZE(ObSSTableBasicMeta)
|
||||
upper_trans_version_,
|
||||
max_merged_trans_version_,
|
||||
recycle_version_,
|
||||
ddl_log_ts_,
|
||||
ddl_scn_,
|
||||
filled_tx_scn_,
|
||||
data_index_tree_height_,
|
||||
table_mode_,
|
||||
@ -457,7 +459,7 @@ int ObSSTableMeta::init_base_meta(
|
||||
basic_meta_.max_merged_trans_version_ = param.max_merged_trans_version_;
|
||||
basic_meta_.upper_trans_version_ = contain_uncommitted_row() ?
|
||||
INT64_MAX : basic_meta_.max_merged_trans_version_;
|
||||
basic_meta_.ddl_log_ts_ = param.ddl_log_ts_;
|
||||
basic_meta_.ddl_scn_ = param.ddl_scn_;
|
||||
basic_meta_.filled_tx_scn_ = param.filled_tx_scn_;
|
||||
basic_meta_.data_index_tree_height_ = param.data_index_tree_height_;
|
||||
basic_meta_.row_store_type_ = param.root_row_store_type_;
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "storage/ob_storage_schema.h"
|
||||
#include "storage/ob_i_table.h"
|
||||
#include "storage/blocksstable/ob_sstable_meta_info.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -56,7 +57,7 @@ public:
|
||||
}
|
||||
OB_INLINE int64_t get_upper_trans_version() const { return ATOMIC_LOAD(&upper_trans_version_); }
|
||||
OB_INLINE int64_t get_max_merged_trans_version() const { return max_merged_trans_version_; }
|
||||
OB_INLINE int64_t get_ddl_log_ts() const { return ddl_log_ts_; }
|
||||
OB_INLINE palf::SCN get_ddl_scn() const { return ddl_scn_; }
|
||||
OB_INLINE int64_t get_create_snapshot_version() const { return create_snapshot_version_; }
|
||||
OB_INLINE palf::SCN get_filled_tx_scn() const { return filled_tx_scn_; }
|
||||
OB_INLINE int16_t get_data_index_tree_height() const { return data_index_tree_height_; }
|
||||
@ -74,7 +75,7 @@ public:
|
||||
K(create_snapshot_version_), K(progressive_merge_round_),
|
||||
K(progressive_merge_step_), K(data_index_tree_height_), K(table_mode_),
|
||||
K(upper_trans_version_), K(max_merged_trans_version_), K_(recycle_version),
|
||||
K(ddl_log_ts_), K(filled_tx_scn_),
|
||||
K(ddl_scn_), K(filled_tx_scn_),
|
||||
K(contain_uncommitted_row_), K(status_), K_(row_store_type), K_(compressor_type),
|
||||
K_(encrypt_id), K_(master_key_id), KPHEX_(encrypt_key, sizeof(encrypt_key_)));
|
||||
|
||||
@ -102,7 +103,7 @@ public:
|
||||
int64_t max_merged_trans_version_;
|
||||
// recycle_version only avaliable for minor sstable, recored recycled multi version start
|
||||
int64_t recycle_version_;
|
||||
int64_t ddl_log_ts_; // only used in DDL SSTable, all MB in DDL SSTable should have the same log_ts(start_log_ts)
|
||||
palf::SCN ddl_scn_; // only used in DDL SSTable, all MB in DDL SSTable should have the same scn(start_scn)
|
||||
palf::SCN filled_tx_scn_; // only for rebuild
|
||||
int16_t data_index_tree_height_;
|
||||
share::schema::ObTableMode table_mode_;
|
||||
|
@ -140,7 +140,7 @@ int ObPartitionMerger::open_macro_writer(ObMergeParameter &merge_param)
|
||||
} else if (OB_FAIL(macro_start_seq.set_parallel_degree(task_idx_))) {
|
||||
STORAGE_LOG(WARN, "Failed to set parallel degree to macro start seq", K(ret), K_(task_idx));
|
||||
} else {
|
||||
data_store_desc_.end_log_ts_ = merge_ctx_->scn_range_.end_scn_.get_val_for_inner_table_field();
|
||||
data_store_desc_.end_scn_ = merge_ctx_->scn_range_.end_scn_;
|
||||
if (OB_FAIL(macro_writer_.open(data_store_desc_, macro_start_seq))) {
|
||||
STORAGE_LOG(WARN, "Failed to open macro block writer", K(ret));
|
||||
}
|
||||
|
@ -91,7 +91,7 @@ int ObDDLMacroBlockClogCb::init(const share::ObLSID &ls_id,
|
||||
redo_info_.block_type_ = redo_info.block_type_;
|
||||
redo_info_.logic_id_ = redo_info.logic_id_;
|
||||
redo_info_.table_key_ = redo_info.table_key_;
|
||||
redo_info_.start_log_ts_ = redo_info.start_log_ts_;
|
||||
redo_info_.start_scn_ = redo_info.start_scn_;
|
||||
ls_id_ = ls_id;
|
||||
macro_block_id_ = macro_block_id;
|
||||
}
|
||||
@ -136,7 +136,7 @@ int ObDDLMacroBlockClogCb::on_success()
|
||||
macro_block.log_ts_ = __get_scn().get_val_for_inner_table_field();
|
||||
macro_block.buf_ = redo_info_.data_buffer_.ptr();
|
||||
macro_block.size_ = redo_info_.data_buffer_.length();
|
||||
macro_block.ddl_start_log_ts_ = redo_info_.start_log_ts_;
|
||||
macro_block.ddl_start_log_ts_ = redo_info_.start_scn_.get_val_for_lsn_allocator();
|
||||
if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block))) {
|
||||
LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle), K(macro_block));
|
||||
}
|
||||
|
@ -353,7 +353,7 @@ int ObDDLTableMergeTask::process()
|
||||
LOG_WARN("release ddl kv failed", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (OB_SUCC(ret) && merge_param_.is_commit_) {
|
||||
if (skip_major_process) {
|
||||
} else if (OB_ISNULL(sstable)) {
|
||||
@ -578,7 +578,6 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui
|
||||
param.index_type_ = storage_schema.get_index_type();
|
||||
param.rowkey_column_cnt_ = storage_schema.get_rowkey_column_num() + ObMultiVersionRowkeyHelpper::get_extra_rowkey_col_cnt();
|
||||
param.schema_version_ = storage_schema.get_schema_version();
|
||||
param.ddl_log_ts_ = ddl_param.start_log_ts_;
|
||||
param.create_snapshot_version_ = ddl_param.snapshot_version_;
|
||||
ObSSTableMergeRes::fill_addr_and_data(res.root_desc_,
|
||||
param.root_block_addr_, param.root_block_data_);
|
||||
@ -604,7 +603,9 @@ int ObTabletDDLUtil::create_ddl_sstable(ObSSTableIndexBuilder *sstable_index_bui
|
||||
param.other_block_ids_ = res.other_block_ids_;
|
||||
MEMCPY(param.encrypt_key_, res.encrypt_key_, share::OB_MAX_TABLESPACE_ENCRYPT_KEY_LENGTH);
|
||||
|
||||
if (OB_FAIL(res.fill_column_checksum(&storage_schema, param.column_checksums_))) {
|
||||
if (OB_FAIL(param.ddl_scn_.convert_for_lsn_allocator(ddl_param.start_log_ts_))) {
|
||||
LOG_WARN("fail to convert scn", K(ret), K(ddl_param.start_log_ts_));
|
||||
} else if (OB_FAIL(res.fill_column_checksum(&storage_schema, param.column_checksums_))) {
|
||||
LOG_WARN("fail to fill column checksum for empty major", K(ret), K(param));
|
||||
} else if (OB_FAIL(ObTabletCreateDeleteHelper::create_sstable(param, table_handle))) {
|
||||
LOG_WARN("create sstable failed", K(ret), K(param));
|
||||
|
@ -105,7 +105,7 @@ int ObDDLRedoLogReplayer::replay_redo(const ObDDLRedoLog &log, const int64_t log
|
||||
} else if (OB_UNLIKELY(!log.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid arguments", K(ret), K(log));
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, redo_info.start_log_ts_, log_ts, need_replay, tablet_handle))) {
|
||||
} else if (OB_FAIL(check_need_replay_ddl_log(table_key, redo_info.start_scn_.get_val_for_lsn_allocator(), log_ts, need_replay, tablet_handle))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
LOG_WARN("fail to check need replay ddl log", K(ret), K(table_key), K(log_ts));
|
||||
}
|
||||
@ -141,7 +141,7 @@ int ObDDLRedoLogReplayer::replay_redo(const ObDDLRedoLog &log, const int64_t log
|
||||
macro_block.log_ts_ = log_ts;
|
||||
macro_block.buf_ = redo_info.data_buffer_.ptr();
|
||||
macro_block.size_ = redo_info.data_buffer_.length();
|
||||
macro_block.ddl_start_log_ts_ = redo_info.start_log_ts_;
|
||||
macro_block.ddl_start_log_ts_ = redo_info.start_scn_.get_val_for_lsn_allocator();
|
||||
if (OB_FAIL(ObDDLKVPendingGuard::set_macro_block(tablet_handle.get_obj(), macro_block))) {
|
||||
LOG_WARN("set macro block into ddl kv failed", K(ret), K(tablet_handle), K(macro_block));
|
||||
}
|
||||
|
@ -1211,13 +1211,14 @@ int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle ¯o_handle,
|
||||
LOG_WARN("ObDDLRedoLogWriterCallback is not inited", K(ret));
|
||||
} else if (OB_FAIL(prepare_block_buffer_if_need())) {
|
||||
LOG_WARN("prepare block buffer failed", K(ret));
|
||||
} else if (OB_FAIL(redo_info_.start_scn_.convert_for_lsn_allocator(ddl_writer_->get_start_log_ts()))) {
|
||||
LOG_WARN("fail to convert scn", K(ret), K(ddl_writer_->get_start_log_ts()));
|
||||
} else {
|
||||
macro_block_id_ = macro_handle.get_macro_id();
|
||||
redo_info_.table_key_ = table_key_;
|
||||
redo_info_.data_buffer_.assign(buf, OB_SERVER_BLOCK_MGR.get_macro_block_size());
|
||||
redo_info_.block_type_ = block_type_;
|
||||
redo_info_.logic_id_ = logic_id;
|
||||
redo_info_.start_log_ts_ = ddl_writer_->get_start_log_ts();
|
||||
if (OB_FAIL(ddl_writer_->write_redo_log(redo_info_, macro_block_id_))) {
|
||||
LOG_WARN("fail to write ddl redo log", K(ret));
|
||||
}
|
||||
|
@ -962,7 +962,7 @@ int ObPhysicalCopyFinishTask::build_create_sstable_param_(
|
||||
param.occupy_size_ = sstable_param_->basic_meta_.occupy_size_;
|
||||
param.original_size_ = sstable_param_->basic_meta_.original_size_;
|
||||
param.max_merged_trans_version_ = sstable_param_->basic_meta_.max_merged_trans_version_;
|
||||
param.ddl_log_ts_ = sstable_param_->basic_meta_.ddl_log_ts_;
|
||||
param.ddl_scn_ = sstable_param_->basic_meta_.ddl_scn_;
|
||||
param.filled_tx_scn_ = sstable_param_->basic_meta_.filled_tx_scn_;
|
||||
param.contain_uncommitted_row_ = sstable_param_->basic_meta_.contain_uncommitted_row_;
|
||||
param.compressor_type_ = sstable_param_->basic_meta_.compressor_type_;
|
||||
|
@ -984,7 +984,7 @@ int ObStorageHATabletsBuilder::build_remote_logical_sstable_param_(
|
||||
param.column_cnt_ = table_schema.get_column_count() + multi_version_col_cnt;
|
||||
param.data_checksum_ = 0;
|
||||
param.occupy_size_ = 0;
|
||||
param.ddl_log_ts_ = 0;
|
||||
param.ddl_scn_.set_min();
|
||||
param.filled_tx_scn_.set_min();
|
||||
param.original_size_ = 0;
|
||||
param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
|
||||
|
@ -259,7 +259,12 @@ int ObLSDDLLogHandler::flush(palf::SCN &rec_scn)
|
||||
palf::SCN ObLSDDLLogHandler::get_rec_scn()
|
||||
{
|
||||
palf::SCN tmp;
|
||||
tmp.convert_for_lsn_allocator(get_rec_log_ts());
|
||||
const int64_t rec_log_ts = get_rec_log_ts();
|
||||
if (INT64_MAX == rec_log_ts) {
|
||||
tmp.set_max();
|
||||
} else {
|
||||
tmp.convert_for_lsn_allocator(rec_log_ts);
|
||||
}
|
||||
return tmp;
|
||||
}
|
||||
|
||||
|
@ -1856,8 +1856,8 @@ int ObLSTabletService::build_create_sstable_param_for_migration(
|
||||
param.occupy_size_ = mig_param.basic_meta_.occupy_size_;
|
||||
param.original_size_ = mig_param.basic_meta_.original_size_;
|
||||
param.max_merged_trans_version_ = mig_param.basic_meta_.max_merged_trans_version_;
|
||||
param.ddl_log_ts_ = mig_param.basic_meta_.ddl_log_ts_;
|
||||
param.filled_tx_scn_ = mig_param.basic_meta_.filled_tx_scn_;
|
||||
param.ddl_scn_ = mig_param.basic_meta_.ddl_scn_;
|
||||
param.filled_tx_scn_ = mig_param.basic_meta_.filled_tx_scn_;
|
||||
param.contain_uncommitted_row_ = mig_param.basic_meta_.contain_uncommitted_row_;
|
||||
param.compressor_type_ = mig_param.basic_meta_.compressor_type_;
|
||||
param.encrypt_id_ = mig_param.basic_meta_.encrypt_id_;
|
||||
|
@ -2385,7 +2385,7 @@ int ObTabletCreateDeleteHelper::build_create_sstable_param(
|
||||
param.column_cnt_ = table_schema.get_column_count() + multi_version_col_cnt;
|
||||
param.data_checksum_ = 0;
|
||||
param.occupy_size_ = 0;
|
||||
param.ddl_log_ts_ = 0;
|
||||
param.ddl_scn_.set_min();
|
||||
param.filled_tx_scn_.set_min();
|
||||
param.original_size_ = 0;
|
||||
param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
|
||||
@ -2443,7 +2443,7 @@ int ObTabletCreateDeleteHelper::build_create_sstable_param(
|
||||
param.column_cnt_ = table_schema.get_column_count() + multi_version_col_cnt;
|
||||
param.data_checksum_ = 0;
|
||||
param.occupy_size_ = 0;
|
||||
param.ddl_log_ts_ = 0;
|
||||
param.ddl_scn_.set_min();
|
||||
param.filled_tx_scn_.set_min();
|
||||
param.original_size_ = 0;
|
||||
param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
|
||||
|
@ -50,7 +50,7 @@ ObTabletCreateSSTableParam::ObTabletCreateSSTableParam()
|
||||
occupy_size_(0),
|
||||
original_size_(0),
|
||||
max_merged_trans_version_(0),
|
||||
ddl_log_ts_(0),
|
||||
ddl_scn_(palf::SCN::min_scn()),
|
||||
filled_tx_scn_(palf::SCN::min_scn()),
|
||||
contain_uncommitted_row_(false),
|
||||
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
|
||||
@ -84,7 +84,7 @@ bool ObTabletCreateSSTableParam::is_valid() const
|
||||
&& rowkey_column_cnt_ >= 0
|
||||
&& column_cnt_ >= 0
|
||||
&& occupy_size_ >= 0
|
||||
&& ddl_log_ts_ > OB_INVALID_TIMESTAMP
|
||||
&& ddl_scn_.is_valid()
|
||||
&& filled_tx_scn_.is_valid()
|
||||
&& original_size_ >= 0)) {
|
||||
ret = false;
|
||||
@ -92,7 +92,7 @@ bool ObTabletCreateSSTableParam::is_valid() const
|
||||
K(root_row_store_type_), K(data_index_tree_height_), K(index_blocks_cnt_),
|
||||
K(data_blocks_cnt_), K(micro_block_cnt_), K(use_old_macro_block_count_),
|
||||
K(row_count_), K(rowkey_column_cnt_), K(column_cnt_), K(occupy_size_),
|
||||
K(original_size_), K(ddl_log_ts_), K(filled_tx_scn_));
|
||||
K(original_size_), K(ddl_scn_), K(filled_tx_scn_));
|
||||
} else if (ObITable::is_ddl_sstable(table_key_.table_type_)) {
|
||||
// ddl sstable can have invalid meta addr, so skip following ifs
|
||||
} else if (!is_block_meta_valid(root_block_addr_, root_block_data_)) {
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include "storage/blocksstable/ob_macro_block_id.h"
|
||||
#include "storage/blocksstable/ob_imicro_block_reader.h"
|
||||
#include "storage/meta_mem/ob_meta_obj_struct.h"
|
||||
#include "logservice/palf/scn.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -61,7 +62,7 @@ public:
|
||||
K_(occupy_size),
|
||||
K_(original_size),
|
||||
K_(max_merged_trans_version),
|
||||
K_(ddl_log_ts),
|
||||
K_(ddl_scn),
|
||||
K_(contain_uncommitted_row),
|
||||
K_(compressor_type),
|
||||
K_(encrypt_id),
|
||||
@ -96,7 +97,7 @@ public:
|
||||
int64_t occupy_size_;
|
||||
int64_t original_size_;
|
||||
int64_t max_merged_trans_version_;
|
||||
int64_t ddl_log_ts_;
|
||||
palf::SCN ddl_scn_;
|
||||
palf::SCN filled_tx_scn_;
|
||||
bool contain_uncommitted_row_;
|
||||
common::ObCompressorType compressor_type_;
|
||||
|
@ -931,8 +931,8 @@ int ObTabletTableStore::build_ddl_sstables(
|
||||
// already pushed, do nothing
|
||||
} else {
|
||||
ObITable *last_ddl_sstable = ddl_sstables.at(ddl_sstables.count() - 1);
|
||||
const int64_t old_ddl_start_scn = static_cast<ObSSTable *>(last_ddl_sstable)->get_meta().get_basic_meta().get_ddl_log_ts();
|
||||
const int64_t new_ddl_start_scn = static_cast<ObSSTable *>(new_table)->get_meta().get_basic_meta().get_ddl_log_ts();
|
||||
const palf::SCN old_ddl_start_scn = static_cast<ObSSTable *>(last_ddl_sstable)->get_meta().get_basic_meta().get_ddl_scn();
|
||||
const palf::SCN new_ddl_start_scn = static_cast<ObSSTable *>(new_table)->get_meta().get_basic_meta().get_ddl_scn();
|
||||
if (new_ddl_start_scn > old_ddl_start_scn) {
|
||||
// ddl start log ts changed means task retry, clean up old ddl sstable
|
||||
ddl_sstables.reset();
|
||||
|
@ -255,7 +255,7 @@ void TestSSTableMeta::prepare_create_sstable_param()
|
||||
param_.column_cnt_ = table_schema_.get_column_count() + multi_version_col_cnt;
|
||||
param_.data_checksum_ = 0;
|
||||
param_.occupy_size_ = 0;
|
||||
param_.ddl_log_ts_ = 0;
|
||||
param_.ddl_scn_.set_min();
|
||||
param_.filled_tx_scn_.set_min();
|
||||
param_.original_size_ = 0;
|
||||
param_.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
|
||||
@ -557,7 +557,7 @@ TEST_F(TestMigrationSSTableParam, test_migrate_sstable)
|
||||
src_sstable_param.column_cnt_ = 1;
|
||||
src_sstable_param.data_checksum_ = 0;
|
||||
src_sstable_param.occupy_size_ = 0;
|
||||
src_sstable_param.ddl_log_ts_ = 0;
|
||||
src_sstable_param.ddl_scn_.set_min();
|
||||
src_sstable_param.filled_tx_scn_.set_min();
|
||||
src_sstable_param.original_size_ = 0;
|
||||
src_sstable_param.compressor_type_ = ObCompressorType::NONE_COMPRESSOR;
|
||||
|
Loading…
x
Reference in New Issue
Block a user