[bugfix]Persisting the iteration results of ObIndexBlockMacroIterator

This commit is contained in:
renju96
2024-02-21 09:45:58 +00:00
committed by ob-robot
parent 7b0674771d
commit 8ec8baeaa2
5 changed files with 148 additions and 48 deletions

View File

@ -496,11 +496,11 @@ ObDDLSStableAllRangeIterator::ObDDLSStableAllRangeIterator()
: is_iter_start_(false),
is_iter_finish_(true),
rowkey_read_info_(nullptr),
cur_rowkey_(nullptr),
cur_header_(nullptr),
index_macro_iter_(),
iter_param_(),
macro_iter_allocator_("DDLMerge_Iter", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
cur_index_info_(),
macro_iter_allocator_("DDLMerge_Iter", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
idx_row_allocator_("DDL_IdxRow", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID())
{
}
@ -515,10 +515,10 @@ void ObDDLSStableAllRangeIterator::reset()
is_iter_finish_ = true;
is_iter_start_ = false;
rowkey_read_info_ = nullptr;
cur_rowkey_ = nullptr;
cur_header_ = nullptr;
index_macro_iter_.reset();
cur_index_info_.reset();
macro_iter_allocator_.reset();
idx_row_allocator_.reset();
iter_param_.reset();
}
@ -527,10 +527,10 @@ void ObDDLSStableAllRangeIterator::reuse()
is_iter_finish_ = true;
is_iter_start_ = false;
rowkey_read_info_ = nullptr;
cur_rowkey_ = nullptr;
cur_header_ = nullptr;
index_macro_iter_.reset();
cur_index_info_.reset();
macro_iter_allocator_.reset();
idx_row_allocator_.reset();
iter_param_.reset();
}
@ -633,8 +633,8 @@ int ObDDLSStableAllRangeIterator::check_blockscan(const ObDatumRowkey &rowkey, b
int ObDDLSStableAllRangeIterator::get_current(const ObIndexBlockRowHeader *&idx_row_header,
const ObDatumRowkey *&endkey)
{
endkey = cur_rowkey_;
idx_row_header = cur_header_;
endkey = cur_index_info_.endkey_;
idx_row_header = cur_index_info_.idx_row_header_;
return OB_SUCCESS;
}
@ -660,24 +660,21 @@ int ObDDLSStableAllRangeIterator::get_next(const ObIndexBlockRowHeader *&idx_row
bool is_start_key = false;
bool is_end_key = false;
bool reach_cursor_end = false;
ObMicroIndexInfo tmp_idx_block_row;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Iter not opened yet", K(ret), KPC(this));
} else if (OB_FAIL(index_macro_iter_.get_next_idx_row(tmp_idx_block_row, row_offset, reach_cursor_end))) {
LOG_WARN("fail to get next idx info", K(ret), KP(endkey), KP(idx_row_header), K(reach_cursor_end), K(index_macro_iter_));
} else if (OB_UNLIKELY(nullptr == tmp_idx_block_row.row_header_ || nullptr == tmp_idx_block_row.endkey_)) {
} else if (OB_FAIL(index_macro_iter_.get_next_idx_row(idx_row_allocator_, cur_index_info_, row_offset, reach_cursor_end))) {
LOG_WARN("fail to get next idx info", K(ret), K(cur_index_info_), K(reach_cursor_end), K(index_macro_iter_));
} else if (OB_UNLIKELY(nullptr == cur_index_info_.idx_row_header_ || nullptr == cur_index_info_.endkey_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null index block row header/endkey", K(ret), K(tmp_idx_block_row));
LOG_WARN("Unexpected null index block row endkey", K(ret), K(cur_index_info_));
} else {
idx_row_header = tmp_idx_block_row.row_header_;
endkey = tmp_idx_block_row.endkey_;
idx_minor_info = tmp_idx_block_row.minor_meta_info_;
agg_row_buf = tmp_idx_block_row.agg_row_buf_;
agg_buf_size = tmp_idx_block_row.agg_buf_size_;
cur_header_ = idx_row_header;
cur_rowkey_ = endkey;
idx_row_header = cur_index_info_.idx_row_header_;
endkey = cur_index_info_.endkey_;
idx_minor_info = cur_index_info_.idx_minor_info_;
agg_row_buf = cur_index_info_.agg_row_buf_;
agg_buf_size = cur_index_info_.agg_buf_size_;
if (is_iter_start_) {
is_start_key = true;
is_iter_start_ = false;
@ -722,12 +719,12 @@ int ObDDLSStableAllRangeIterator::get_index_row_count(const ObDatumRange &range,
LOG_WARN("tmp all range iter locate range failed", K(ret), K(range));
} else {
bool tmp_reach_cursor_end = false;
ObMicroIndexInfo tmp_idx_block_row;
ObMicroIndexRowItem tmp_index_item;
int64_t tmp_row_offset = 0;
while (OB_SUCC(ret)) {
if (OB_FAIL(tmp_index_macro_iter.get_next_idx_row(tmp_idx_block_row, tmp_row_offset, tmp_reach_cursor_end))) {
if (OB_FAIL(tmp_index_macro_iter.get_next_idx_row(idx_row_allocator_, tmp_index_item, tmp_row_offset, tmp_reach_cursor_end))) {
if (OB_ITER_END != ret) {
LOG_WARN("fail to get next idx info", K(ret), K(tmp_idx_block_row), K(tmp_reach_cursor_end), K(tmp_index_macro_iter));
LOG_WARN("fail to get next idx info", K(ret), K(tmp_index_item), K(tmp_reach_cursor_end), K(tmp_index_macro_iter));
} else {
ret = OB_SUCCESS;
break;
@ -1842,20 +1839,20 @@ int ObDDLMergeBlockRowIterator::MergeIndexItem::init(ObIAllocator *allocator,
}
if (OB_FAIL(ret) || OB_ISNULL(idx_minor_info)) {
} else if (OB_ISNULL(minor_info_buf = item_allocator_->alloc(sizeof(agg_row_buf)))) {
} else if (OB_ISNULL(minor_info_buf = item_allocator_->alloc(sizeof(ObIndexBlockRowMinorMetaInfo)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(sizeof(ObIndexBlockRowMinorMetaInfo)));
} else if (FALSE_IT(idx_minor_info = new (minor_info_buf) ObIndexBlockRowMinorMetaInfo())) {
} else if (FALSE_IT(idx_minor_info_ = new (minor_info_buf) ObIndexBlockRowMinorMetaInfo())) {
} else {
*idx_minor_info_ = *idx_minor_info;
}
if (OB_FAIL(ret) || OB_ISNULL(agg_row_buf)) {
} else if (OB_ISNULL(agg_buf = item_allocator_->alloc(STRLEN(agg_row_buf) + 1))) { //+1 for null
} else if (OB_ISNULL(agg_buf = item_allocator_->alloc(agg_buf_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(STRLEN(agg_row_buf)));
LOG_WARN("allocate memory failed", K(ret), K(agg_buf_size));
} else {
MEMCPY(agg_buf, agg_row_buf, STRLEN(agg_row_buf) + 1);
MEMCPY(agg_buf, agg_row_buf, agg_buf_size);
agg_row_buf_ = reinterpret_cast<char *>(agg_buf);
}
}

View File

@ -123,17 +123,17 @@ public:
virtual void reuse() override;
virtual void reset() override;
INHERIT_TO_STRING_KV("base iterator:", ObIndexBlockRowIterator, "format:", "ObDDLSStableAllRangeIterator", K(is_iter_start_), K(is_iter_finish_),
KPC(rowkey_read_info_), K(index_macro_iter_), K(iter_param_), KP(cur_rowkey_), KP(cur_header_));
KPC(rowkey_read_info_), K(index_macro_iter_), K(iter_param_), K(cur_index_info_));
private:
bool is_iter_start_;
bool is_iter_finish_;
const ObITableReadInfo *rowkey_read_info_;
const blocksstable::ObDatumRowkey *cur_rowkey_;
const blocksstable::ObIndexBlockRowHeader *cur_header_;
ObIndexBlockMacroIterator index_macro_iter_;
ObIndexBlockIterParam iter_param_;
ObMicroIndexRowItem cur_index_info_;
ObArenaAllocator macro_iter_allocator_;
ObArenaAllocator idx_row_allocator_;
};
// for empty ddl_merge_sstable

View File

@ -35,6 +35,78 @@ void ObMacroBlockDesc::reuse()
is_deleted_ = false;
}
int ObMicroIndexRowItem::init(ObIAllocator &allocator,
const ObIndexBlockRowHeader *idx_row_header,
const ObDatumRowkey *endkey,
const ObIndexBlockRowMinorMetaInfo *idx_minor_info,
const char *agg_row_buf,
const int64_t agg_buf_size)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(idx_row_header) || OB_ISNULL(endkey)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid arguemen", K(ret), KP(idx_row_header), KP(endkey), KP(idx_minor_info), KP(agg_row_buf));
} else {
allocator_ = &allocator;
endkey_ = endkey; // already deep_copied outside
agg_buf_size_ = agg_buf_size;
void *header_buf = nullptr;
if (OB_ISNULL(header_buf = allocator_->alloc(sizeof(ObIndexBlockRowHeader)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(sizeof(ObIndexBlockRowHeader)));
} else if (FALSE_IT(idx_row_header_ = new (header_buf) ObIndexBlockRowHeader())) {
} else {
*idx_row_header_ =*idx_row_header;
}
void *minor_info_buf = nullptr;
if (OB_FAIL(ret) || OB_ISNULL(idx_minor_info)) {
} else if (OB_ISNULL(minor_info_buf = allocator_->alloc(sizeof(ObIndexBlockRowMinorMetaInfo)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(sizeof(ObIndexBlockRowMinorMetaInfo)));
} else if (FALSE_IT(idx_minor_info_ = new (minor_info_buf) ObIndexBlockRowMinorMetaInfo())) {
} else {
*idx_minor_info_ = *idx_minor_info;
}
void *agg_buf = nullptr;
if (OB_FAIL(ret) || OB_ISNULL(agg_row_buf)) {
} else if (OB_ISNULL(agg_buf = allocator_->alloc(agg_buf_size))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret), K(agg_buf_size));
} else {
MEMCPY(agg_buf, agg_row_buf, agg_buf_size);
agg_row_buf_ = reinterpret_cast<char *>(agg_buf);
}
}
return ret;
}
void ObMicroIndexRowItem::reset()
{
int ret = OB_SUCCESS;
endkey_ = nullptr;
agg_buf_size_ = 0;
if (OB_NOT_NULL(allocator_)) {
if (OB_NOT_NULL(idx_row_header_)){
idx_row_header_->~ObIndexBlockRowHeader();
allocator_->free(idx_row_header_);
idx_row_header_ = nullptr;
}
if (OB_NOT_NULL(idx_minor_info_)){
idx_minor_info_->~ObIndexBlockRowMinorMetaInfo();
allocator_->free(idx_minor_info_);
idx_minor_info_ = nullptr;
}
if (OB_NOT_NULL(agg_row_buf_)){
allocator_->free(agg_row_buf_);
agg_row_buf_ = nullptr;
}
}
allocator_ = nullptr;
}
ObIndexBlockMacroIterator::ObIndexBlockMacroIterator()
: sstable_(nullptr), iter_range_(nullptr),
tree_cursor_(), allocator_(nullptr),
@ -220,11 +292,15 @@ int ObIndexBlockMacroIterator::deep_copy_rowkey(const ObDatumRowkey &src_key, Ob
return ret;
}
int ObIndexBlockMacroIterator::get_next_idx_row(ObMicroIndexInfo &idx_block_row, int64_t &row_offset, bool &reach_cursor_end)
int ObIndexBlockMacroIterator::get_next_idx_row(ObIAllocator &item_allocator, ObMicroIndexRowItem &macro_index_item, int64_t &row_offset, bool &reach_cursor_end)
{
int ret = OB_SUCCESS;
row_offset = 0;
const ObIndexBlockRowHeader *idx_row_header = nullptr;
const ObIndexBlockRowParser *idx_row_parser = nullptr;
const ObIndexBlockRowMinorMetaInfo *minor_meta_info = nullptr;
const char *agg_row_buf = nullptr;
int64_t agg_buf_size = 0;
MacroBlockId macro_id;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
@ -238,9 +314,9 @@ int ObIndexBlockMacroIterator::get_next_idx_row(ObMicroIndexInfo &idx_block_row,
} else if (OB_ISNULL(idx_row_parser)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null idx_row_parser", K(ret));
} else if (OB_FAIL(idx_row_parser->get_header(idx_block_row.row_header_))) {
} else if (OB_FAIL(idx_row_parser->get_header(idx_row_header))) {
LOG_WARN("Fail to get idx row header", K(ret), KPC(idx_row_parser));
} else if (OB_ISNULL(idx_block_row.row_header_)) {
} else if (OB_ISNULL(idx_row_header)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null index row header", K(ret));
} else {
@ -253,13 +329,13 @@ int ObIndexBlockMacroIterator::get_next_idx_row(ObMicroIndexInfo &idx_block_row,
// traverse all node of this macro block and collect index info
ObDatumRowkey rowkey;
row_offset = idx_row_parser->get_row_offset();
if (idx_block_row.row_header_->is_data_index() && !idx_block_row.row_header_->is_major_node()) {
if (OB_FAIL(idx_row_parser->get_minor_meta(idx_block_row.minor_meta_info_))) {
if (idx_row_header->is_data_index() && !idx_row_header->is_major_node()) {
if (OB_FAIL(idx_row_parser->get_minor_meta(minor_meta_info))) {
LOG_WARN("Fail to get minor meta info", K(ret));
}
} else if (!idx_block_row.row_header_->is_major_node() || !idx_block_row.row_header_->is_pre_aggregated()) {
} else if (!idx_row_header->is_major_node() || !idx_row_header->is_pre_aggregated()) {
// Do not have aggregate data
} else if (OB_FAIL(idx_row_parser->get_agg_row(idx_block_row.agg_row_buf_, idx_block_row.agg_buf_size_))) {
} else if (OB_FAIL(idx_row_parser->get_agg_row(agg_row_buf, agg_buf_size))) {
LOG_WARN("Fail to get aggregate", K(ret));
}
@ -285,6 +361,10 @@ int ObIndexBlockMacroIterator::get_next_idx_row(ObMicroIndexInfo &idx_block_row,
LOG_WARN("Fail to get current endkey", K(ret), K_(tree_cursor));
} else if (OB_FAIL(deep_copy_rowkey(rowkey, curr_key_, curr_key_buf_))) {
STORAGE_LOG(WARN, "Failed to save curr key", K(ret), K(rowkey));
} else if (FALSE_IT(macro_index_item.reset())) {
} else if (FALSE_IT(item_allocator.reuse())) {
} else if (OB_FAIL(macro_index_item.init(item_allocator, idx_row_header, &curr_key_, minor_meta_info, agg_row_buf, agg_buf_size))) {
STORAGE_LOG(WARN, "Failed to init macro index item", K(ret), K(macro_index_item));
} else if (OB_FAIL(tree_cursor_.move_forward(is_reverse_scan_))) {
if (OB_LIKELY(OB_ITER_END == ret)) {
is_iter_end_ = true;
@ -294,7 +374,6 @@ int ObIndexBlockMacroIterator::get_next_idx_row(ObMicroIndexInfo &idx_block_row,
}
}
if (OB_SUCC(ret)) {
idx_block_row.endkey_ = &curr_key_;
reach_cursor_end = is_iter_end_;
}
}

View File

@ -62,6 +62,35 @@ struct ObMacroBlockDesc
K_(row_count_delta), K_(contain_uncommitted_row), K_(is_deleted));
};
// ObIndexBlockMacroIterator cannot guarantee the life of previous iteration products after move_forward()
// todo @qilu: refine after delete ObIndexBlockMacroIterator
struct ObMicroIndexRowItem final
{
public:
ObMicroIndexRowItem() : allocator_(nullptr), endkey_(nullptr), idx_row_header_(nullptr),
idx_minor_info_(nullptr), agg_row_buf_(nullptr), agg_buf_size_(0) {}
~ObMicroIndexRowItem()
{
reset();
}
int init(ObIAllocator &allocator,
const ObIndexBlockRowHeader *idx_row_header,
const ObDatumRowkey *endkey,
const ObIndexBlockRowMinorMetaInfo *idx_minor_info,
const char *agg_row_buf,
const int64_t agg_buf_size);
void reset();
TO_STRING_KV(KPC(idx_row_header_), KPC(endkey_), KPC(idx_minor_info_), K(agg_row_buf_), K(agg_buf_size_), KP(allocator_));
public:
ObIAllocator *allocator_;
const blocksstable::ObDatumRowkey *endkey_;
ObIndexBlockRowHeader *idx_row_header_;
ObIndexBlockRowMinorMetaInfo *idx_minor_info_;
char *agg_row_buf_; //max 1024
int64_t agg_buf_size_;
};
class ObIMacroBlockIterator
{
public:
@ -99,7 +128,7 @@ public:
const bool need_record_micro_info = false) override;
int get_next_macro_block(MacroBlockId &macro_block_id, int64_t &start_row_offset);
int get_next_macro_block(blocksstable::ObMacroBlockDesc &block_desc);
int get_next_idx_row(ObMicroIndexInfo &idx_block_row, int64_t &row_offset, bool &reach_cursor_end);
int get_next_idx_row(ObIAllocator &item_allocator, ObMicroIndexRowItem &macro_index_item, int64_t &row_offset, bool &reach_cursor_end);
int get_cs_range(
const ObITableReadInfo &rowkey_read_info,
const bool is_start,

View File

@ -1123,15 +1123,10 @@ int ObDirectLoadSliceWriter::fill_column_group(const ObStorageSchema *storage_sc
LOG_WARN("get next row failed", K(ret));
}
} else {
if (OB_NOT_NULL(insert_monitor)) {
insert_monitor->inserted_cg_row_cnt_ += 1;
}
if (OB_FAIL(cur_writer->append_row(stored_row))) {
LOG_WARN("append row failed", K(ret), KPC(stored_row));
} else {
if (OB_NOT_NULL(insert_monitor)) {
insert_monitor->inserted_cg_row_cnt_ += 1;
}
} else if (OB_NOT_NULL(insert_monitor)) {
insert_monitor->inserted_cg_row_cnt_ += 1;
}
}
}