/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #define USING_LOG_PREFIX STORAGE #include "storage/blocksstable/ob_micro_block_cache.h" #include "storage/blocksstable/ob_block_manager.h" #include "storage/blocksstable/ob_macro_block_handle.h" #include "storage/blocksstable/ob_shared_macro_block_manager.h" #include "storage/blocksstable/ob_macro_block_handle.h" #include "storage/blocksstable/ob_shared_macro_block_manager.h" #include "storage/blocksstable/cs_encoding/ob_cs_micro_block_transformer.h" namespace oceanbase { using namespace common; using namespace storage; namespace blocksstable { /** * -----------------------------------------------------ObMicroBlockCacheKey-------------------------------------------------- */ ObMicroBlockCacheKey::ObMicroBlockCacheKey( const uint64_t tenant_id, const MacroBlockId ¯o_id, const int64_t offset, const int64_t size) : tenant_id_(tenant_id), block_id_(macro_id, offset, size) { } ObMicroBlockCacheKey::ObMicroBlockCacheKey( const uint64_t tenant_id, const ObMicroBlockId &block_id) : tenant_id_(tenant_id), block_id_(block_id) { } ObMicroBlockCacheKey::ObMicroBlockCacheKey() : tenant_id_(common::OB_INVALID_TENANT_ID), block_id_() { } ObMicroBlockCacheKey::ObMicroBlockCacheKey(const ObMicroBlockCacheKey &other) { tenant_id_ = other.tenant_id_; block_id_ = other.block_id_; } ObMicroBlockCacheKey::~ObMicroBlockCacheKey() { } void ObMicroBlockCacheKey::set(const uint64_t tenant_id, const MacroBlockId &block_id, const int64_t offset, const int64_t size) { tenant_id_ = tenant_id; block_id_.macro_id_ = block_id; block_id_.offset_ = offset; block_id_.size_ = size; } bool ObMicroBlockCacheKey::operator ==(const ObIKVCacheKey &other) const { const ObMicroBlockCacheKey &other_key = reinterpret_cast (other); return tenant_id_ == other_key.tenant_id_ && block_id_ == other_key.block_id_; } uint64_t ObMicroBlockCacheKey::get_tenant_id() const { return tenant_id_; } uint64_t ObMicroBlockCacheKey::hash() const { return murmurhash(this, sizeof(ObMicroBlockCacheKey), 0); } int64_t ObMicroBlockCacheKey::size() const { return sizeof(*this); } int ObMicroBlockCacheKey::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheKey *&key) const { int ret = OB_SUCCESS; if (OB_UNLIKELY(NULL == buf || buf_len < size())) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "Invalid argument, ", K(ret)); } else if (OB_UNLIKELY( 0 == tenant_id_ || OB_INVALID_TENANT_ID == tenant_id_ || !block_id_.is_valid())) { ret = OB_INVALID_DATA; STORAGE_LOG(WARN, "The micro block cache key is invalid, ", K(*this), K(ret)); } else { key = new (buf) ObMicroBlockCacheKey(tenant_id_, block_id_); } return ret; } /** * -----------------------------------------------------ObMicroBlockCacheValue-------------------------------------------------- */ ObMicroBlockCacheValue::ObMicroBlockCacheValue() : block_data_(), alloc_by_block_io_(false) { } ObMicroBlockCacheValue::ObMicroBlockCacheValue( const char *buf, const int64_t size, const char *extra_buf /* = NULL */, const int64_t extra_size /* = 0 */, const ObMicroBlockData::Type block_type /* = DATA_BLOCK */) : block_data_(buf, size, extra_buf, extra_size, block_type), alloc_by_block_io_(false) { } ObMicroBlockCacheValue::~ObMicroBlockCacheValue() { } int64_t ObMicroBlockCacheValue::size() const { return sizeof(blocksstable::ObMicroBlockCacheValue) + block_data_.total_size(); } int ObMicroBlockCacheValue::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheValue *&value) const { int ret = OB_SUCCESS; ObMicroBlockCacheValue *pvalue = NULL; if (OB_UNLIKELY(NULL == buf || buf_len < size())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument, ", K(ret)); } else if (OB_UNLIKELY(!block_data_.is_valid())) { //buffer_ is allowed to be NULL ret = OB_INVALID_DATA; LOG_WARN("The micro block cache value is not valid, ", K(*this), K(ret)); } else { char *new_buf = buf + sizeof(blocksstable::ObMicroBlockCacheValue); MEMCPY(new_buf, block_data_.get_buf(), block_data_.get_buf_size()); if (NULL != block_data_.get_extra_buf() && block_data_.get_extra_size() > 0) { switch (block_data_.type_) { case ObMicroBlockData::Type::DATA_BLOCK: { MEMCPY(new_buf + block_data_.get_buf_size(), block_data_.get_extra_buf(), block_data_.get_extra_size()); if (block_data_.get_store_type() == ObRowStoreType::CS_ENCODING_ROW_STORE) { // no need update cached coder for CS_ENCODING_ROW_STORE } else if (OB_FAIL(ObMicroBlockDecoder::update_cached_decoders( new_buf + block_data_.get_buf_size(), block_data_.get_extra_size(), block_data_.get_buf(), new_buf, block_data_.get_buf_size()))) { LOG_WARN(" Update cached pointer failed", K(ret), K_(block_data), KP(new_buf)); } break; } case ObMicroBlockData::Type::INDEX_BLOCK: { const ObIndexBlockDataHeader *src_idx_header = reinterpret_cast(block_data_.get_extra_buf()); char *new_extra_buf = new_buf + block_data_.get_buf_size(); const int64_t new_extra_buf_size = buf_len - block_data_.get_buf_size(); ObIndexBlockDataHeader *dst_idx_header = reinterpret_cast(new_extra_buf); int64_t pos = sizeof(ObIndexBlockDataHeader); if (OB_FAIL(dst_idx_header->deep_copy_transformed_index_block( *src_idx_header, new_extra_buf_size, new_extra_buf, pos))) { LOG_WARN("Fail to deep copy transformed index block", K(ret)); } break; } default: ret = OB_NOT_SUPPORTED; LOG_WARN("Not Supported block data type", K(ret), K_(block_data)); } if (OB_SUCC(ret)) { pvalue = new (buf) ObMicroBlockCacheValue( new_buf, block_data_.get_buf_size(), new_buf + block_data_.get_buf_size(), block_data_.get_extra_size(), block_data_.type_); pvalue->alloc_by_block_io_ = alloc_by_block_io_; } } else { pvalue = new (buf) ObMicroBlockCacheValue(new_buf, block_data_.get_buf_size(), nullptr, 0, block_data_.type_); pvalue->alloc_by_block_io_ = alloc_by_block_io_; } value = pvalue; } return ret; } /*---------------------------------Multi Block IO parameters--------------------------------------*/ ObMultiBlockIOResult::ObMultiBlockIOResult() { reset(); } ObMultiBlockIOResult::~ObMultiBlockIOResult() { } void ObMultiBlockIOResult::reset() { micro_blocks_ = NULL; handles_ = NULL; block_count_ = 0; ret_code_ = OB_SUCCESS; } int ObMultiBlockIOResult::get_block_data( const int64_t index, ObMicroBlockData &block_data) const { int ret = OB_SUCCESS; if (OB_SUCCESS != ret_code_) { ret = ret_code_; STORAGE_LOG(WARN, "async process block failed", K(ret)); } else if (NULL == micro_blocks_ || NULL == handles_ || block_count_ <= 0) { ret = OB_INNER_STAT_ERROR; STORAGE_LOG(WARN, "inner stat error", K(ret), KP(micro_blocks_), KP(handles_), K_(block_count)); } else if (index >= block_count_ || index < 0) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(ERROR, "invalid index", K(ret), K(index), K_(block_count)); } else if (NULL == micro_blocks_[index]) { ret = OB_INNER_STAT_ERROR; STORAGE_LOG(WARN, "micro_block is null", K(ret), "handle validity", handles_[index].is_valid(), K(index)); } else { block_data = micro_blocks_[index]->get_block_data(); } return ret; } void ObMultiBlockIOParam::reset() { micro_index_infos_ = nullptr; start_index_ = -1; block_count_ = -1; } bool ObMultiBlockIOParam::is_valid() const { bool is_same_block = false; const bool basic_valid = nullptr != micro_index_infos_ && start_index_ >= 0 && block_count_ > 0 && micro_index_infos_->count() >= start_index_ + block_count_; if (basic_valid) { const ObMicroIndexInfo &first_micro = micro_index_infos_->at(start_index_); const ObMicroIndexInfo &last_micro = micro_index_infos_->at(start_index_ + block_count_ - 1); is_same_block = first_micro.get_macro_id() == last_micro.get_macro_id(); } return basic_valid && is_same_block; } void ObMultiBlockIOParam::get_io_range(int64_t &offset, int64_t &size) const { offset = 0; size = 0; if (block_count_ > 0) { const int64_t end_index = start_index_ + block_count_ - 1; offset = micro_index_infos_->at(start_index_).get_block_offset(); ObMicroIndexInfo &end_micro_index = micro_index_infos_->at(end_index); size = end_micro_index.get_block_offset() - offset + end_micro_index.get_block_size(); } } int ObMultiBlockIOParam::get_block_des_info(ObIMicroBlockIOCallback &io_callback) const { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid multi block io parameter", K(ret), K(*this)); } else { ObMicroIndexInfo &start_info = micro_index_infos_->at(start_index_); io_callback.set_micro_des_meta(start_info.row_header_); } return ret; } void ObMultiBlockIOCtx::reset() { micro_index_infos_ = nullptr; block_count_ = 0; } bool ObMultiBlockIOCtx::is_valid() const { return OB_NOT_NULL(micro_index_infos_) && block_count_ > 0; } /*---------------------------------------ObIMicroBlockIOCallback-------------------------------------*/ ObIMicroBlockIOCallback::ObIMicroBlockIOCallback() : cache_(nullptr), put_size_stat_(nullptr), allocator_(nullptr), data_buffer_(nullptr), tenant_id_(OB_INVALID_TENANT_ID), block_id_(), offset_(0), block_des_meta_(), use_block_cache_(true) { MEMSET(encrypt_key_, 0, sizeof(encrypt_key_)); block_des_meta_.encrypt_key_ = encrypt_key_; static_assert(sizeof(*this) <= CALLBACK_BUF_SIZE, "IOCallback buf size not enough"); } ObIMicroBlockIOCallback::~ObIMicroBlockIOCallback() { if (OB_NOT_NULL(allocator_) && OB_NOT_NULL(data_buffer_)) { allocator_->free(data_buffer_); data_buffer_ = nullptr; } allocator_ = nullptr; } void ObIMicroBlockIOCallback::set_micro_des_meta(const ObIndexBlockRowHeader *idx_row_header) { OB_ASSERT(nullptr != idx_row_header); block_des_meta_.compressor_type_ = idx_row_header->get_compressor_type(); block_des_meta_.encrypt_id_ = idx_row_header->get_encrypt_id(); block_des_meta_.master_key_id_ = idx_row_header->get_master_key_id(); block_des_meta_.row_store_type_ = idx_row_header->get_row_store_type(); MEMCPY(encrypt_key_, idx_row_header->get_encrypt_key(), share::OB_MAX_TABLESPACE_ENCRYPT_KEY_LENGTH); } int ObIMicroBlockIOCallback::alloc_data_buf(const char *io_data_buffer, const int64_t data_size) { //UNUSED NOW int ret = OB_SUCCESS; if (OB_ISNULL(allocator_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected error, the allocator is NULL, ", KP_(allocator), K(ret)); } else if (OB_UNLIKELY(data_size <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid data buffer size", K(ret), K(data_size)); } else { data_buffer_ = static_cast(allocator_->alloc(data_size)); for (int64_t i = 1; OB_ISNULL(data_buffer_) && i <= ALLOC_BUF_RETRY_TIMES; i++) { ob_usleep(ALLOC_BUF_RETRY_INTERVAL * i); data_buffer_ = static_cast(allocator_->alloc(data_size)); } if (OB_ISNULL(data_buffer_)) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Fail to allocate memory", K(ret), K_(offset), K(data_size), KP(data_buffer_)); } else { MEMCPY(data_buffer_, io_data_buffer, data_size); } } return ret; } int ObIMicroBlockIOCallback::process_block( ObMacroBlockReader *reader, const char *buffer, const int64_t offset, const int64_t size, const ObMicroBlockCacheValue *µ_block, common::ObKVCacheHandle &cache_handle) { int ret = OB_SUCCESS; ObMicroBlockData block_data; ObMicroBlockHeader header; int64_t pos = 0; int64_t payload_size = 0; const char *payload_buf = nullptr; if (OB_UNLIKELY(NULL == reader || NULL == buffer || offset < 0 || size < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid arguments", K(ret), KP(reader), KP(buffer), K(offset), K(size)); } else if (OB_FAIL(header.deserialize(buffer, size, pos))) { LOG_ERROR("Fail to deserialize record header", K(ret), K_(block_id), K(offset)); } else if (OB_FAIL(header.check_and_get_record( buffer, size, MICRO_BLOCK_HEADER_MAGIC, payload_buf, payload_size))) { LOG_ERROR("Micro block data is corrupted", K(ret), K_(block_id), K(offset), K(size), K_(tenant_id), KP(buffer), KP(this)); } else { if (OB_UNLIKELY(!use_block_cache_)) { // Won't put in cache if (OB_FAIL(read_block_and_copy(header, *reader, buffer, size, block_data, micro_block, cache_handle))) { LOG_WARN("Fail to read micro block and copy to cache value", K(ret)); } } else { ObIMicroBlockCache::BaseBlockCache *kvcache = nullptr; ObMicroBlockCacheKey key(tenant_id_, block_id_, offset, size); if (OB_FAIL(cache_->get_cache(kvcache))) { LOG_WARN("Fail to get kvcache", K(ret)); } else if (OB_UNLIKELY(OB_SUCCESS == (ret = kvcache->get(key, micro_block, cache_handle)))) { // entry exist, no need to put } else if (OB_FAIL(cache_->put_cache_block( block_des_meta_, buffer, key, *reader, *allocator_, micro_block, cache_handle))) { LOG_WARN("Failed to put block to cache", K(ret)); } } // NOTE: if block has column level compress and don't use kvcache, // the block not be full transformed here and left to be part transformed in // cs micro block decoder. } return ret; } int ObIMicroBlockIOCallback::read_block_and_copy( const ObMicroBlockHeader &header, ObMacroBlockReader &reader, const char *buffer, const int64_t size, ObMicroBlockData &block_data, const ObMicroBlockCacheValue *µ_block, ObKVCacheHandle &handle) { int ret = OB_SUCCESS; block_data.type_ = cache_->get_type(); // only full_transform for index_block with cs_encoding when don't use block cache if (ObMicroBlockData::INDEX_BLOCK == block_data.type_ && ObStoreFormat::is_row_store_type_with_cs_encoding(static_cast(header.row_store_type_))) { if (OB_FAIL(reader.decrypt_and_full_transform_data(header, block_des_meta_, buffer, size, block_data.get_buf(), block_data.get_buf_size(), nullptr))) { LOG_WARN("fail to decrypt_and_full_transform_data", K(ret), K(header), K_(block_des_meta)); } } else { bool is_compressed = false; if (OB_FAIL(reader.do_decrypt_and_decompress_data( header, block_des_meta_, buffer, size, block_data.get_buf(), block_data.get_buf_size(), is_compressed, false/*need deep copy*/, nullptr))) { LOG_WARN("fail to do decrypt and decompress data", K(ret)); } } if (OB_SUCC(ret)) { ObMicroBlockCacheValue value( block_data.get_buf(), block_data.get_buf_size(), nullptr, 0, block_data.type_); value.set_alloc_by_block_io(); char *buf = nullptr; const int64_t buf_len = value.size(); handle.reset(); micro_block = nullptr; ObIKVCacheValue *value_copy = nullptr; if (OB_ISNULL(buf = static_cast(allocator_->alloc(buf_len)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Failed to allocate value", K(ret), K(buf_len)); } else if (OB_FAIL(value.deep_copy(buf, buf_len, value_copy))) { LOG_WARN("Failed to deep copy value", K(ret)); allocator_->free(buf); } else { micro_block = static_cast(value_copy); } } return ret; } /*-----------------------------------ObAsyncSingleMicroBlockIOCallback-----------------------------------*/ ObAsyncSingleMicroBlockIOCallback::ObAsyncSingleMicroBlockIOCallback() : ObIMicroBlockIOCallback(), micro_block_(nullptr), cache_handle_() { STATIC_ASSERT(sizeof(*this) <= CALLBACK_BUF_SIZE, "IOCallback buf size not enough"); } ObAsyncSingleMicroBlockIOCallback::~ObAsyncSingleMicroBlockIOCallback() { // release micro_block_ outside if (OB_NOT_NULL(allocator_) && OB_NOT_NULL(micro_block_) && !cache_handle_.is_valid()) { allocator_->free(const_cast(micro_block_)); micro_block_ = nullptr; } } int64_t ObAsyncSingleMicroBlockIOCallback::size() const { return sizeof(*this); } int ObAsyncSingleMicroBlockIOCallback::inner_process(const char *data_buffer, const int64_t size) { int ret = OB_SUCCESS; ObTimeGuard time_guard("AsyncSingle_Callback_Process", 100000); //100ms if (OB_ISNULL(cache_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Invalid micro block cache callback, ", KP_(cache), K(ret)); } else if (OB_UNLIKELY(size <= 0 || data_buffer == nullptr)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid data buffer size", K(ret), K(size), KP(data_buffer)); } else { ObMacroBlockReader *reader = nullptr; if (OB_ISNULL(reader = GET_TSI_MULT(ObMacroBlockReader, 1))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Fail to allocate ObMacroBlockReader, ", K(ret)); } else if (OB_FAIL(process_block(reader, data_buffer, offset_, size, micro_block_, cache_handle_))) { LOG_WARN("process_block failed", K(ret)); } } return ret; } const char *ObAsyncSingleMicroBlockIOCallback::get_data() { return reinterpret_cast(micro_block_); } /*-----------------------------------ObMultiDataBlockIOCallback-----------------------------------*/ ObMultiDataBlockIOCallback::ObMultiDataBlockIOCallback() : ObIMicroBlockIOCallback(), io_ctx_(), io_result_() { STATIC_ASSERT(sizeof(*this) <= CALLBACK_BUF_SIZE, "IOCallback buf size not enough"); } ObMultiDataBlockIOCallback::~ObMultiDataBlockIOCallback() { free_result(); } int64_t ObMultiDataBlockIOCallback::size() const { return sizeof(*this); } int ObMultiDataBlockIOCallback::inner_process(const char *data_buffer, const int64_t size) { int ret = OB_SUCCESS; ObTimeGuard time_guard("MultiData_Callback_Process", 100000); //100ms if (OB_ISNULL(cache_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Invalid micro block cache callback, ", KP_(cache), K(ret)); } else if (OB_UNLIKELY(size <= 0 || data_buffer == nullptr)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid data buffer size", K(ret), K(size), KP(data_buffer)); } else { ObMacroBlockReader *reader = nullptr; if (OB_ISNULL(reader = GET_TSI_MULT(ObMacroBlockReader, 1))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Fail to allocate ObMacroBlockReader, ", K(ret)); } else if (OB_FAIL(alloc_result())) { LOG_WARN("alloc_result failed", K(ret)); } const int64_t block_count = io_ctx_.block_count_; for (int64_t i = 0; OB_SUCC(ret) && i < block_count; ++i) { const int64_t data_size = io_ctx_.micro_index_infos_[i].get_block_size(); const int64_t data_offset = io_ctx_.micro_index_infos_[i].get_block_offset() - offset_; if (OB_FAIL(process_block( reader, data_buffer + data_offset, offset_ + data_offset, data_size, io_result_.micro_blocks_[i], io_result_.handles_[i]))) { LOG_WARN("process_block failed", K(ret)); } } } if (OB_FAIL(ret)) { io_result_.ret_code_ = ret; } return ret; } const char *ObMultiDataBlockIOCallback::get_data() { const char *data = nullptr; data = reinterpret_cast(&io_result_); return data; } int ObMultiDataBlockIOCallback::set_io_ctx( const ObMultiBlockIOParam &io_param) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!io_param.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid io_param", K(ret), K(io_param)); } else { io_ctx_.micro_index_infos_ = &io_param.micro_index_infos_->at(io_param.start_index_); io_ctx_.block_count_ = io_param.block_count_; } return ret; } int ObMultiDataBlockIOCallback::deep_copy_ctx( const ObMultiBlockIOCtx &io_ctx) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!io_ctx.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid io_ctx", K(ret), K(io_ctx)); } else if (OB_ISNULL(allocator_)) { ret = OB_INNER_STAT_ERROR; LOG_WARN("allocator_ is null", K(ret), KP(allocator_)); } else { void *ptr = nullptr; int64_t alloc_size = sizeof(ObMicroIndexInfo) * io_ctx.block_count_; if (OB_ISNULL(ptr = allocator_->alloc(alloc_size))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc memory failed", K(ret), K(alloc_size)); } else { io_ctx_.micro_index_infos_ = reinterpret_cast(ptr); MEMCPY(io_ctx_.micro_index_infos_, io_ctx.micro_index_infos_, alloc_size); } if (OB_SUCC(ret)) { io_ctx_.block_count_ = io_ctx.block_count_; } } return ret; } int ObMultiDataBlockIOCallback::alloc_result() { int ret = OB_SUCCESS; void *ptr = nullptr; const int64_t block_count = io_ctx_.block_count_; if (OB_ISNULL(ptr = allocator_->alloc(sizeof(ObMicroBlockCacheValue *) * block_count))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc failed", K(ret)); } else { io_result_.micro_blocks_ = reinterpret_cast(ptr); MEMSET(io_result_.micro_blocks_, 0, sizeof(ObMicroBlockCacheValue *) * block_count); } if (OB_SUCC(ret)) { if (OB_ISNULL(ptr = allocator_->alloc(sizeof(ObKVCacheHandle) * block_count))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc failed", K(ret)); } else { io_result_.handles_ = new (ptr) ObKVCacheHandle[block_count]; io_result_.block_count_ = block_count; } } return ret; } void ObMultiDataBlockIOCallback::free_result() { if (OB_NOT_NULL(allocator_)) { if (OB_NOT_NULL(io_result_.micro_blocks_)) { if (OB_NOT_NULL(io_result_.handles_)) { for (int64_t i = 0; i < io_result_.block_count_; ++i) { if (!io_result_.handles_[i].is_valid() && OB_NOT_NULL(io_result_.micro_blocks_[i])) { allocator_->free(const_cast(io_result_.micro_blocks_[i])); } } } allocator_->free(io_result_.micro_blocks_); io_result_.micro_blocks_ = nullptr; } if (OB_NOT_NULL(io_result_.handles_)) { for (int64_t i = 0; i < io_result_.block_count_; ++i) { io_result_.handles_[i].~ObKVCacheHandle(); } allocator_->free(io_result_.handles_); io_result_.handles_ = nullptr; io_result_.block_count_ = 0; } } } /* ---------------------------------------- ObSyncSingleMicroBLockIOCallback ---------------------------------------- */ ObSyncSingleMicroBLockIOCallback::ObSyncSingleMicroBLockIOCallback() : ObIMicroBlockIOCallback(), macro_reader_(nullptr), block_data_(nullptr), is_data_block_(true) { } ObSyncSingleMicroBLockIOCallback::~ObSyncSingleMicroBLockIOCallback() { } int64_t ObSyncSingleMicroBLockIOCallback::size() const { return sizeof(*this); } int ObSyncSingleMicroBLockIOCallback::inner_process(const char *data_buffer, const int64_t size) { int ret = OB_SUCCESS; bool is_compressed = false; ObTimeGuard time_guard("SyncSingle_Callback_Process", 100000); //100ms if (OB_UNLIKELY(size <= 0 || data_buffer == nullptr)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid data buffer size", K(ret), K(size), KP(data_buffer)); } else { if (OB_UNLIKELY(nullptr == macro_reader_ || nullptr == block_data_ || nullptr == allocator_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("Unexpected reader or block data", K(ret), KP(macro_reader_), KP(block_data_), KP_(allocator)); } else { const char *src_block_buf = data_buffer; const int64_t src_buf_size = size; ObMicroBlockHeader header; if (OB_FAIL(header.deserialize_and_check_header(src_block_buf, src_buf_size))) { LOG_WARN("fail to deserialize_and_check_header", K(ret), KP(src_block_buf), K(src_buf_size)); } else if (ObStoreFormat::is_row_store_type_with_cs_encoding(static_cast(header.row_store_type_))) { if (OB_FAIL(macro_reader_->decrypt_and_full_transform_data( header, block_des_meta_, src_block_buf, src_buf_size, block_data_->get_buf(), block_data_->get_buf_size(), allocator_))) { LOG_WARN("fail to decrypt_and_full_transform_data", K(ret), K(header), K(block_des_meta_), K(is_data_block_)); } } else { // not cs_encoding if (OB_FAIL(macro_reader_->do_decrypt_and_decompress_data( header, block_des_meta_, src_block_buf, src_buf_size, block_data_->get_buf(), block_data_->get_buf_size(), is_compressed, true /* need_deep_copy */, allocator_))) { LOG_WARN("Fail to decrypt and decompress micro block data buf", K(ret)); } } } } return ret; } const char *ObSyncSingleMicroBLockIOCallback::get_data() { return reinterpret_cast(block_data_); } /*----------------------------------------ObIMicroBlockCache--------------------------------------*/ int ObIMicroBlockCache::get_cache_block( const uint64_t tenant_id, const MacroBlockId block_id, const int64_t offset, const int64_t size, ObMicroBlockBufferHandle &handle) { int ret = OB_SUCCESS; BaseBlockCache *cache = NULL; if (OB_UNLIKELY(0 == tenant_id || OB_INVALID_TENANT_ID == tenant_id || !block_id.is_valid() || offset < 0 || size <= 0)) { ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid arguments", K(ret), KP(cache), K(tenant_id), K(block_id), K(offset), K(size)); } else if (OB_FAIL(get_cache(cache))) { STORAGE_LOG(WARN, "get_cache failed", K(ret)); } else { ObMicroBlockCacheKey key(tenant_id, block_id, offset, size); if (OB_FAIL(cache->get(key, handle.micro_block_, handle.handle_))) { if (OB_ENTRY_NOT_EXIST != ret) { STORAGE_LOG(WARN, "Fail to get micro block from block cache, ", K(ret)); } } else { EVENT_INC(ObStatEventIds::BLOCK_CACHE_HIT); } } return ret; } int ObIMicroBlockCache::prefetch( const uint64_t tenant_id, const MacroBlockId ¯o_id, const ObMicroIndexInfo& idx_row, const bool use_cache, ObMacroBlockHandle ¯o_handle, ObIAllocator *allocator) { int ret = OB_SUCCESS; const ObIndexBlockRowHeader *idx_header = idx_row.row_header_; if (OB_ISNULL(idx_header)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid null index block row header", K(ret), K(idx_row)); } else if (OB_UNLIKELY(!idx_header->is_valid() || 0 >= idx_header->get_block_size() || nullptr == allocator)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid data index block row header ", K(ret), K(idx_row), KP(allocator)); } else { void *buf = nullptr; ObAsyncSingleMicroBlockIOCallback *callback = nullptr; if (OB_ISNULL(buf = allocator->alloc(sizeof(ObAsyncSingleMicroBlockIOCallback)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate callback memory failed", K(ret)); } else { callback = new (buf) ObAsyncSingleMicroBlockIOCallback; callback->allocator_ = allocator; callback->use_block_cache_ = use_cache; if (OB_FAIL(prefetch(tenant_id, macro_id, idx_row, macro_handle, *callback))) { LOG_WARN("Fail to prefetch data micro block", K(ret)); } if (OB_FAIL(ret) && OB_NOT_NULL(callback->get_allocator())) { //Avoid double_free with io_handle callback->~ObAsyncSingleMicroBlockIOCallback(); allocator->free(callback); } } } return ret; } int ObIMicroBlockCache::prefetch( const uint64_t tenant_id, const MacroBlockId ¯o_id, const ObMicroIndexInfo& idx_row, ObMacroBlockHandle ¯o_handle, ObIMicroBlockIOCallback &callback) { int ret = OB_SUCCESS; const ObIndexBlockRowHeader *idx_row_header = idx_row.row_header_; if (OB_ISNULL(idx_row_header)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret)); } else { // fill callback callback.cache_ = this; callback.put_size_stat_ = this; callback.tenant_id_ = tenant_id; callback.block_id_ = macro_id; callback.offset_ = idx_row.get_block_offset(); callback.set_micro_des_meta(idx_row_header); // fill read info ObMacroBlockReadInfo read_info; read_info.macro_block_id_ = macro_id; read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); read_info.io_desc_.set_group_id(ObIOModule::MICRO_BLOCK_CACHE_IO); read_info.io_callback_ = &callback; read_info.offset_ = idx_row.get_block_offset(); read_info.size_ = idx_row.get_block_size(); read_info.io_timeout_ms_ = max(THIS_WORKER.get_timeout_remain() / 1000, 0); if (OB_FAIL(ObBlockManager::async_read_block(read_info, macro_handle))) { STORAGE_LOG(WARN, "Fail to async read block, ", K(ret), K(read_info)); } else { EVENT_INC(ObStatEventIds::IO_READ_PREFETCH_MICRO_COUNT); EVENT_ADD(ObStatEventIds::IO_READ_PREFETCH_MICRO_BYTES, idx_row.get_block_size()); } } return ret; } int ObIMicroBlockCache::prefetch( const uint64_t tenant_id, const MacroBlockId ¯o_id, const ObMultiBlockIOParam &io_param, const bool use_cache, ObMacroBlockHandle ¯o_handle, ObIMicroBlockIOCallback &callback) { int ret = OB_SUCCESS; int64_t offset = 0; int64_t size = 0; if (OB_FAIL(io_param.get_block_des_info(callback))) { LOG_WARN("Fail to get meta data for deserializing block data", K(ret), K(io_param)); } else { // fill callback io_param.get_io_range(offset, size); callback.cache_ = this; callback.put_size_stat_ = this; callback.tenant_id_ = tenant_id; callback.block_id_ = macro_id; callback.offset_ = offset; callback.use_block_cache_ = use_cache; // fill read info ObMacroBlockReadInfo read_info; read_info.macro_block_id_ = macro_id; read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); read_info.io_desc_.set_group_id(ObIOModule::MICRO_BLOCK_CACHE_IO); read_info.io_callback_ = &callback; read_info.offset_ = offset; read_info.size_ = size; read_info.io_timeout_ms_ = max(THIS_WORKER.get_timeout_remain() / 1000, 0); if (OB_FAIL(ObBlockManager::async_read_block(read_info, macro_handle))) { STORAGE_LOG(WARN, "Fail to async read block, ", K(ret), K(read_info)); } else { EVENT_ADD(ObStatEventIds::IO_READ_PREFETCH_MICRO_COUNT, io_param.block_count_); EVENT_ADD(ObStatEventIds::IO_READ_PREFETCH_MICRO_BYTES, size); } } return ret; } int ObIMicroBlockCache::add_put_size(const int64_t put_size) { UNUSED(put_size); return OB_SUCCESS; } /*-----------------------------------ObMicroBlockBufTranformer-----------------------------------*/ ObMicroBlockBufTransformer::ObMicroBlockBufTransformer( const ObMicroBlockDesMeta &block_des_meta, ObMacroBlockReader *reader, ObMicroBlockHeader &header, const char *buf, const int64_t buf_size) : is_inited_(false), is_cs_full_transfrom_(ObStoreFormat::is_row_store_type_with_cs_encoding( static_cast(header.row_store_type_))), block_des_meta_(block_des_meta), reader_(reader), header_(header), payload_buf_(buf + header.header_size_), payload_size_(buf_size - header.header_size_) { } int ObMicroBlockBufTransformer::init() { int ret = OB_SUCCESS; if (is_cs_full_transfrom_) { bool is_compressed = false; #ifdef OB_BUILD_TDE_SECURITY if (share::ObEncryptionUtil::need_encrypt(static_cast(block_des_meta_.encrypt_id_))) { const char *decrypt_buf = NULL; int64_t decrypt_len = 0; if (OB_FAIL(reader_->decrypt_buf( block_des_meta_, payload_buf_, payload_size_, decrypt_buf, decrypt_len))) { LOG_WARN("Fail to decrypt data", K(ret)); } else { payload_buf_ = decrypt_buf; payload_size_ = decrypt_len; is_compressed = (header_.data_length_ != decrypt_len); } } #else is_compressed = header_.is_compressed_data(); #endif if (OB_SUCC(ret)) { if (OB_UNLIKELY(is_compressed)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("cs encoding must has no block-level compression", K(ret), K(header_)); } else if (OB_FAIL(transformer_.init(&header_, payload_buf_, payload_size_))) { LOG_WARN("fail to init cs micro block transformer", K(ret), K_(header)); } } } if (OB_SUCC(ret)) { is_inited_ = true; } return ret; } int ObMicroBlockBufTransformer::get_buf_size(int64_t &buf_size) const { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (!is_cs_full_transfrom_) { buf_size = header_.header_size_ + header_.data_length_; } else if (OB_FAIL(transformer_.calc_full_transform_size(buf_size))) { LOG_WARN("fail to calc transformed size", K(ret), K_(header)); } return ret; } int ObMicroBlockBufTransformer::transfrom(char *block_buf, const int64_t buf_size) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not inited", K(ret)); } else if (OB_UNLIKELY(nullptr == block_buf || buf_size <= 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), KP(block_buf), K(buf_size)); } else if (!is_cs_full_transfrom_) { ObMicroBlockHeader *micro_header = nullptr; int64_t pos = 0; if (OB_FAIL(header_.deep_copy(block_buf, buf_size, pos, micro_header))) { LOG_WARN("Fail to deep copy header", K(ret), K_(header)); } else { #ifdef OB_BUILD_TDE_SECURITY const char *decrypt_buf = NULL; int64_t decrypt_len = 0; if (OB_FAIL(reader_->decrypt_buf( block_des_meta_, payload_buf_, payload_size_, decrypt_buf, decrypt_len))) { LOG_WARN("fail to decrypt data", K(ret), K_(header)); } else { payload_buf_ = decrypt_buf; payload_size_ = decrypt_len; } #endif if (OB_SUCC(ret)) { if (OB_FAIL(reader_->decompress_data_with_prealloc_buf( block_des_meta_.compressor_type_, payload_buf_, payload_size_, block_buf + pos, buf_size - pos))) { LOG_WARN("Fail to decompress data with preallocated buffer", K(ret), K_(header)); } } } } else { // is_cs_full_transfrom_ int64_t pos = 0; if (OB_FAIL(transformer_.full_transform(block_buf, buf_size, pos))) { LOG_WARN("fail to transfrom cs encoding mirco blcok", K(ret)); } else if (OB_UNLIKELY(pos != buf_size)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("pos should equal to buf_size", K(ret), K(pos), K(buf_size)); } } return ret; } /*-------------------------------------ObDataMicroBlockCache--------------------------------------*/ int ObDataMicroBlockCache::init(const char *cache_name, const int64_t priority) { int ret = OB_SUCCESS; const int64_t mem_limit = 4 * 1024 * 1024 * 1024LL; if (OB_SUCCESS != (ret = common::ObKVCache::init( cache_name, priority))) { STORAGE_LOG(WARN, "Fail to init kv cache, ", K(ret)); } else if (OB_FAIL(allocator_.init(mem_limit, OB_MALLOC_MIDDLE_BLOCK_SIZE, OB_MALLOC_MIDDLE_BLOCK_SIZE))) { STORAGE_LOG(WARN, "Fail to init io allocator, ", K(ret)); } else { allocator_.set_attr(SET_USE_500(ObMemAttr(OB_SERVER_TENANT_ID, ObModIds::OB_SSTABLE_MICRO_BLOCK_ALLOCATOR))); } return ret; } void ObDataMicroBlockCache::destroy() { common::ObKVCache::destroy(); allocator_.destroy(); } int ObDataMicroBlockCache::prefetch( const uint64_t tenant_id, const MacroBlockId ¯o_id, const ObMultiBlockIOParam &io_param, const bool use_cache, ObMacroBlockHandle ¯o_handle) { int ret = OB_SUCCESS; ObMultiDataBlockIOCallback *callback = nullptr; ObIAllocator *allocator = nullptr; if (OB_FAIL(get_allocator(allocator))) { LOG_WARN("Fail to get allocator", K(ret)); } else { void *buf = nullptr; if (OB_ISNULL(buf = allocator->alloc(sizeof(ObMultiDataBlockIOCallback)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate callback memory failed", K(ret)); } else { callback = new (buf) ObMultiDataBlockIOCallback; callback->allocator_ = allocator; if (OB_UNLIKELY(!io_param.is_valid() || 0 == tenant_id || OB_INVALID_TENANT_ID == tenant_id)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid input parameters", K(ret), K(tenant_id)); } else if (OB_FAIL(callback->set_io_ctx(io_param))) { LOG_WARN("Set io context failed", K(ret), K(io_param)); } else if (OB_FAIL(ObIMicroBlockCache::prefetch( tenant_id, macro_id, io_param, use_cache, macro_handle, *callback))) { LOG_WARN("Fail to prefetch multi data blocks", K(ret)); } if (OB_FAIL(ret) && OB_NOT_NULL(callback->get_allocator())) { //Avoid double_free with io_handle callback->~ObMultiDataBlockIOCallback(); allocator->free(callback); } } } return ret; } int ObDataMicroBlockCache::load_block( const ObMicroBlockId µ_block_id, const ObMicroBlockDesMeta &des_meta, ObMacroBlockReader *macro_reader, ObMicroBlockData &block_data, ObIAllocator *allocator) { int ret = OB_SUCCESS; ObMacroBlockReadInfo macro_read_info; ObMacroBlockHandle macro_handle; bool is_compressed = false; if (OB_UNLIKELY(!micro_block_id.is_valid() || nullptr == macro_reader || nullptr == allocator)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(micro_block_id), KP(macro_reader), KP(allocator)); } else { void *buf = nullptr; ObSyncSingleMicroBLockIOCallback *callback = nullptr; if (OB_ISNULL(buf = allocator->alloc(sizeof(ObSyncSingleMicroBLockIOCallback)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate callback memory failed", K(ret)); } else { callback = new (buf) ObSyncSingleMicroBLockIOCallback; callback->allocator_ = allocator; callback->offset_ = micro_block_id.offset_; callback->block_des_meta_ = des_meta; callback->block_data_ = &block_data; callback->macro_reader_ = macro_reader; callback->is_data_block_ = true; macro_read_info.macro_block_id_ = micro_block_id.macro_id_; macro_read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); macro_read_info.io_desc_.set_group_id(ObIOModule::MICRO_BLOCK_CACHE_IO); macro_read_info.io_callback_ = callback; macro_read_info.offset_ = micro_block_id.offset_; macro_read_info.size_ = micro_block_id.size_; macro_read_info.io_timeout_ms_ = GCONF._data_storage_io_timeout / 1000L; if (OB_FAIL(ObBlockManager::read_block(macro_read_info, macro_handle))) { LOG_WARN("Fail to sync read block", K(ret), K(macro_read_info)); } else { block_data.type_ = ObMicroBlockData::DATA_BLOCK; EVENT_INC(ObStatEventIds::IO_READ_PREFETCH_MICRO_COUNT); EVENT_ADD(ObStatEventIds::IO_READ_PREFETCH_MICRO_BYTES, micro_block_id.size_); } if (OB_FAIL(ret) && OB_NOT_NULL(callback->get_allocator())) { //Avoid double_free with io_handle callback->~ObSyncSingleMicroBLockIOCallback(); allocator->free(callback); } } } return ret; } int ObDataMicroBlockCache::get_cache(BaseBlockCache *&cache) { int ret = OB_SUCCESS; cache = this; return ret; } int ObDataMicroBlockCache::get_allocator(common::ObIAllocator *&allocator) { int ret = OB_SUCCESS; allocator = &allocator_; return ret; } int64_t ObDataMicroBlockCache::calc_value_size(const int64_t data_length, const ObRowStoreType &type, bool &need_decoder) { need_decoder = false; int64_t value_size = sizeof(ObMicroBlockCacheValue) + data_length; if (ObStoreFormat::is_row_store_type_with_encoding(type)) { need_decoder = true; value_size += ObMicroBlockDecoder::MAX_CACHED_DECODER_BUF_SIZE; } return value_size; } int ObDataMicroBlockCache::write_extra_buf(const ObRowStoreType row_store_type, const char *block_buf, const int64_t block_size, char *extra_buf, ObMicroBlockData µ_data) { int ret = OB_SUCCESS; int64_t decoder_size = 0; if (ObStoreFormat::is_row_store_type_with_cs_encoding(row_store_type)) { if (OB_FAIL(ObMicroBlockCSDecoder::get_decoder_cache_size(block_buf, block_size, decoder_size))) { LOG_WARN("Fail to get decoder cache size", K(ret)); } else if (OB_FAIL(ObMicroBlockCSDecoder::cache_decoders(extra_buf, decoder_size, block_buf, block_size))) { LOG_WARN("Fail to set cache decoder", K(ret)); } } else if (OB_FAIL(ObMicroBlockDecoder::get_decoder_cache_size(block_buf, block_size, decoder_size))) { LOG_WARN("Fail to get decoder cache size", K(ret)); } else if (OB_FAIL(ObMicroBlockDecoder::cache_decoders(extra_buf, decoder_size, block_buf, block_size))) { LOG_WARN("Fail to set cache decoder", K(ret)); } if (OB_FAIL(ret)) { } else { micro_data.get_extra_buf() = extra_buf; micro_data.get_extra_size() = decoder_size; } return ret; } int ObDataMicroBlockCache::put_cache_block( const ObMicroBlockDesMeta &des_meta, const char *raw_block_buf, const ObMicroBlockCacheKey &key, ObMacroBlockReader &reader, ObIAllocator &allocator, const ObMicroBlockCacheValue *µ_block, common::ObKVCacheHandle &cache_handle) { UNUSED(allocator); int ret = OB_SUCCESS; ObMicroBlockHeader header; int64_t pos = 0; int64_t payload_size = 0; const char *payload_buf = nullptr; const ObMicroBlockId µ_id = key.get_micro_block_id(); ObIMicroBlockCache::BaseBlockCache *kvcache = nullptr; if (OB_UNLIKELY(!des_meta.is_valid() || !micro_id.is_valid()) || OB_ISNULL(raw_block_buf)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(key), K(des_meta)); } else if (OB_FAIL(header.deserialize(raw_block_buf, micro_id.size_, pos))) { LOG_ERROR("Fail to deserialize record header", K(ret), K(micro_id)); } else if (OB_FAIL(header.check_and_get_record( raw_block_buf, micro_id.size_, MICRO_BLOCK_HEADER_MAGIC, payload_buf, payload_size))) { LOG_ERROR("Micro block data is corrupted", K(ret), K(key), KP(raw_block_buf), KP(this)); } else if (OB_FAIL(get_cache(kvcache))) { LOG_WARN("Fail to get kvcache", K(ret)); } else { ObKVCacheInstHandle inst_handle; ObKVCachePair *kvpair = nullptr; int64_t block_size = 0; int64_t value_size = 0; int64_t extra_size = 0; bool need_decoder = false; ObMicroBlockBufTransformer buf_transformer(des_meta, &reader, header, payload_buf, payload_size); if (OB_FAIL(buf_transformer.init())) { LOG_WARN("Fail to init buf transformer", K(ret)); } else if (OB_FAIL(buf_transformer.get_buf_size(block_size))) { LOG_WARN("Fail to get block buf size", K(ret)); } else if (FALSE_IT(value_size = calc_value_size(block_size, des_meta.row_store_type_, need_decoder))) { } else if (OB_FAIL(alloc( key.get_tenant_id(), sizeof(ObMicroBlockCacheKey), value_size, kvpair, cache_handle, inst_handle))) { LOG_WARN("Fail to allocate kvpair from kvcache", K(ret), K(value_size), K(key)); } else { char *block_buf = reinterpret_cast(kvpair->value_) + sizeof(ObMicroBlockCacheValue); kvpair->key_ = new (kvpair->key_) ObMicroBlockCacheKey(key); ObMicroBlockCacheValue *cache_value = new (kvpair->value_) ObMicroBlockCacheValue(block_buf, block_size); ObMicroBlockData µ_data = cache_value->get_block_data(); micro_data.type_ = get_type(); if (OB_FAIL(buf_transformer.transfrom(block_buf, block_size))) { LOG_WARN("fail to transfrom", K(ret)); } else if (need_decoder && OB_FAIL(write_extra_buf( des_meta.row_store_type_, block_buf, block_size, block_buf + block_size, micro_data))) { LOG_WARN("Fail to cache decoder on extra buffer for data block", K(ret), K(header), KPC(cache_value)); } else if (FALSE_IT(micro_block = cache_value)) { } else if (OB_FAIL(put_kvpair(inst_handle, kvpair, cache_handle, false /* overwrite */))) { if (OB_ENTRY_EXIST != ret) { LOG_WARN("Fail to put micro block cache", K(ret)); } else { ret = OB_SUCCESS; } } else { const int64_t put_size = ObKVStoreMemBlock::get_align_size(key, *cache_value); if (OB_FAIL(add_put_size(put_size))) { LOG_WARN("add_put_size failed", K(ret), K(put_size)); } } if (OB_FAIL(ret)) { cache_handle.reset(); micro_block = nullptr; } } } return ret; } int ObDataMicroBlockCache::reserve_kvpair( const ObMicroBlockDesc µ_block_desc, ObKVCacheInstHandle &inst_handle, ObKVCacheHandle &cache_handle, ObKVCachePair *&kvpair, int64_t &kvpair_size) { int ret = OB_SUCCESS; kvpair_size = 0; int64_t block_size = 0; int64_t extra_size = 0; int64_t value_size = 0; bool need_decoder = false; const int64_t key_size = sizeof(ObMicroBlockCacheKey); const ObRowStoreType row_store_type = static_cast(micro_block_desc.header_->row_store_type_); ObCSMicroBlockTransformer transformer; if (OB_UNLIKELY(!micro_block_desc.is_valid() || inst_handle.is_valid() || cache_handle.is_valid() || nullptr != kvpair)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(micro_block_desc), K(inst_handle), K(cache_handle), KP(kvpair)); } else if (ObStoreFormat::is_row_store_type_with_cs_encoding(row_store_type)) { if (OB_FAIL(transformer.init(micro_block_desc.header_, micro_block_desc.buf_, micro_block_desc.buf_size_))) { LOG_WARN("fail to init transformer", K(ret), K(micro_block_desc)); } else if (OB_FAIL(transformer.calc_full_transform_size(block_size))) { LOG_WARN("fail to calc_full_transform_size", K(ret), K(micro_block_desc)); } } else { block_size = micro_block_desc.header_->header_size_ + micro_block_desc.data_size_; } if (OB_SUCC(ret)) { value_size = calc_value_size(block_size, row_store_type, need_decoder); BaseBlockCache *kvcache = nullptr; if (OB_FAIL(get_cache(kvcache))) { LOG_WARN("Fail to get cache", K(ret)); } else if (OB_FAIL(kvcache->alloc(MTL_ID(), key_size, value_size, kvpair, cache_handle, inst_handle))) { LOG_WARN("Fail to alloc cache buf", K(ret), K(key_size), K(value_size)); } else { char *block_buf = reinterpret_cast(kvpair->value_) + sizeof(ObMicroBlockCacheValue); kvpair->key_ = new (kvpair->key_) ObMicroBlockCacheKey(); ObMicroBlockCacheValue *cache_value = new (kvpair->value_) ObMicroBlockCacheValue(block_buf, block_size); int64_t pos = 0; if (ObStoreFormat::is_row_store_type_with_cs_encoding(row_store_type)) { if (OB_FAIL(transformer.full_transform(block_buf, block_size, pos))) { LOG_WARN("fail to full transform", K(ret), KP(block_buf), K(block_size)); } } else { MEMCPY(block_buf, micro_block_desc.header_, micro_block_desc.header_->header_size_); MEMCPY(block_buf + micro_block_desc.header_->header_size_, micro_block_desc.buf_, micro_block_desc.buf_size_); } } } if (OB_FAIL(ret)) { } else if (need_decoder && OB_FAIL(write_extra_buf( static_cast(micro_block_desc.header_->row_store_type_), reinterpret_cast(kvpair->value_) + sizeof(ObMicroBlockCacheValue), block_size, reinterpret_cast(kvpair->value_) + sizeof(ObMicroBlockCacheValue) + block_size, static_cast(kvpair->value_)->get_block_data()))) { LOG_WARN("Fail to write decoder in extra buf", K(ret)); } else { kvpair_size = sizeof(ObMicroBlockCacheKey) + value_size; } return ret; } ObMicroBlockData::Type ObDataMicroBlockCache::get_type() { return ObMicroBlockData::DATA_BLOCK; } /*-------------------------------------ObIndexMicroBlockCache-------------------------------------*/ ObIndexMicroBlockCache::ObIndexMicroBlockCache() : ObDataMicroBlockCache() { } ObIndexMicroBlockCache::~ObIndexMicroBlockCache() { } int ObIndexMicroBlockCache::init(const char *cache_name, const int64_t priority) { return ObDataMicroBlockCache::init(cache_name, priority); } int ObIndexMicroBlockCache::load_block( const ObMicroBlockId µ_block_id, const ObMicroBlockDesMeta &des_meta, ObMacroBlockReader *macro_reader, ObMicroBlockData &block_data, ObIAllocator *allocator) { UNUSED(macro_reader); int ret = OB_SUCCESS; ObMacroBlockReadInfo macro_read_info; ObMacroBlockHandle macro_handle; // TODO: @chengji make deserialize micro block with allocator static and remove tmp inner_macro_reader ObMacroBlockReader inner_macro_reader; bool is_compressed = false; if (OB_UNLIKELY(!micro_block_id.is_valid()) || OB_ISNULL(allocator)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(micro_block_id), KP(allocator)); } else { void *buf = nullptr; ObSyncSingleMicroBLockIOCallback *callback = nullptr; if (OB_ISNULL(buf = allocator->alloc(sizeof(ObSyncSingleMicroBLockIOCallback)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate callback memory failed", K(ret)); } else { callback = new (buf) ObSyncSingleMicroBLockIOCallback; callback->allocator_ = allocator; callback->offset_ = micro_block_id.offset_; callback->block_des_meta_ = des_meta; callback->block_data_ = &block_data; callback->macro_reader_ = &inner_macro_reader; callback->is_data_block_ = false; macro_read_info.macro_block_id_ = micro_block_id.macro_id_; macro_read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_DATA_READ); macro_read_info.io_desc_.set_group_id(ObIOModule::MICRO_BLOCK_CACHE_IO); macro_read_info.io_callback_ = callback; macro_read_info.offset_ = micro_block_id.offset_; macro_read_info.size_ = micro_block_id.size_; macro_read_info.io_timeout_ms_ = GCONF._data_storage_io_timeout / 1000L; ObIndexBlockDataTransformer idx_transformer; char *transform_buf = nullptr; char *raw_idx_block_buf = nullptr; if (OB_FAIL(ObBlockManager::read_block(macro_read_info, macro_handle))) { LOG_WARN("Fail to sync read block", K(ret), K(macro_read_info)); } else if (FALSE_IT(raw_idx_block_buf = const_cast(block_data.get_buf()))) { } else if (OB_FAIL(idx_transformer.transform(block_data, block_data, *allocator, transform_buf))) { LOG_WARN("Fail to transform index block to memory format", K(ret)); } else { EVENT_INC(ObStatEventIds::IO_READ_PREFETCH_MICRO_COUNT); EVENT_ADD(ObStatEventIds::IO_READ_PREFETCH_MICRO_BYTES, micro_block_id.size_); } if (nullptr != raw_idx_block_buf) { allocator->free(raw_idx_block_buf); } if (OB_FAIL(ret)) { if (nullptr != callback->get_allocator()) { //Avoid double_free with io_handle callback->~ObSyncSingleMicroBLockIOCallback(); allocator->free(callback); } if (nullptr != transform_buf) { allocator->free(transform_buf); block_data.reset(); } } } } return ret; } int ObIndexMicroBlockCache::put_cache_block( const ObMicroBlockDesMeta &des_meta, const char *raw_block_buf, const ObMicroBlockCacheKey &key, ObMacroBlockReader &reader, ObIAllocator &allocator, const ObMicroBlockCacheValue *µ_block, common::ObKVCacheHandle &cache_handle) { int ret = OB_SUCCESS; ObMicroBlockHeader header; int64_t pos = 0; int64_t payload_size = 0; const char *payload_buf = nullptr; const ObMicroBlockId µ_id = key.get_micro_block_id(); ObIMicroBlockCache::BaseBlockCache *kvcache = nullptr; if (OB_UNLIKELY(!des_meta.is_valid() || !micro_id.is_valid()) || OB_ISNULL(raw_block_buf)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(key), K(des_meta)); } else if (OB_FAIL(header.deserialize(raw_block_buf, micro_id.size_, pos))) { LOG_ERROR("Fail to deserialize record header", K(ret), K(micro_id)); } else if (OB_FAIL(header.check_and_get_record( raw_block_buf, micro_id.size_, MICRO_BLOCK_HEADER_MAGIC, payload_buf, payload_size))) { LOG_ERROR("Micro block data is corrupted", K(ret), K(key), KP(raw_block_buf), KP(this)); } else if (OB_FAIL(get_cache(kvcache))) { LOG_WARN("Fail to get kvcache", K(ret)); } else { int64_t block_size = 0; int64_t value_size = 0; char *block_buf = nullptr; ObMicroBlockBufTransformer buf_transformer(des_meta, &reader, header, payload_buf, payload_size); ObIndexBlockDataTransformer idx_transformer; if (OB_FAIL(buf_transformer.init())) { LOG_WARN("Fail to init block buf transformer", K(ret)); } else if (OB_FAIL(buf_transformer.get_buf_size(block_size))) { LOG_WARN("Fail to get transformed buf size", K(ret)); } else if (OB_ISNULL(block_buf = static_cast(allocator.alloc(block_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("Fail to allocate memory", K(ret), K(block_size)); } else if (OB_FAIL(buf_transformer.transfrom(block_buf, block_size))) { LOG_WARN("fail to transfrom block buf", K(ret)); } else { ObMicroBlockCacheValue cache_value(block_buf, block_size); ObMicroBlockData &block_data = cache_value.get_block_data(); block_data.type_ = get_type(); char *allocated_buf = nullptr; if (OB_FAIL(idx_transformer.transform(block_data, block_data, allocator, allocated_buf))) { LOG_WARN("Fail to transform index block to memory format", K(ret)); } else if (OB_FAIL(put_and_fetch(key, cache_value, micro_block, cache_handle, false /* overwrite */))) { if (OB_ENTRY_EXIST != ret) { LOG_WARN("Fail to put micro block cache", K(ret)); } else { ret = OB_SUCCESS; } } else { const int64_t put_size = ObKVStoreMemBlock::get_align_size(key, cache_value); if (OB_FAIL(add_put_size(put_size))) { LOG_WARN("add_put_size failed", K(ret), K(put_size)); } } if (nullptr != allocated_buf) { allocator.free(allocated_buf); } } if (nullptr != block_buf) { allocator.free(block_buf); } if (OB_FAIL(ret)) { cache_handle.reset(); micro_block = nullptr; } } return ret; } ObMicroBlockData::Type ObIndexMicroBlockCache::get_type() { return ObMicroBlockData::INDEX_BLOCK; } }//end namespace blocksstable }//end namespace oceanbase