/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX STORAGE #include "ob_admin_dumpsst_utils.h" #include "share/ob_define.h" #include "lib/utility/utility.h" #include "lib/oblog/ob_log_module.h" #include "lib/compress/ob_compressor.h" #include "lib/compress/ob_compressor_pool.h" #include "storage/blocksstable/ob_row_reader.h" #include "storage/ob_i_store.h" namespace oceanbase { using namespace common; using namespace share; using namespace serialization; using namespace blocksstable; using namespace storage; namespace tools { int parse_string(const char* src, const char del, const char* dst[], int64_t& size) { int ret = OB_SUCCESS; int64_t obj_index = 0; char* str = (char*) ob_malloc(OB_MAX_FILE_NAME_LENGTH, ObModIds::TEST); strncpy(str, src, strlen(src) + 1); char* st_ptr = str; char* et_ptr = NULL; //use for strchar,the fist position of del in src char* last_ptr = str + strlen(str) - 1; //end of the str //skip white space while (0 != *st_ptr && ' ' == *st_ptr) ++st_ptr; //parse rowkey_str from st_str division by ';' ,except the last item while (NULL != st_ptr && NULL != (et_ptr = strchr(st_ptr, del))) { *et_ptr = 0; if (size <= obj_index) { ret = OB_SIZE_OVERFLOW; break; } else { dst[obj_index++] = st_ptr; st_ptr = et_ptr + 1; while (0 != *st_ptr && ' ' == *st_ptr) ++st_ptr; if (size - 1 == obj_index) { dst[obj_index++] = st_ptr; break; } } } if (OB_SUCCESS == ret && 0 != *st_ptr && size > obj_index) { //skip white space of end while (last_ptr > st_ptr && ' ' == *last_ptr) { *last_ptr = 0; --last_ptr; } dst[obj_index++] = st_ptr; } if (OB_SUCC(ret)) { size = obj_index; } return ret; } int parse_table_key(const char *str, ObITable::TableKey &table_key) { int ret = OB_SUCCESS; int pret = 0; int64_t tmp = 0; const int64_t TABLE_KEY_ATTR_CNT = 6; int64_t size = TABLE_KEY_ATTR_CNT; const char *dst[TABLE_KEY_ATTR_CNT]; if (OB_FAIL(parse_string(str, ',', dst, size))) { LOG_WARN("failed to parse string", K(ret)); } if (OB_SUCC(ret)) { // parse table type pret = sscanf(dst[0], "%ld", &tmp); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("table type is not valid"); } else { table_key.table_type_ = (ObITable::TableType)tmp; } } if (OB_SUCC(ret)) { if (OB_FAIL(parse_partition_key(dst[1], table_key.pkey_))) { LOG_WARN("failed to parse partition key", K(ret)); } } if (OB_SUCC(ret)) { pret = sscanf(dst[2], "%lu", &table_key.table_id_); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("index id is not valid"); } } if (OB_SUCC(ret)) { if (OB_FAIL(parse_version_range(dst[3], table_key.trans_version_range_))) { LOG_WARN("failed to parse version range"); } } if (OB_SUCC(ret)) { if (OB_FAIL(parse_log_ts_range(dst[4], table_key.log_ts_range_))) { LOG_WARN("failed to parse log ts range"); } } if (OB_SUCC(ret)) { pret = sscanf(dst[5], "%ld", &tmp); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("major version is not valid"); } else { table_key.version_.version_ = tmp; } } if (OB_SUCC(ret)) { LOG_INFO("success to parse table key", K(table_key)); } return ret; } int parse_partition_key(const char *str, ObPartitionKey &pkey) { int ret = OB_SUCCESS; int pret = 0; int64_t size = 2; const char *dst[2]; uint64_t table_id = 0; int64_t part_id = 0; if (OB_FAIL(parse_string(str, ':', dst, size))) { STORAGE_LOG(ERROR, "failed to parse string", K(ret)); } else if (2 != size) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(ERROR, "table id and partition id should be provided", K(ret), K(str)); } if (OB_SUCC(ret)) { pret = sscanf(dst[0], "%lu", &table_id); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("table id is not valid"); } } if (OB_SUCC(ret)) { pret = sscanf(dst[1], "%ld", &part_id); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("partition id is not valid"); } } if (OB_SUCC(ret)) { if (OB_FAIL(pkey.init(table_id, part_id, 0))) { STORAGE_LOG(ERROR, "faile to init pkey", K(ret), K(table_id), K(part_id)); } } return ret; } int parse_version_range(const char *str, ObVersionRange &version_range) { int ret = OB_SUCCESS; int pret = 0; int64_t size = 3; const char *dst[3]; if (OB_FAIL(parse_string(str, ':', dst, size))) { STORAGE_LOG(ERROR, "failed to parse string", K(ret)); } else if (3 != size) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(ERROR, "version range has three member variables", K(ret), K(str)); } if (OB_SUCC(ret)) { pret = sscanf(dst[0], "%ld", &version_range.base_version_); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("base version is not valid"); } } if (OB_SUCC(ret)) { pret = sscanf(dst[1], "%ld", &version_range.multi_version_start_); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("multi_version_start is not valid"); } } if (OB_SUCC(ret)) { pret = sscanf(dst[2], "%ld", &version_range.snapshot_version_); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("snapshot_version is not valid"); } } return ret; } int parse_log_ts_range(const char *str, ObLogTsRange &log_ts_range) { int ret = OB_SUCCESS; int pret = 0; int64_t size = 3; const char *dst[3]; if (OB_FAIL(parse_string(str, ':', dst, size))) { STORAGE_LOG(ERROR, "failed to parse string", K(ret)); } else if (3 != size) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(ERROR, "version range has three member variables", K(ret), K(str)); } if (OB_SUCC(ret)) { pret = sscanf(dst[0], "%ld", &log_ts_range.start_log_ts_); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("start_log_ts is not valid"); } } if (OB_SUCC(ret)) { pret = sscanf(dst[1], "%ld", &log_ts_range.end_log_ts_); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("end_log_ts is not valid"); } } if (OB_SUCC(ret)) { pret = sscanf(dst[2], "%ld", &log_ts_range.max_log_ts_); if (pret != 1) { ret = OB_INVALID_ARGUMENT; printf("max_log_ts is not valid"); } } return ret; } static const char* obj_type_name[] = { "ObNullType", "ObTinyIntType", "ObSmallIntType", "ObMediumIntType", "ObInt32Type", "ObIntType", "ObUTinyIntType", "ObUSmallIntType", "ObUMediumIntType", "ObUInt32Type", "ObUInt64Type", "ObFloatType", "ObDoubleType", "ObUFloatType", "ObUDoubleType", "ObNumberType", "ObUNumberType", "ObDateTimeType", "ObTimestampType", "ObDateType", "ObTimeType", "ObYearType", "ObVarcharType", "ObCharType", "ObHexStringType", "ObExtendType", "ObUnknowType", "ObTinyTextType", "ObTextType", "ObMediumTextType", "ObLongTextType", "ObBitType"}; int MacroBlock::setup(const char *data, const int64_t size, const int64_t macro_id) { int ret = OB_SUCCESS; int64_t pos = 0; if (OB_ISNULL(data) || OB_UNLIKELY(size <= 0) || OB_UNLIKELY(macro_id < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KP(data), K(size), K(macro_id)); } else { data_ = data; size_ = size; macro_id_ = macro_id; if (OB_FAIL(common_header_.deserialize(data_, size_, pos))) { LOG_ERROR("deserialize common header fail", K(ret), KP(data), K(size), K(pos)); } else if (OB_FAIL(common_header_.check_integrity())) { LOG_ERROR("invalid common header", K(ret), K_(common_header)); } } return ret; } int MicroBlock::setup(const char *data, const int64_t size, const int64_t micro_id, const ObColumnMap *column_map, const int64_t row_store_type, const uint16_t *column_id_list, const int64_t column_cnt) { int ret = OB_SUCCESS; if (OB_ISNULL(data) || OB_UNLIKELY(size <= 0) || OB_UNLIKELY(micro_id < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KP(data), K(size), K(micro_id)); } else { data_ = data; size_ = size; micro_id_ = micro_id; row_store_type_ = row_store_type; column_map_ = column_map; column_cnt_ = column_cnt; column_id_list_ = column_id_list; } return ret; } int MacroBlockReader::init(const char *data, const int64_t size, const int64_t macro_id, const ObMacroBlockMeta *macro_meta) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); } else if (OB_ISNULL(data) || size <= 0 || macro_id <= 0 || OB_ISNULL(macro_meta)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KP(data), K(size), K(macro_id), KP(macro_meta)); } else if (OB_FAIL(macro_block_.setup(data, size, macro_id))) { STORAGE_LOG(ERROR, "failed to setup macro block", K(ret), KP(data), K(size), K(macro_id)); } else { macro_meta_ = macro_meta; int64_t pos = 0; // parse headers pos += macro_block_.common_header_.get_serialize_size(); sstable_header_ = reinterpret_cast(macro_block_.data_ + pos); pos += sizeof(ObSSTableMacroBlockHeader); const int64_t column_cnt = sstable_header_->column_count_; column_id_list_ = reinterpret_cast(macro_block_.data_ + pos); pos += sizeof(uint16_t) * column_cnt; column_type_list_ = reinterpret_cast(macro_block_.data_ + pos); pos += sizeof(common::ObObjMeta) * column_cnt; column_checksum_ = reinterpret_cast(macro_block_.data_ + pos); pos += sizeof(int64_t) * column_cnt; if (OB_FAIL(parse_micro_block_index())) { LOG_WARN("failed to parse micro index", K(ret)); } else if (OB_FAIL(build_column_map())) { LOG_WARN("failed to build column map", K(ret)); } else { is_inited_ = true; } } return ret; } void MacroBlockReader::reset() { macro_block_.reset(); macro_meta_ = NULL; sstable_header_ = NULL; column_id_list_ = NULL; column_type_list_ = NULL; column_checksum_ = NULL; curr_micro_block_id_ = -1; curr_micro_block_.reset(); column_map_.reuse(); micro_index_pos_.reset(); micro_index_keys_.reset(); micro_mark_deletions_.reset(); cur_mib_rh_ = NULL; is_inited_ = false; } int64_t MacroBlockReader::count() const { return sstable_header_->micro_block_count_; } int MacroBlockReader::set_index(const int64_t index) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(index < 0) || OB_UNLIKELY(index >= count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(index)); } else { curr_micro_block_id_ = index; if (OB_FAIL(set_current_micro_block())) { if (OB_ITER_END != ret) { LOG_WARN("failed to set current micro block", K(ret)); } } } return ret; } int MacroBlockReader::get_value(const MicroBlock *µ_block) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { micro_block = NULL; if (curr_micro_block_id_ >= micro_index_pos_.count()) { ret = OB_ITER_END; } else { micro_block = &curr_micro_block_; } } return ret; } int64_t MacroBlockReader::get_index() const { return curr_micro_block_id_; } int MacroBlockReader::set_current_micro_block() { int ret = OB_SUCCESS; const char *out = NULL; int64_t outsize = 0; const char* compressor_name = sstable_header_->compressor_name_; if (curr_micro_block_id_ >= micro_index_pos_.count()) { ret = OB_ITER_END; } else { ObPosition datapos = micro_index_pos_.at(curr_micro_block_id_); if (OB_FAIL(get_micro_block_payload_buffer(compressor_name, macro_block_.data_ + sstable_header_->micro_block_data_offset_ + datapos.offset_, datapos.length_, out, outsize, cur_mib_rh_))) { LOG_WARN("failed to read micro block", K(ret), K(curr_micro_block_id_), K(macro_block_.macro_id_)); } else { //cur_mib_payload_ = out; //cur_mib_payload_size_ = outsize; curr_micro_block_.reset(); if (OB_FAIL(curr_micro_block_.setup(out, outsize, curr_micro_block_id_, &column_map_, sstable_header_->row_store_type_, column_id_list_, sstable_header_->column_count_))) { LOG_WARN("failed to setup curr micro block", K(ret), K(curr_micro_block_id_)); } } } return ret; } int MacroBlockReader::get_micro_block_payload_buffer(const char* compressor_name, const char* buf, const int64_t size, const char*& out, int64_t& outsize, const blocksstable::ObRecordHeaderV3 *&rh) { int ret = OB_SUCCESS; int64_t pos = 0; int64_t max_size = size; char *uncompressed_buf = NULL; rh = reinterpret_cast(buf); pos += sizeof(ObRecordHeaderV3); if (rh->data_length_ != rh->data_zlength_) { ObCompressor* compressor = NULL; if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(compressor_name, compressor))) { LOG_WARN("get compressor failed", K(ret)); } else if (OB_FAIL(compressor->get_max_overflow_size(rh->data_length_, max_size))) { LOG_WARN("get max overflow size failed", K(ret)); } else if ((NULL == uncompressed_buf) && (NULL == (uncompressed_buf = reinterpret_cast(ob_malloc(max_size + rh->data_length_, ObModIds::OB_SSTABLE_AIO))))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc memory for uncompress buf", K(ret), K(size)); } else if (OB_FAIL(compressor->decompress(buf + pos, rh->data_zlength_, uncompressed_buf, max_size + rh->data_length_, outsize))) { LOG_WARN("failed to decompress data", K(ret), K(rh->data_zlength_), K(rh->data_length_), K(max_size)); } else if (outsize != rh->data_length_) { ret = OB_ERROR; LOG_WARN("decompress error", K(ret), "expect_size", rh->data_length_, "real_size", outsize); } else { out = uncompressed_buf; } } else { out = reinterpret_cast(buf) + pos; outsize = rh->data_length_; } return ret; } int MacroBlockReader::parse_micro_block_index() { int ret = OB_SUCCESS; const char *index_ptr = NULL; int32_t index_size = 0; const char* endkey_ptr = NULL; blocksstable::ObPosition data_pos; blocksstable::ObPosition key_pos; micro_index_pos_.reuse(); micro_index_keys_.reuse(); micro_mark_deletions_.reuse(); index_ptr = macro_block_.data_ + sstable_header_->micro_block_index_offset_; index_size = sstable_header_->micro_block_index_size_; UNUSED(index_size); endkey_ptr = macro_block_.data_ + sstable_header_->micro_block_endkey_offset_; UNUSED(endkey_ptr); for (int32_t i = 0; OB_SUCC(ret) && i < sstable_header_->micro_block_count_; ++i) { parse_one_micro_block_index(index_ptr, i, data_pos, key_pos); if (OB_FAIL(micro_index_pos_.push_back(data_pos))) { LOG_WARN("failed to push back micro index pos", K(ret), K(data_pos)); } else if (OB_FAIL(micro_index_keys_.push_back(key_pos))) { LOG_WARN("failed to push back micro index key", K(ret), K(key_pos)); } } if (OB_SUCC(ret)) { const char *deletion_ptr = macro_block_.data_ + macro_meta_->micro_block_mark_deletion_offset_; for (int64_t i = 0; OB_SUCC(ret) && i < sstable_header_->micro_block_count_; ++i) { uint8_t mark_deletion = deletion_ptr[i]; LOG_INFO("micro block mark deletion info", K(mark_deletion), K(i)); if (OB_FAIL(micro_mark_deletions_.push_back(mark_deletion > 0 ? true : false))) { LOG_WARN("fail to push back micro mark deletion", K(ret)); } } } return ret; } void MacroBlockReader::parse_one_micro_block_index(const char* index_ptr, const int64_t idx, ObPosition& datapos, ObPosition& keypos) { int32_t off1[2]; int32_t off2[2]; off1[0] = *reinterpret_cast(index_ptr + sizeof(int32_t) * 2 * idx); off1[1] = *reinterpret_cast(index_ptr + sizeof(int32_t) * 2 * idx + sizeof(int32_t)); off2[0] = *reinterpret_cast(index_ptr + sizeof(int32_t) * 2 * (idx + 1)); off2[1] = *reinterpret_cast(index_ptr + sizeof(int32_t) * 2 * (idx + 1) + sizeof(int32_t)); datapos.offset_ = off1[0]; datapos.length_ = off2[0] - off1[0]; keypos.offset_ = off1[1]; keypos.length_ = off2[1] - off1[1]; } int MacroBlockReader::build_column_map() { int ret = OB_SUCCESS; hash::ObArrayIndexHashSet column_hash; column_hash.reset(); column_hash.init(column_id_list_); for (int64_t i = 0; OB_SUCC(ret) && i < sstable_header_->column_count_; ++i) { if (OB_FAIL(column_hash.set_index(i))) { LOG_WARN("column hash set index failed", K(ret), K(i)); } } uint64_t idx = 0; if (OB_SUCC(ret)) { column_map_.reuse(); ObArray out_cols; ObArray projector; for (int64_t i = 0; OB_SUCC(ret) && i < sstable_header_->column_count_; ++i) { ret = column_hash.get_index(static_cast(column_id_list_[i]), idx); if (OB_ENTRY_NOT_EXIST == ret) { idx = -1; ret = OB_SUCCESS; } share::schema::ObColDesc col; col.col_id_ = column_id_list_[i]; col.col_type_ = column_type_list_[i]; if (OB_FAIL(ret)) { } else if (OB_FAIL(out_cols.push_back(col))) { LOG_WARN("fail to add column to out column array", K(ret), K(col)); } else if (OB_FAIL(projector.push_back(static_cast(idx)))) { LOG_WARN("fail to add index to projector", K(ret), K(idx)); } } if (OB_FAIL(ret)) { } else if (OB_FAIL(column_map_.init(allocator_, macro_meta_->schema_version_, sstable_header_->rowkey_column_count_, sstable_header_->column_count_, out_cols, nullptr, &projector))) { LOG_WARN("failed to init column map", K(ret), K(macro_meta_->schema_version_), K(sstable_header_->rowkey_column_count_), K(sstable_header_->column_count_), K(out_cols), K(projector)); } } return ret; } int MacroBlockReader::dump_header() { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { dump_common_header(¯o_block_.common_header_); dump_sstable_header(sstable_header_); // dump column id, type, checksum list; if (sstable_header_->column_count_ > 0) { PrintHelper::print_dump_cols_info_start("column_id", "column_type", "column_checksum", "collation type"); for (int64_t i = 0; i < sstable_header_->column_count_; ++i) { PrintHelper::print_dump_cols_info_line(*(column_id_list_ + i), obj_type_name[(column_type_list_ + i)->get_type()], *(column_checksum_ + i), (column_type_list_ + i)->get_collation_type()); } } PrintHelper::print_end_line(); // dump micro_block_index; ObObj objs[sstable_header_->rowkey_column_count_]; const char *endkey_ptr = macro_block_.data_ + sstable_header_->micro_block_endkey_offset_; int64_t endkey_ptr_pos = 0; for (int32_t i = 0; i < sstable_header_->micro_block_count_; ++i) { parse_micro_block_key(endkey_ptr, endkey_ptr_pos, endkey_ptr_pos + micro_index_keys_.at(i).length_, column_type_list_, objs, sstable_header_->rowkey_column_count_); ObRowkey endkey(objs, sstable_header_->rowkey_column_count_); dump_micro_index(endkey, micro_index_pos_.at(i), micro_index_keys_.at(i), micro_mark_deletions_.at(i), i); } } return ret; } int MacroBlockReader::parse_micro_block_key(const char* buf, int64_t& pos, int64_t row_end_pos, const ObObjMeta* type_list, ObObj* objs, int32_t rowkey_count) { int ret = OB_SUCCESS; ObFlatRowReader reader; ObNewRow row; row.cells_ = objs; row.count_ = rowkey_count; if (OB_SUCC(ret)) { if (OB_SUCCESS != (ret = reader.read_compact_rowkey(type_list, rowkey_count, buf /*+ pos*/, row_end_pos, pos, row))) { fprintf(stderr, "fail to read row." "ret = %d, pos = %ld, end_pos = %ld\n", ret, pos, row_end_pos); } else { pos = row_end_pos; } } return ret; } void MacroBlockReader::dump_micro_index(const ObRowkey& endkey, const blocksstable::ObPosition& datapos, const blocksstable::ObPosition& keypos, const bool mark_deletion, int32_t num) { PrintHelper::print_dump_title("Micro Index", num * 1L, 1); PrintHelper::print_dump_line("endkey", to_cstring(endkey)); PrintHelper::print_dump_line("datapos offset", datapos.offset_); PrintHelper::print_dump_line("datapos length", datapos.length_); PrintHelper::print_dump_line("keypos offset", keypos.offset_); PrintHelper::print_dump_line("keypos length", keypos.length_); PrintHelper::print_dump_line("mark deletion", mark_deletion); PrintHelper::print_end_line(); } int MacroBlockReader::dump(const int64_t micro_id) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(dump_header())) { LOG_WARN("failed to dump header", K(ret)); } else if (micro_id < ALL_MINOR_INDEX) { LOG_WARN("invalid micro id", K(ret), K(micro_id)); } else { const MicroBlock *micro_block = NULL; MicroBlockReader micro_reader; for (int64_t i = 0; OB_SUCC(ret) && i < count(); ++i) { if (micro_id == ALL_MINOR_INDEX || micro_id == i) { micro_block = NULL; micro_reader.reset(); if (OB_FAIL(set_index(i))) { LOG_WARN("failed to set index", K(ret), K(i)); } else if (OB_FAIL(get_value(micro_block))) { LOG_WARN("failed to get micro block", K(ret)); } else if (OB_ISNULL(micro_block)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("micro block is NULL", K(ret)); } else if (OB_FAIL(micro_reader.init(*micro_block))) { LOG_WARN("failed to setup micro reader", K(ret)); } else if (OB_FAIL(micro_reader.dump(-1))) { LOG_WARN("dump micro block failed", K(ret)); } } } } return ret; } int MicroBlockReader::init(const MicroBlock µ_block) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); } else if (OB_UNLIKELY(!micro_block.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(micro_block)); } else { micro_block_ = micro_block; if (FLAT_ROW_STORE == micro_block_.row_store_type_) { micro_reader_ = &flat_micro_reader_; } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("unexpected row store type", K(ret), "type", micro_block_.row_store_type_); } if (OB_SUCC(ret)) { if (OB_FAIL(micro_reader_->init(micro_block))) { LOG_WARN("failed to init micro reader", K(ret), K(micro_block)); } else { is_inited_ = true; } } } return ret; } int64_t MicroBlockReader::count() const { return micro_reader_ == NULL ? 0 : micro_reader_->count(); } int MicroBlockReader::set_index(const int64_t index) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(micro_reader_->set_index(index))) { LOG_WARN("failed to set index", K(ret), K(index)); } return ret; } int64_t MicroBlockReader::get_index() const { return micro_reader_ == NULL ? -1 : micro_reader_->get_index(); } int MicroBlockReader::get_value(const storage::ObStoreRow *&value) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(micro_reader_->get_value(value))) { LOG_WARN("failed to get value", K(ret)); } return ret; } void MicroBlockReader::reset() { micro_block_.reset(); micro_reader_ = NULL; flat_micro_reader_.reset(); is_inited_ = false; } int MicroBlockReader::dump(const int64_t) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_FAIL(micro_reader_->dump(-1))) { LOG_WARN("failed to dump", K(ret)); } return ret; } int FlatMicroBlockReader::init(const MicroBlock µ_block) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("init twice", K(ret)); } else if (OB_UNLIKELY(!micro_block.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(micro_block)); } else { micro_block_ = micro_block; micro_block_header_ = reinterpret_cast(micro_block_.data_); index_buffer_ = reinterpret_cast(micro_block_.data_ + micro_block_header_->row_index_offset_); curr_row_.row_val_.cells_ = columns_; curr_row_.row_val_.count_ = micro_block_header_->column_count_; is_inited_ = true; } return ret; } int64_t FlatMicroBlockReader::count() const { return micro_block_header_->row_count_; } int FlatMicroBlockReader::set_index(const int64_t index) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(index < 0) || OB_UNLIKELY(index >= count())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(index)); } else if (index >= count()) { ret = OB_ITER_END; } else { curr_row_index_ = index; } return ret; } int64_t FlatMicroBlockReader::get_index() const { return curr_row_index_; } int FlatMicroBlockReader::get_value(const storage::ObStoreRow *&row) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { int64_t pos = micro_block_header_->header_size_ + *(index_buffer_ + curr_row_index_); int64_t end_pos = micro_block_header_->header_size_ + *(index_buffer_ + curr_row_index_ + 1); if (curr_row_index_ >= micro_block_header_->row_count_ || end_pos > micro_block_.size_) { ret = OB_ITER_END; } else { reader_.reuse_allocator(); if (OB_FAIL( reader_.read_row( micro_block_.data_, end_pos, pos, *micro_block_.column_map_, allocator_, curr_row_))) { LOG_WARN("failed to read meta row", K(ret), K(pos), K(end_pos)); } else { row = &curr_row_; } } } return ret; } int FlatMicroBlockReader::dump(const int64_t) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else { const ObStoreRow *row = NULL; dump_micro_header(micro_block_header_); PrintHelper::print_dump_title("Micro Block", micro_block_.micro_id_, 1); PrintHelper::print_dump_title("Total Rows", count(), 1); for (int64_t i = 0; OB_SUCC(ret) && i < count(); ++i) { if (OB_FAIL(set_index(i))) { LOG_WARN("failed to set index", K(ret), K(i)); } else if (OB_FAIL(get_value(row))) { LOG_WARN("failed to get micro block", K(ret)); } else if (OB_ISNULL(row)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("row is NULL", K(ret)); } else { PrintHelper::print_row_title(false /*use csv*/, row, i); dump_row(row); } } } return ret; } void FlatMicroBlockReader::reset() { micro_block_.reset(); micro_block_header_ = NULL; index_buffer_ = NULL; curr_row_index_ = 0; curr_row_.row_val_.count_ = 0; is_inited_ = false; } void FlatMicroBlockReader::dump_micro_header(const ObMicroBlockHeader *micro_block_header) { PrintHelper::print_dump_title("Micro Header"); PrintHelper::print_dump_line("header_size", micro_block_header->header_size_); PrintHelper::print_dump_line("version", micro_block_header->version_); PrintHelper::print_dump_line("magic", micro_block_header->magic_); PrintHelper::print_dump_line("attr", micro_block_header->attr_); PrintHelper::print_dump_line("column_count", micro_block_header->column_count_); PrintHelper::print_dump_line("row_index_offset", micro_block_header->row_index_offset_); PrintHelper::print_dump_line("row_count", micro_block_header->row_count_); PrintHelper::print_end_line(); } int ObDumpsstPartitionImage::init() { int ret = OB_SUCCESS; if (OB_FAIL(SLOGGER.register_redo_module(OB_REDO_LOG_PARTITION, this))) { STORAGE_LOG(WARN, "failed to register_redo_module", K(ret)); } return ret; } ObDumpsstPartitionImage &ObDumpsstPartitionImage::get_instance() { static ObDumpsstPartitionImage pi; return pi; } } }