MacroMeta Adapt ddl offset for migration&backup
This commit is contained in:
parent
2365f7b369
commit
f63f346a53
@ -1566,7 +1566,7 @@ TEST_F(TestIndexTree, test_estimate_meta_block_size)
|
||||
ObMacroBlock ¯o_block = data_writer.macro_blocks_[0];
|
||||
ObMacroBlockHandle ¯o_handle = data_writer.macro_handles_[0];
|
||||
OK(data_writer.build_micro_block());
|
||||
OK(data_writer.builder_->generate_macro_row(macro_block, macro_handle.get_macro_id()));
|
||||
OK(data_writer.builder_->generate_macro_row(macro_block, macro_handle.get_macro_id(), -1/*ddl_start_row_offset*/));
|
||||
const ObSSTableMacroBlockHeader ¯o_header_ = macro_block.macro_header_;
|
||||
ASSERT_EQ(macro_header_.fixed_header_.idx_block_offset_ + macro_header_.fixed_header_.idx_block_size_,
|
||||
macro_header_.fixed_header_.meta_block_offset_);
|
||||
@ -1652,7 +1652,8 @@ TEST_F(TestIndexTree, test_absolute_offset)
|
||||
ASSERT_EQ(test_row_num, merge_info_list->count());
|
||||
vector<int64_t> absolute_offsets;
|
||||
for (int meta_idx = 0; meta_idx < test_row_num; meta_idx += 10) {
|
||||
rebuilder.append_macro_row(*merge_info_list->at(meta_idx), meta_idx);
|
||||
merge_info_list->at(meta_idx)->val_.ddl_end_row_offset_ = meta_idx;
|
||||
rebuilder.append_macro_row(*merge_info_list->at(meta_idx));
|
||||
absolute_offsets.push_back(meta_idx);
|
||||
}
|
||||
OK(rebuilder.close());
|
||||
|
@ -99,17 +99,10 @@ int ObIndexTreeRootCtx::init(common::ObIAllocator &allocator)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
use_absolute_offset_ for ddl paritial sst select.
|
||||
ddl cg sstable : use_absolute_offset == true, absolute_offsets == nullptr
|
||||
ddl co sstable : use_absolute_offset == true, absolute_offsets != nullptr
|
||||
not ddl sstable for column store : use_ablsolute_offset == false, absolute_offsets == nullptr
|
||||
*/
|
||||
bool ObIndexTreeRootCtx::is_absolute_vaild(const bool is_cg) const
|
||||
bool ObIndexTreeRootCtx::is_absolute_vaild() const
|
||||
{
|
||||
return (!use_absolute_offset_&& absolute_offsets_ == nullptr) ||
|
||||
(use_absolute_offset_ && !is_cg && absolute_offsets_ != nullptr) ||
|
||||
(use_absolute_offset_ && is_cg && absolute_offsets_ == nullptr);
|
||||
(use_absolute_offset_ && absolute_offsets_ != nullptr);
|
||||
}
|
||||
|
||||
int ObIndexTreeRootCtx::add_absolute_row_offset(const int64_t absolute_row_offset)
|
||||
@ -666,14 +659,17 @@ int ObSSTableIndexBuilder::trim_empty_roots()
|
||||
} else if (0 == roots_[i]->data_blocks_cnt_) {
|
||||
roots_[i]->~ObIndexTreeRootCtx(); // release
|
||||
} else {
|
||||
if (OB_FAIL(tmp_roots.push_back(roots_[i]))) {
|
||||
if (OB_UNLIKELY(!roots_[i]->is_absolute_vaild())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "invalid absolute offset", K(ret), K(i), KPC(roots_[i]));
|
||||
} else if (OB_FAIL(tmp_roots.push_back(roots_[i]))) {
|
||||
STORAGE_LOG(WARN, "fail to push back root", K(ret), KPC(roots_[i]));
|
||||
} else if (tmp_roots.count() == 1) {
|
||||
use_absolute_offset = tmp_roots.at(0)->use_absolute_offset_;
|
||||
} else if (OB_UNLIKELY((roots_[i]->use_absolute_offset_ != use_absolute_offset)
|
||||
|| !roots_[i]->is_absolute_vaild(index_store_desc_.get_desc().is_cg()))) {
|
||||
} else if (OB_UNLIKELY((roots_[i]->use_absolute_offset_ != use_absolute_offset))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpected row offset", K(ret), K(use_absolute_offset), KPC(roots_[i]));
|
||||
STORAGE_LOG(WARN, "unexpected not same use absolute offset",
|
||||
K(ret), K(i), K(use_absolute_offset), KPC(roots_[i]));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -788,7 +784,7 @@ int ObSSTableIndexBuilder::merge_index_tree(ObSSTableMergeRes &res)
|
||||
ObIndexBlockRowDesc row_desc(data_desc.get_desc());
|
||||
int64_t row_idx = -1;
|
||||
ObLogicMacroBlockId prev_logic_id;
|
||||
const bool need_rewrite = index_store_desc_.get_desc().is_cg() && !roots_[0]->use_absolute_offset_;
|
||||
const bool need_rewrite = index_store_desc_.get_desc().is_cg();
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < roots_.count(); ++i) {
|
||||
ObMacroMetasArray *macro_metas = roots_[i]->macro_metas_;
|
||||
for (int64_t j = 0; OB_SUCC(ret) && j < macro_metas->count(); ++j) {
|
||||
@ -802,16 +798,7 @@ int ObSSTableIndexBuilder::merge_index_tree(ObSSTableMergeRes &res)
|
||||
// and we don't want more additional memory/time consumption, we only check continuous ids here
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "unexpected duplicate logic macro id", K(ret), KPC(macro_meta), K(prev_logic_id));
|
||||
} else if (need_rewrite) {
|
||||
// use row_idx to rewrite endkey unless absolute offset has been used
|
||||
macro_meta->end_key_.datums_[0].set_int(
|
||||
macro_meta->val_.row_count_ + row_idx);
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)){
|
||||
} else if (use_absolute_offset && index_store_desc_.get_desc().is_cg()) { //ddl cg
|
||||
absolute_row_offset = macro_meta->val_.row_count_ + row_idx;
|
||||
} else if (use_absolute_offset && !index_store_desc_.get_desc().is_cg()) { //ddl co
|
||||
} else if (use_absolute_offset) {
|
||||
absolute_row_offset = roots_[i]->absolute_offsets_->at(j);
|
||||
} else {
|
||||
absolute_row_offset = macro_meta->val_.row_count_ + row_idx;
|
||||
@ -821,6 +808,7 @@ int ObSSTableIndexBuilder::merge_index_tree(ObSSTableMergeRes &res)
|
||||
} else if (OB_UNLIKELY(absolute_row_offset < 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpected absolute row offset", K(ret), K(use_absolute_offset), K(row_idx), KPC(macro_meta));
|
||||
} else if (need_rewrite && FALSE_IT(macro_meta->end_key_.datums_[0].set_int(absolute_row_offset))) {
|
||||
} else if (FALSE_IT(row_desc.row_offset_ = absolute_row_offset)) {
|
||||
} else if (OB_FAIL(index_builder_.append_row(*macro_meta, row_desc))) {
|
||||
STORAGE_LOG(WARN, "fail to append row", K(ret), KPC(macro_meta), K(j), KPC(roots_.at(i)));
|
||||
@ -2086,7 +2074,8 @@ int ObDataIndexBlockBuilder::append_macro_block(const ObDataMacroBlockMeta ¯
|
||||
int ObDataIndexBlockBuilder::write_meta_block(
|
||||
ObMacroBlock ¯o_block,
|
||||
const MacroBlockId &block_id,
|
||||
const ObIndexBlockRowDesc ¯o_row_desc)
|
||||
const ObIndexBlockRowDesc ¯o_row_desc,
|
||||
const int64_t ddl_start_row_offset)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t data_offset = 0;
|
||||
@ -2106,6 +2095,7 @@ int ObDataIndexBlockBuilder::write_meta_block(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "check micro block count failed", K(ret), K_(macro_meta), K(macro_row_desc));
|
||||
} else if (FALSE_IT(macro_meta_.end_key_ = last_rowkey_)) { // fill rowkey for cg
|
||||
} else if (FALSE_IT(update_macro_meta_with_offset(macro_block.get_row_count(), ddl_start_row_offset))) {
|
||||
} else if (OB_FAIL(row_desc_to_meta(macro_row_desc, macro_meta_, meta_row_allocator_))) {
|
||||
STORAGE_LOG(WARN, "fail to convert macro row descriptor to meta", K(ret), K(macro_row_desc));
|
||||
} else if (OB_FAIL(macro_meta_.build_row(meta_row_, meta_row_allocator_))) {
|
||||
@ -2137,8 +2127,23 @@ int ObDataIndexBlockBuilder::write_meta_block(
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObDataIndexBlockBuilder::update_macro_meta_with_offset(const int64_t macro_block_row_count,
|
||||
const int64_t ddl_start_row_offset)
|
||||
{
|
||||
if (leaf_store_desc_->get_major_working_cluster_version() >= DATA_VERSION_4_3_1_0) {
|
||||
if (ddl_start_row_offset >= 0) {
|
||||
macro_meta_.val_.ddl_end_row_offset_ = ddl_start_row_offset + macro_block_row_count - 1;
|
||||
} else {
|
||||
macro_meta_.val_.ddl_end_row_offset_ = -1/*default*/;
|
||||
}
|
||||
} else {
|
||||
macro_meta_.val_.version_ = ObDataBlockMetaVal::DATA_BLOCK_META_VAL_VERSION;
|
||||
}
|
||||
}
|
||||
|
||||
int ObDataIndexBlockBuilder::append_index_micro_block(ObMacroBlock ¯o_block,
|
||||
const MacroBlockId &block_id)
|
||||
const MacroBlockId &block_id,
|
||||
const int64_t ddl_start_row_offset)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -2178,7 +2183,7 @@ int ObDataIndexBlockBuilder::append_index_micro_block(ObMacroBlock ¯o_block,
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "expect macro id equal", K(ret), K(leaf_block_desc), K_(macro_row_desc));
|
||||
} else if (FALSE_IT(macro_row_desc_.macro_id_ = ObIndexBlockRowHeader::DEFAULT_IDX_ROW_MACRO_ID)) {
|
||||
} else if (OB_FAIL(write_meta_block(macro_block, block_id, macro_row_desc_))) {
|
||||
} else if (OB_FAIL(write_meta_block(macro_block, block_id, macro_row_desc_, ddl_start_row_offset))) {
|
||||
STORAGE_LOG(WARN, "fail to build meta block", K(ret));
|
||||
} else {
|
||||
index_tree_root_ctx_->last_macro_size_ = data_offset + leaf_block_size + meta_block_size_;
|
||||
@ -2214,7 +2219,8 @@ int ObDataIndexBlockBuilder::set_parallel_task_idx(const int64_t task_idx)
|
||||
}
|
||||
|
||||
int ObDataIndexBlockBuilder::generate_macro_row(ObMacroBlock ¯o_block,
|
||||
const MacroBlockId &block_id)
|
||||
const MacroBlockId &block_id,
|
||||
const int64_t ddl_start_row_offset)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
@ -2222,11 +2228,11 @@ int ObDataIndexBlockBuilder::generate_macro_row(ObMacroBlock ¯o_block,
|
||||
STORAGE_LOG(WARN, "invalid index builder", K(ret), K(is_inited_));
|
||||
} else if (OB_UNLIKELY(!block_id.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid macro block id", K(block_id));
|
||||
STORAGE_LOG(WARN, "invalid macro block id", K(block_id), K(ddl_start_row_offset));
|
||||
} else if (OB_UNLIKELY(!macro_block.is_dirty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "invalid empty macro block", K(ret));
|
||||
} else if (OB_FAIL(append_index_micro_block(macro_block, block_id))) {
|
||||
} else if (OB_FAIL(append_index_micro_block(macro_block, block_id, ddl_start_row_offset))) {
|
||||
STORAGE_LOG(WARN, "fail to append n-1 level micro block", K(ret));
|
||||
} else {
|
||||
++data_blocks_cnt_;
|
||||
@ -2603,7 +2609,7 @@ void ObIndexBlockRebuilder::reset()
|
||||
sstable_builder_ = nullptr;
|
||||
}
|
||||
|
||||
int ObIndexBlockRebuilder::init(ObSSTableIndexBuilder &sstable_builder, bool need_sort, const int64_t *task_idx, const bool use_absolute_offset)
|
||||
int ObIndexBlockRebuilder::init(ObSSTableIndexBuilder &sstable_builder, bool need_sort, const int64_t *task_idx, const bool is_ddl_merge_sstable)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t bucket_num = 109;
|
||||
@ -2625,7 +2631,7 @@ int ObIndexBlockRebuilder::init(ObSSTableIndexBuilder &sstable_builder, bool nee
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
need_sort_ = need_sort;
|
||||
index_tree_root_ctx_->use_absolute_offset_ = use_absolute_offset;
|
||||
index_tree_root_ctx_->use_absolute_offset_ = is_ddl_merge_sstable;
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
@ -2788,29 +2794,28 @@ int ObIndexBlockRebuilder::append_macro_row(const ObDataMacroBlockMeta ¯o_me
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObIndexBlockRebuilder::append_macro_row(
|
||||
const ObDataMacroBlockMeta ¯o_meta,
|
||||
const int64_t absolute_row_offset)
|
||||
int ObIndexBlockRebuilder::fill_abs_offset_for_ddl()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "rebuilder not inited", K(ret), K_(is_inited));
|
||||
} else if (true == need_sort_ || false == index_tree_root_ctx_->use_absolute_offset_ || absolute_row_offset < 0) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpected use for paritial data", K(ret), K(need_sort_), K(absolute_row_offset), KPC(index_tree_root_ctx_));
|
||||
} else if (OB_UNLIKELY(!macro_meta.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid macro meta", K(ret), K(macro_meta));
|
||||
} else {
|
||||
lib::ObMutexGuard guard(mutex_); // migration will append concurrently
|
||||
if (OB_FAIL(index_tree_root_ctx_->add_macro_block_meta(macro_meta))) {// inc_ref
|
||||
STORAGE_LOG(WARN, "failed to add macro block meta", K(ret), K(macro_meta));
|
||||
} else if (OB_FAIL(index_tree_root_ctx_->add_absolute_row_offset(absolute_row_offset))) {
|
||||
STORAGE_LOG(WARN, "failed to add abs row offset", K(ret), K(absolute_row_offset));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "append macro meta with absolute offset", K(absolute_row_offset), K(macro_meta));
|
||||
if (index_tree_root_ctx_->use_absolute_offset_) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < macro_meta_list_->count(); ++i) {
|
||||
const ObDataMacroBlockMeta *macro_meta = macro_meta_list_->at(i);
|
||||
if (OB_ISNULL(macro_meta)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpected null macro meta", K(ret), K(i), KPC(index_tree_root_ctx_));
|
||||
} else if (macro_meta->val_.version_ < ObDataBlockMetaVal::DATA_BLOCK_META_VAL_VERSION_V2
|
||||
|| macro_meta->val_.ddl_end_row_offset_ < 0) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpectd ddl end row offset", K(ret), K(i), KPC(macro_meta));
|
||||
} else if (OB_FAIL(index_tree_root_ctx_->add_absolute_row_offset(macro_meta->val_.ddl_end_row_offset_))) {
|
||||
STORAGE_LOG(WARN, "failed to add abs row offset", K(ret), K(macro_meta->val_.ddl_end_row_offset_));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "append macro meta with absolute offset",
|
||||
K(macro_meta->val_.ddl_end_row_offset_), KPC(macro_meta));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// not ddl rebuild, do nothing.
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -2826,12 +2831,10 @@ int ObIndexBlockRebuilder::close()
|
||||
} else if (need_sort_ && FALSE_IT(std::sort(macro_meta_list_->begin(), macro_meta_list_->end(), cmp))) {
|
||||
} else if (OB_FAIL(ret)) {
|
||||
STORAGE_LOG(WARN, "fail to sort meta list", K(ret), KPC(index_store_desc_));
|
||||
} else if (nullptr != index_tree_root_ctx_->absolute_offsets_ &&
|
||||
index_tree_root_ctx_->absolute_offsets_->count() != macro_meta_list_->count()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpected row offset count not same with macro metas", K(ret));
|
||||
} else if (macro_meta_list_->count() == 0) {
|
||||
// do not append root to sstable builder since it's empty
|
||||
} else if (OB_FAIL(fill_abs_offset_for_ddl())) {
|
||||
STORAGE_LOG(WARN, "fail to fill abs offset for ddl", K(ret), KPC_(index_tree_root_ctx));
|
||||
} else {
|
||||
const int64_t blocks_cnt = macro_meta_list_->count();
|
||||
ObDataMacroBlockMeta *last_meta = macro_meta_list_->at(blocks_cnt - 1);
|
||||
@ -2839,7 +2842,7 @@ int ObIndexBlockRebuilder::close()
|
||||
index_tree_root_ctx_->data_column_cnt_ = last_meta->val_.column_count_;
|
||||
// index_tree_root_ctx_->macro_metas_ = macro_meta_list_; // should be done in init method
|
||||
index_tree_root_ctx_->data_blocks_cnt_ = blocks_cnt;
|
||||
STORAGE_LOG(INFO, "succeed to close rebuilder", KPC_(index_tree_root_ctx));
|
||||
STORAGE_LOG(INFO, "succeed to close rebuilder", KPC_(index_tree_root_ctx), K(need_sort_));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -65,13 +65,7 @@ public:
|
||||
~ObIndexTreeRootCtx();
|
||||
int init(common::ObIAllocator &allocator);
|
||||
void reset();
|
||||
/*
|
||||
use_absolute_offset_ for ddl paritial sst select.
|
||||
ddl cg sstable : use_absolute_offset == true, absolute_offsets == nullptr
|
||||
ddl co sstable : use_absolute_offset == true, absolute_offsets != nullptr
|
||||
not ddl sstable for column store : use_ablsolute_offset == false, absolute_offsets == nullptr
|
||||
*/
|
||||
inline bool is_absolute_vaild(const bool is_cg) const;
|
||||
bool is_absolute_vaild() const;
|
||||
int add_absolute_row_offset(const int64_t absolute_row_offset);
|
||||
int add_macro_block_meta(const ObDataMacroBlockMeta ¯o_meta);
|
||||
int get_macro_id_array(common::ObIArray<blocksstable::MacroBlockId> &block_ids); //inc ref
|
||||
@ -341,7 +335,7 @@ public:
|
||||
ObSSTableIndexBuilder &sstable_builder);
|
||||
int append_row(const ObMicroBlockDesc µ_block_desc,
|
||||
const ObMacroBlock ¯o_block);
|
||||
int generate_macro_row(ObMacroBlock ¯o_block, const MacroBlockId &id);
|
||||
int generate_macro_row(ObMacroBlock ¯o_block, const MacroBlockId &id, const int64_t ddl_start_row_offset);
|
||||
int append_macro_block(const ObDataMacroBlockMeta ¯o_meta);
|
||||
int cal_macro_meta_block_size(const ObDatumRowkey &rowkey, int64_t &estimate_block_size);
|
||||
int set_parallel_task_idx(const int64_t task_idx);
|
||||
@ -351,11 +345,17 @@ public:
|
||||
ObMacroBlocksWriteCtx *data_write_ctx);
|
||||
void reset();
|
||||
private:
|
||||
int append_index_micro_block(ObMacroBlock ¯o_block, const MacroBlockId &block_id);
|
||||
int append_index_micro_block(
|
||||
ObMacroBlock ¯o_block,
|
||||
const MacroBlockId &block_id,
|
||||
const int64_t ddl_start_row_offset);
|
||||
int write_meta_block(
|
||||
ObMacroBlock ¯o_block,
|
||||
const MacroBlockId &block_id,
|
||||
const ObIndexBlockRowDesc ¯o_row_desc);
|
||||
const ObIndexBlockRowDesc ¯o_row_desc,
|
||||
const int64_t ddl_start_row_offset);
|
||||
|
||||
void update_macro_meta_with_offset(const int64_t macro_block_row_count, const int64_t ddl_start_row_offset);
|
||||
virtual int insert_and_update_index_tree(const ObDatumRow *index_row) override;
|
||||
int append_next_row(const ObMicroBlockDesc µ_block_desc, ObIndexBlockRowDesc ¯o_row_desc);
|
||||
int add_row_offset(ObIndexBlockRowDesc &row_desc);
|
||||
@ -423,13 +423,12 @@ class ObIndexBlockRebuilder final
|
||||
public:
|
||||
ObIndexBlockRebuilder();
|
||||
~ObIndexBlockRebuilder();
|
||||
int init(ObSSTableIndexBuilder &sstable_builder, bool need_sort = true, const int64_t *task_idx = nullptr, const bool use_absolute_offset = false);
|
||||
int init(ObSSTableIndexBuilder &sstable_builder, bool need_sort = true, const int64_t *task_idx = nullptr, const bool is_ddl_merge_sstable = false);
|
||||
int append_macro_row(
|
||||
const char *buf,
|
||||
const int64_t size,
|
||||
const MacroBlockId ¯o_id);
|
||||
int append_macro_row(const ObDataMacroBlockMeta ¯o_meta);
|
||||
int append_macro_row(const ObDataMacroBlockMeta ¯o_meta, const int64_t absolute_row_offset);
|
||||
int close();
|
||||
void reset();
|
||||
static int get_macro_meta(
|
||||
@ -452,6 +451,7 @@ private:
|
||||
common::ObIAllocator &allocator,
|
||||
ObSSTableMacroBlockHeader &header,
|
||||
ObMicroBlockData &meta_block);
|
||||
int fill_abs_offset_for_ddl();
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool need_sort_;
|
||||
|
@ -33,6 +33,7 @@ public:
|
||||
const int64_t buf_len,
|
||||
const int64_t row_count) = 0;
|
||||
virtual int wait() = 0;
|
||||
virtual int64_t get_ddl_start_row_offset() const { return -1; };
|
||||
};
|
||||
|
||||
} // end namespace blocksstable
|
||||
|
@ -20,7 +20,7 @@ namespace oceanbase
|
||||
namespace blocksstable
|
||||
{
|
||||
ObDataBlockMetaVal::ObDataBlockMetaVal()
|
||||
: version_(DATA_BLOCK_META_VAL_VERSION),
|
||||
: version_(DATA_BLOCK_META_VAL_VERSION_V2),
|
||||
length_(0),
|
||||
data_checksum_(0),
|
||||
rowkey_count_(0),
|
||||
@ -52,13 +52,14 @@ ObDataBlockMetaVal::ObDataBlockMetaVal()
|
||||
has_string_out_row_(false),
|
||||
all_lob_in_row_(false),
|
||||
agg_row_len_(0),
|
||||
agg_row_buf_(nullptr)
|
||||
agg_row_buf_(nullptr),
|
||||
ddl_end_row_offset_(-1)
|
||||
{
|
||||
MEMSET(encrypt_key_, 0, share::OB_MAX_TABLESPACE_ENCRYPT_KEY_LENGTH);
|
||||
}
|
||||
|
||||
ObDataBlockMetaVal::ObDataBlockMetaVal(ObIAllocator &allocator)
|
||||
: version_(DATA_BLOCK_META_VAL_VERSION),
|
||||
: version_(DATA_BLOCK_META_VAL_VERSION_V2),
|
||||
length_(0),
|
||||
data_checksum_(0),
|
||||
rowkey_count_(0),
|
||||
@ -134,11 +135,12 @@ void ObDataBlockMetaVal::reset()
|
||||
all_lob_in_row_ = false;
|
||||
agg_row_len_ = 0;
|
||||
agg_row_buf_ = nullptr;
|
||||
ddl_end_row_offset_ = -1;
|
||||
}
|
||||
|
||||
bool ObDataBlockMetaVal::is_valid() const
|
||||
{
|
||||
return DATA_BLOCK_META_VAL_VERSION == version_
|
||||
return (DATA_BLOCK_META_VAL_VERSION == version_ || DATA_BLOCK_META_VAL_VERSION_V2 == version_)
|
||||
&& rowkey_count_ >= 0
|
||||
&& column_count_ > 0
|
||||
&& micro_block_count_ >= 0
|
||||
@ -155,7 +157,8 @@ return DATA_BLOCK_META_VAL_VERSION == version_
|
||||
&& row_store_type_ < ObRowStoreType::MAX_ROW_STORE
|
||||
&& logic_id_.is_valid()
|
||||
&& macro_id_.is_valid()
|
||||
&& (0 == agg_row_len_ || nullptr != agg_row_buf_);
|
||||
&& (0 == agg_row_len_ || nullptr != agg_row_buf_)
|
||||
&& (ddl_end_row_offset_ == -1 || (version_ >= DATA_BLOCK_META_VAL_VERSION_V2 && ddl_end_row_offset_ >= 0));
|
||||
}
|
||||
|
||||
int ObDataBlockMetaVal::assign(const ObDataBlockMetaVal &val)
|
||||
@ -201,6 +204,7 @@ int ObDataBlockMetaVal::assign(const ObDataBlockMetaVal &val)
|
||||
all_lob_in_row_ = val.all_lob_in_row_;
|
||||
agg_row_len_ = val.agg_row_len_;
|
||||
agg_row_buf_ = val.agg_row_buf_;
|
||||
ddl_end_row_offset_ = val.ddl_end_row_offset_;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -283,6 +287,9 @@ DEFINE_SERIALIZE(ObDataBlockMetaVal)
|
||||
if (OB_SUCC(ret)) {
|
||||
MEMCPY(buf + pos, agg_row_buf_, agg_row_len_);
|
||||
pos += agg_row_len_;
|
||||
if (version_ >= DATA_BLOCK_META_VAL_VERSION_V2) {
|
||||
LST_DO_CODE(OB_UNIS_ENCODE, ddl_end_row_offset_);
|
||||
}
|
||||
if (OB_UNLIKELY(length_ != pos - start_pos)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, serialize may have bug", K(ret), K(pos), K(start_pos), KPC(this));
|
||||
@ -303,7 +310,7 @@ DEFINE_DESERIALIZE(ObDataBlockMetaVal)
|
||||
int64_t start_pos = pos;
|
||||
if (OB_FAIL(serialization::decode_i32(buf, data_len, pos, &version_))) {
|
||||
LOG_WARN("fail to decode version", K(ret), K(data_len), K(pos));
|
||||
} else if (OB_UNLIKELY(version_ != DATA_BLOCK_META_VAL_VERSION)) {
|
||||
} else if (OB_UNLIKELY(version_ != DATA_BLOCK_META_VAL_VERSION && version_ != DATA_BLOCK_META_VAL_VERSION_V2)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
LOG_WARN("object version mismatch", K(ret), K(version_));
|
||||
} else if (OB_FAIL(serialization::decode_i32(buf, data_len, pos, &length_))) {
|
||||
@ -350,6 +357,11 @@ DEFINE_DESERIALIZE(ObDataBlockMetaVal)
|
||||
agg_row_buf_ = buf + pos;
|
||||
pos += agg_row_len_;
|
||||
}
|
||||
if (version_ >= DATA_BLOCK_META_VAL_VERSION_V2) {
|
||||
LST_DO_CODE(OB_UNIS_DECODE, ddl_end_row_offset_);
|
||||
} else {
|
||||
ddl_end_row_offset_ = -1;
|
||||
}
|
||||
if (OB_UNLIKELY(length_ != pos - start_pos)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected error, deserialize may has bug", K(ret), K(pos), K(start_pos), KPC(this));
|
||||
@ -366,6 +378,9 @@ int64_t ObDataBlockMetaVal::get_max_serialize_size() const
|
||||
len += sizeof(int64_t); // serialize column count
|
||||
len += sizeof(int64_t) * column_count_; // serialize each checksum
|
||||
len += agg_row_len_;
|
||||
if (version_ >= DATA_BLOCK_META_VAL_VERSION_V2) {
|
||||
len += sizeof(int64_t);
|
||||
}
|
||||
return len;
|
||||
}
|
||||
DEFINE_GET_SERIALIZE_SIZE(ObDataBlockMetaVal)
|
||||
@ -406,6 +421,9 @@ DEFINE_GET_SERIALIZE_SIZE(ObDataBlockMetaVal)
|
||||
is_last_row_last_flag_,
|
||||
agg_row_len_);
|
||||
len += agg_row_len_;
|
||||
if (version_ >= DATA_BLOCK_META_VAL_VERSION_V2) {
|
||||
LST_DO_CODE(OB_UNIS_ADD_LEN, ddl_end_row_offset_);
|
||||
}
|
||||
return len;
|
||||
}
|
||||
|
||||
|
@ -34,8 +34,9 @@ enum ObMacroBlockMetaType
|
||||
|
||||
class ObDataBlockMetaVal final
|
||||
{
|
||||
private:
|
||||
public:
|
||||
static const int32_t DATA_BLOCK_META_VAL_VERSION = 1;
|
||||
static const int32_t DATA_BLOCK_META_VAL_VERSION_V2 = 2;
|
||||
public:
|
||||
ObDataBlockMetaVal();
|
||||
explicit ObDataBlockMetaVal(ObIAllocator &allocator);
|
||||
@ -56,7 +57,7 @@ public:
|
||||
K_(master_key_id), K_(encrypt_id), K_(encrypt_key), K_(row_store_type),
|
||||
K_(schema_version), K_(snapshot_version), K_(is_last_row_last_flag),
|
||||
K_(logic_id), K_(macro_id), K_(column_checksums), K_(has_string_out_row), K_(all_lob_in_row),
|
||||
K_(agg_row_len), KP_(agg_row_buf));
|
||||
K_(agg_row_len), KP_(agg_row_buf), K_(ddl_end_row_offset));
|
||||
public:
|
||||
int32_t version_;
|
||||
int32_t length_;
|
||||
@ -92,6 +93,9 @@ public:
|
||||
bool all_lob_in_row_;
|
||||
int64_t agg_row_len_; // size of agg_row_buf_
|
||||
const char *agg_row_buf_; // data buffer for pre aggregated row
|
||||
// used for ddl sstable migration & backup rebuild sstable
|
||||
// eg: if only one macro block with 100 rows, ddl_end_row_offset_ is 99.
|
||||
int64_t ddl_end_row_offset_;
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObDataBlockMetaVal);
|
||||
};
|
||||
|
@ -1354,7 +1354,7 @@ int ObMacroBlockWriter::flush_macro_block(ObMacroBlock ¯o_block)
|
||||
|
||||
ObMacroBlockHandle ¯o_handle = macro_handles_[current_index_];
|
||||
ObMacroBlockHandle &prev_handle = macro_handles_[(current_index_ + 1) % 2];
|
||||
|
||||
const int64_t ddl_start_row_offset = callback_ == nullptr ? -1 : callback_->get_ddl_start_row_offset();
|
||||
if (OB_UNLIKELY(!macro_block.is_dirty())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "empty macro block has no pre-alloc macro id", K(ret), K(current_index_));
|
||||
@ -1363,7 +1363,7 @@ int ObMacroBlockWriter::flush_macro_block(ObMacroBlock ¯o_block)
|
||||
} else if (is_need_macro_buffer_ && OB_FAIL(wait_io_finish(prev_handle))) {
|
||||
STORAGE_LOG(WARN, "Fail to wait io finish, ", K(ret));
|
||||
} else if (OB_NOT_NULL(builder_)
|
||||
&& OB_FAIL(builder_->generate_macro_row(macro_block, macro_handle.get_macro_id()))) {
|
||||
&& OB_FAIL(builder_->generate_macro_row(macro_block, macro_handle.get_macro_id(), ddl_start_row_offset))) {
|
||||
STORAGE_LOG(WARN, "fail to generate macro row", K(ret), K_(current_macro_seq));
|
||||
} else if (OB_FAIL(macro_block.flush(macro_handle, block_write_ctx_))) {
|
||||
STORAGE_LOG(WARN, "macro block writer fail to flush macro block.", K(ret));
|
||||
|
@ -586,18 +586,9 @@ int ObTabletDDLUtil::create_ddl_sstable(ObTablet &tablet,
|
||||
} else if (meta_array.empty()) {
|
||||
// do nothing
|
||||
} else {
|
||||
if (ddl_param.table_key_.is_ddl_merge_sstable()) {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < meta_array.count(); ++i) {
|
||||
const ObDDLBlockMeta &ddl_block_meta = meta_array.at(i);
|
||||
if (OB_FAIL(index_block_rebuilder.append_macro_row(*ddl_block_meta.block_meta_, ddl_block_meta.end_row_offset_))) {
|
||||
LOG_WARN("append block meta failed", K(ret), K(i), K(ddl_block_meta));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < meta_array.count(); ++i) {
|
||||
if (OB_FAIL(index_block_rebuilder.append_macro_row(*meta_array.at(i).block_meta_))) {
|
||||
LOG_WARN("append block meta failed", K(ret), K(i));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < meta_array.count(); ++i) {
|
||||
if (OB_FAIL(index_block_rebuilder.append_macro_row(*meta_array.at(i).block_meta_))) {
|
||||
LOG_WARN("append block meta failed", K(ret), K(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -358,6 +358,7 @@ public:
|
||||
const int64_t buf_len,
|
||||
const int64_t row_count);
|
||||
int wait();
|
||||
virtual int64_t get_ddl_start_row_offset() const override { return row_id_offset_; }
|
||||
private:
|
||||
bool is_column_group_info_valid() const;
|
||||
int retry(const int64_t timeout_us);
|
||||
@ -371,6 +372,9 @@ private:
|
||||
int64_t task_id_;
|
||||
share::SCN start_scn_;
|
||||
uint64_t data_format_version_;
|
||||
// if has one macro block with 100 rows before, this macro block's ddl_start_row_offset will be 100.
|
||||
// if current macro block finish with 50 rows, current macro block's end_row_offset will be 149.
|
||||
// end_row_offset = ddl_start_row_offset + curr_row_count - 1.
|
||||
int64_t row_id_offset_;
|
||||
};
|
||||
|
||||
|
@ -50,7 +50,8 @@ ObPhysicalCopyCtx::ObPhysicalCopyCtx()
|
||||
restore_macro_block_id_mgr_(nullptr),
|
||||
need_sort_macro_meta_(true),
|
||||
need_check_seq_(false),
|
||||
ls_rebuild_seq_(-1)
|
||||
ls_rebuild_seq_(-1),
|
||||
table_key_()
|
||||
{
|
||||
}
|
||||
|
||||
@ -63,7 +64,8 @@ bool ObPhysicalCopyCtx::is_valid() const
|
||||
bool bool_ret = false;
|
||||
bool_ret = tenant_id_ != OB_INVALID_ID && ls_id_.is_valid() && tablet_id_.is_valid()
|
||||
&& OB_NOT_NULL(bandwidth_throttle_) && OB_NOT_NULL(svr_rpc_proxy_) && OB_NOT_NULL(ha_dag_)
|
||||
&& OB_NOT_NULL(sstable_index_builder_) && ((need_check_seq_ && ls_rebuild_seq_ >= 0) || !need_check_seq_);
|
||||
&& OB_NOT_NULL(sstable_index_builder_) && ((need_check_seq_ && ls_rebuild_seq_ >= 0) || !need_check_seq_)
|
||||
&& table_key_.is_valid();
|
||||
if (bool_ret) {
|
||||
if (!is_leader_restore_) {
|
||||
bool_ret = src_info_.is_valid();
|
||||
@ -93,6 +95,7 @@ void ObPhysicalCopyCtx::reset()
|
||||
need_sort_macro_meta_ = true;
|
||||
need_check_seq_ = false;
|
||||
ls_rebuild_seq_ = -1;
|
||||
table_key_.reset();
|
||||
}
|
||||
|
||||
/******************ObPhysicalCopyTaskInitParam*********************/
|
||||
@ -330,7 +333,7 @@ int ObPhysicalCopyTask::fetch_macro_block_(
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected task_idx_", K(ret), K(task_idx_));
|
||||
} else if (OB_FAIL(index_block_rebuilder.init(
|
||||
*copy_ctx_->sstable_index_builder_, copy_ctx_->need_sort_macro_meta_, &task_idx_))) {
|
||||
*copy_ctx_->sstable_index_builder_, copy_ctx_->need_sort_macro_meta_, &task_idx_, copy_ctx_->table_key_.is_ddl_merge_sstable()))) {
|
||||
LOG_WARN("failed to init index block rebuilder", K(ret), K(copy_table_key_));
|
||||
} else if (OB_FAIL(get_macro_block_reader_(reader))) {
|
||||
LOG_WARN("fail to get macro block reader", K(ret));
|
||||
@ -670,6 +673,7 @@ int ObSSTableCopyFinishTask::init(
|
||||
copy_ctx_.need_sort_macro_meta_ = init_param.need_sort_macro_meta_;
|
||||
copy_ctx_.need_check_seq_ = init_param.need_check_seq_;
|
||||
copy_ctx_.ls_rebuild_seq_ = init_param.ls_rebuild_seq_;
|
||||
copy_ctx_.table_key_ = init_param.sstable_param_->table_key_;
|
||||
macro_range_info_index_ = 0;
|
||||
ls_ = init_param.ls_;
|
||||
sstable_param_ = init_param.sstable_param_;
|
||||
|
@ -51,7 +51,7 @@ struct ObPhysicalCopyCtx
|
||||
TO_STRING_KV(K_(tenant_id), K_(ls_id), K_(tablet_id), K_(src_info), KP_(bandwidth_throttle),
|
||||
KP_(svr_rpc_proxy), K_(is_leader_restore), KP_(restore_base_info),
|
||||
KP_(meta_index_store), KP_(second_meta_index_store), KP_(ha_dag),
|
||||
KP_(sstable_index_builder), KP_(restore_macro_block_id_mgr), K_(need_check_seq), K_(ls_rebuild_seq));
|
||||
KP_(sstable_index_builder), KP_(restore_macro_block_id_mgr), K_(need_check_seq), K_(ls_rebuild_seq), K_(table_key));
|
||||
common::SpinRWLock lock_;
|
||||
uint64_t tenant_id_;
|
||||
share::ObLSID ls_id_;
|
||||
@ -69,6 +69,7 @@ struct ObPhysicalCopyCtx
|
||||
bool need_sort_macro_meta_;
|
||||
bool need_check_seq_;
|
||||
int64_t ls_rebuild_seq_;
|
||||
ObITable::TableKey table_key_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObPhysicalCopyCtx);
|
||||
};
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user