diff --git a/src/storage/blocksstable/ob_tmp_file_store.cpp b/src/storage/blocksstable/ob_tmp_file_store.cpp index cc5db3bbb7..42a9c7052e 100644 --- a/src/storage/blocksstable/ob_tmp_file_store.cpp +++ b/src/storage/blocksstable/ob_tmp_file_store.cpp @@ -823,6 +823,19 @@ int ObTmpTenantMacroBlockManager::free_macro_block(const int64_t block_id) return ret; } +int ObTmpTenantMacroBlockManager::get_disk_macro_block_count(int64_t &count) const +{ + int ret = OB_SUCCESS; + + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "ObTmpMacroBlockManager has not been inited", K(ret)); + } else { + count = blocks_.size(); + } + + return ret; +} int ObTmpTenantMacroBlockManager::get_disk_macro_block_list( common::ObIArray ¯o_id_list) @@ -1511,6 +1524,19 @@ int ObTmpTenantFileStore::wait_write_finish(const int64_t block_id, const int64_ return ret; } +int ObTmpTenantFileStore::get_disk_macro_block_count(int64_t &count) const +{ + int ret = OB_SUCCESS; + SpinRLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "ObTmpTenantFileStore has not been inited", K(ret)); + } else if (OB_FAIL(tmp_block_manager_.get_disk_macro_block_count(count))) { + STORAGE_LOG(WARN, "fail to get disk macro block count from tmp_block_manager_", K(ret)); + } + return ret; +} + int ObTmpTenantFileStore::get_disk_macro_block_list(common::ObIArray ¯o_id_list) { int ret = OB_SUCCESS; @@ -1843,22 +1869,20 @@ int ObTmpFileStore::get_macro_block_list(ObIArray &tmp_bl STORAGE_LOG(WARN, "ObTmpFileStore has not been inited", K(ret)); } else { tmp_block_cnt_pairs.reset(); - common::ObSEArray macro_id_list; - macro_id_list.set_attr(ObMemAttr(MTL_ID(), "TMP_MB_LIST")); TenantFileStoreMap::iterator iter; ObTmpTenantFileStore *tmp = NULL; for (iter = tenant_file_stores_.begin(); OB_SUCC(ret) && iter != tenant_file_stores_.end(); ++iter) { + int64_t macro_id_count = 0; TenantTmpBlockCntPair pair; - macro_id_list.reset(); if (OB_ISNULL(tmp = iter->second.get_tenant_store())) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "fail to iterate tmp tenant file store", K(ret)); - } else if (OB_FAIL(tmp->get_disk_macro_block_list(macro_id_list))){ + } else if (OB_FAIL(tmp->get_disk_macro_block_count(macro_id_count))){ STORAGE_LOG(WARN, "fail to get list of tenant macro block in disk", K(ret)); - } else if (OB_FAIL(pair.init(iter->first, macro_id_list.count()))) { + } else if (OB_FAIL(pair.init(iter->first, macro_id_count))) { STORAGE_LOG(WARN, "fail to init tenant tmp block count pair", K(ret), "tenant id", - iter->first, "macro block count", macro_id_list.count()); + iter->first, "macro block count", macro_id_count); } else if (OB_FAIL(tmp_block_cnt_pairs.push_back(pair))) { STORAGE_LOG(WARN, "fail to push back tmp_block_cnt_pairs", K(ret), K(pair)); } diff --git a/src/storage/blocksstable/ob_tmp_file_store.h b/src/storage/blocksstable/ob_tmp_file_store.h index 773403e586..c0455158eb 100644 --- a/src/storage/blocksstable/ob_tmp_file_store.h +++ b/src/storage/blocksstable/ob_tmp_file_store.h @@ -223,6 +223,7 @@ public: int alloc_macro_block(const int64_t dir_id, const uint64_t tenant_id, ObTmpMacroBlock *&t_mblk); int free_macro_block(const int64_t block_id); int get_macro_block(const int64_t block_id, ObTmpMacroBlock *&t_mblk); + int get_disk_macro_block_count(int64_t &count) const; int get_disk_macro_block_list(common::ObIArray ¯o_id_list); void print_block_usage(); @@ -253,6 +254,7 @@ public: void refresh_memory_limit(const uint64_t tenant_id); int sync_block(const int64_t block_id, ObTmpTenantMemBlockManager::ObIOWaitInfoHandle &handle); int wait_write_finish(const int64_t block_id, const int64_t timeout_ms); + int get_disk_macro_block_count(int64_t &count) const; int get_disk_macro_block_list(common::ObIArray ¯o_id_list); int get_macro_block(const int64_t block_id, ObTmpMacroBlock *&t_mblk); // use io_allocator_ to allocate tenant extent memory. diff --git a/src/storage/direct_load/ob_direct_load_data_block_decoder.h b/src/storage/direct_load/ob_direct_load_data_block_decoder.h index a9cee37e9f..432e014cac 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_decoder.h +++ b/src/storage/direct_load/ob_direct_load_data_block_decoder.h @@ -43,11 +43,13 @@ public: OB_INLINE int64_t get_end_pos() const { return buf_size_; } TO_STRING_KV(K_(header), K_(compressor_type), KP_(compressor), KP_(buf), K_(buf_size), K_(pos), KP_(decompress_buf), KP_(decompress_buf_size)); +protected: + int realloc_decompress_buf(const int64_t size); protected: Header header_; common::ObCompressorType compressor_type_; common::ObCompressor *compressor_; - common::ObArenaAllocator allocator_; + int64_t data_block_size_; char *buf_; int64_t buf_size_; int64_t pos_; @@ -61,7 +63,7 @@ template ObDirectLoadDataBlockDecoder
::ObDirectLoadDataBlockDecoder() : compressor_type_(ObCompressorType::INVALID_COMPRESSOR), compressor_(nullptr), - allocator_("TLD_DBDecoder"), + data_block_size_(0), buf_(nullptr), buf_size_(0), pos_(0), @@ -92,12 +94,15 @@ void ObDirectLoadDataBlockDecoder
::reset() header_.reset(); compressor_type_ = common::ObCompressorType::INVALID_COMPRESSOR; compressor_ = nullptr; + data_block_size_ = 0; buf_ = nullptr; buf_size_ = 0; pos_ = 0; - decompress_buf_ = nullptr; + if (decompress_buf_ != nullptr) { + ob_free(decompress_buf_); + decompress_buf_ = nullptr; + } decompress_buf_size_ = 0; - allocator_.reset(); is_inited_ = false; } @@ -114,28 +119,41 @@ int ObDirectLoadDataBlockDecoder
::init(int64_t data_block_size, ret = common::OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(compressor_type)); } else { - allocator_.set_tenant_id(MTL_ID()); if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type) { - char *buf = nullptr; - if (OB_ISNULL(buf = static_cast(allocator_.alloc(data_block_size)))) { - ret = common::OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "fail to alloc buf", KR(ret), K(data_block_size)); - } else if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type, + if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type, compressor_))) { STORAGE_LOG(WARN, "fail to get compressor, ", KR(ret), K(compressor_type)); - } else { - decompress_buf_ = buf; - decompress_buf_size_ = data_block_size; } } if (OB_SUCC(ret)) { compressor_type_ = compressor_type; + data_block_size_ = data_block_size; is_inited_ = true; } } return ret; } +template +int ObDirectLoadDataBlockDecoder
::realloc_decompress_buf(const int64_t size) +{ + int ret = OB_SUCCESS; + if (decompress_buf_size_ != size) { + if (decompress_buf_ != nullptr) { + ob_free(decompress_buf_); + decompress_buf_ = nullptr; + } + decompress_buf_ = (char *)ob_malloc(size, ObMemAttr(MTL_ID(), "TLD_DBDecoder")); + if (decompress_buf_ == nullptr) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc mem", KR(ret), K(size)); + } else { + decompress_buf_size_ = size; + } + } + return ret; +} + template int ObDirectLoadDataBlockDecoder
::prepare_data_block(char *buf, int64_t buf_size, int64_t &data_size) @@ -175,7 +193,18 @@ int ObDirectLoadDataBlockDecoder
::prepare_data_block(char *buf, int64_t // do decompress if (OB_SUCC(ret) && header_.occupy_size_ != header_.data_size_) { int64_t decompress_size = 0; - if (OB_UNLIKELY(common::ObCompressorType::NONE_COMPRESSOR == compressor_type_)) { + if (header_.data_size_ > data_block_size_) { + if (OB_FAIL(realloc_decompress_buf(header_.data_size_))) { + STORAGE_LOG(WARN, "fail to realloc_decompress_buf", KR(ret)); + } + } else { + if (OB_FAIL(realloc_decompress_buf(data_block_size_))) { + STORAGE_LOG(WARN, "fail to realloc_decompress_buf", KR(ret)); + } + } + if (OB_FAIL(ret)) { + //pass + } else if (OB_UNLIKELY(common::ObCompressorType::NONE_COMPRESSOR == compressor_type_)) { ret = common::OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected compressor type", KR(ret)); } else if (OB_FAIL(compressor_->decompress(buf + pos_, header_.occupy_size_ - pos_, diff --git a/src/storage/direct_load/ob_direct_load_data_block_encoder.h b/src/storage/direct_load/ob_direct_load_data_block_encoder.h index d834a9d1c7..752c8dea9b 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_encoder.h +++ b/src/storage/direct_load/ob_direct_load_data_block_encoder.h @@ -40,16 +40,16 @@ public: int64_t get_pos() const { return pos_; } Header &get_header() { return header_; } int build_data_block(char *&buf, int64_t &buf_size); - template - int build_data_block(const T &item, char *buf, int64_t buf_size, int64_t &data_size); TO_STRING_KV(K_(header), K_(header_size), K_(compressor_type), KP_(compressor), KP_(buf), K_(buf_size), K_(pos), KP_(compress_buf), K_(compress_buf_size)); +protected: + int realloc_bufs(const int64_t size); protected: Header header_; int64_t header_size_; common::ObCompressorType compressor_type_; common::ObCompressor *compressor_; - common::ObArenaAllocator allocator_; + int64_t data_block_size_; char *buf_; int64_t buf_size_; // buf capacity int64_t pos_; @@ -64,7 +64,7 @@ ObDirectLoadDataBlockEncoder
::ObDirectLoadDataBlockEncoder() : header_size_(0), compressor_type_(common::ObCompressorType::INVALID_COMPRESSOR), compressor_(nullptr), - allocator_("TLD_DBEncoder"), + data_block_size_(0), buf_(nullptr), buf_size_(0), pos_(0), @@ -94,15 +94,79 @@ void ObDirectLoadDataBlockEncoder
::reset() header_size_ = 0; compressor_type_ = common::ObCompressorType::INVALID_COMPRESSOR; compressor_ = nullptr; - buf_ = nullptr; + data_block_size_ = 0; + if (buf_ != nullptr) { + ob_free(buf_); + buf_ = nullptr; + } buf_size_ = 0; pos_ = 0; - compress_buf_ = nullptr; + if (compress_buf_ != nullptr) { + ob_free(compress_buf_); + compress_buf_ = nullptr; + } compress_buf_size_ = 0; - allocator_.reset(); is_inited_ = false; } + +template +int ObDirectLoadDataBlockEncoder
::realloc_bufs(const int64_t size) +{ + int ret = OB_SUCCESS; + int64_t align_size = ALIGN_UP(size, DIO_ALIGN_SIZE); + if (buf_size_ != align_size) { + char *tmp_buf = (char *)ob_malloc(align_size, ObMemAttr(MTL_ID(), "TLD_DBEncoder")); + if (tmp_buf == nullptr) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc buf", K(align_size), KR(ret)); + } + if (OB_SUCC(ret) && buf_ != nullptr && pos_ > 0) { + if (pos_ > align_size) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "pos is bigger than buf align_size", K(pos_), K(align_size), KR(ret)); + } else { + MEMCPY(tmp_buf, buf_, pos_); + } + } + if (OB_SUCC(ret)) { + if (buf_ != nullptr) { + ob_free(buf_); + } + buf_ = tmp_buf; + buf_size_ = align_size; + } + } + + + if (compressor_ != nullptr) { + int64_t max_overflow_size = 0; + int64_t compress_buf_size = 0; + if (OB_SUCC(ret)) { + if (OB_FAIL(compressor_->get_max_overflow_size(size, max_overflow_size))) { + STORAGE_LOG(WARN, "fail to get max_overflow_size", KR(ret), K(size), K(max_overflow_size)); + } else { + compress_buf_size = ALIGN_UP(size + max_overflow_size, DIO_ALIGN_SIZE); + } + } + + if (OB_SUCC(ret) && compress_buf_size_ != compress_buf_size) { + if (compress_buf_ != nullptr) { + ob_free(compress_buf_); + compress_buf_ = nullptr; + } + compress_buf_ = (char *)ob_malloc(compress_buf_size, ObMemAttr(MTL_ID(), "TLD_DBEncoder")); + if (compress_buf_ == nullptr) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc compress buf", K(compress_buf_size), KR(ret)); + } else { + compress_buf_size_ = compress_buf_size; + } + } + } + return ret; +} + template int ObDirectLoadDataBlockEncoder
::init(int64_t data_block_size, common::ObCompressorType compressor_type) @@ -116,28 +180,14 @@ int ObDirectLoadDataBlockEncoder
::init(int64_t data_block_size, ret = common::OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(compressor_type)); } else { - allocator_.set_tenant_id(MTL_ID()); - const int64_t alloc_buf_size = - data_block_size + - (compressor_type != common::ObCompressorType::NONE_COMPRESSOR ? data_block_size : 0); - char *buf = nullptr; - if (OB_ISNULL(buf = static_cast(allocator_.alloc(alloc_buf_size)))) { - ret = common::OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "fail to alloc buf", KR(ret), K(alloc_buf_size)); - } else if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type && + if (common::ObCompressorType::NONE_COMPRESSOR != compressor_type && OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type, compressor_))) { STORAGE_LOG(WARN, "fail to get compressor, ", KR(ret), K(compressor_type)); } else { header_size_ = header_.get_serialize_size(); compressor_type_ = compressor_type; - buf_ = buf; - buf_size_ = data_block_size; - pos_ = header_size_; - if (ObCompressorType::NONE_COMPRESSOR != compressor_type_) { - compress_buf_ = buf + data_block_size; - compress_buf_size_ = data_block_size; - } + data_block_size_ = data_block_size; is_inited_ = true; } } @@ -150,12 +200,28 @@ int ObDirectLoadDataBlockEncoder
::write_item(const T &item) { int ret = common::OB_SUCCESS; const int64_t item_size = item.get_serialize_size(); - if (item_size > buf_size_ - header_size_) { - ret = common::OB_SIZE_OVERFLOW; - } else if (item.get_serialize_size() + pos_ > buf_size_) { - ret = common::OB_BUF_NOT_ENOUGH; - } else if (OB_FAIL(item.serialize(buf_, buf_size_, pos_))) { - STORAGE_LOG(WARN, "fail to serialize item", KR(ret)); + + // 没有分配内存,和内存太大,都需要重新分配内存 + if (item_size + pos_ < data_block_size_ || buf_size_ < data_block_size_) { + if (OB_FAIL(realloc_bufs(data_block_size_))) { + STORAGE_LOG(WARN, "fail to realloc bufs", KR(ret)); + } + } + + if (OB_SUCC(ret)) { + if (item_size > data_block_size_ - header_size_ && item_size > buf_size_ - header_size_) { + if (OB_FAIL(realloc_bufs(item_size + header_size_))) { + STORAGE_LOG(WARN, "fail to realloc bufs", KR(ret)); + } + } + } + + if (OB_SUCC(ret)) { + if (item_size + pos_ > buf_size_) { + ret = common::OB_BUF_NOT_ENOUGH; + } else if (OB_FAIL(item.serialize(buf_, buf_size_, pos_))) { + STORAGE_LOG(WARN, "fail to serialize item", KR(ret)); + } } return ret; } @@ -204,40 +270,5 @@ int ObDirectLoadDataBlockEncoder
::build_data_block(char *&buf, int64_t & return ret; } -template -template -int ObDirectLoadDataBlockEncoder
::build_data_block(const T &item, char *buf, - int64_t buf_size, int64_t &data_size) -{ - int ret = common::OB_SUCCESS; - if (IS_NOT_INIT) { - ret = common::OB_NOT_INIT; - STORAGE_LOG(WARN, "ObDirectLoadDataBlockEncoder not init", KR(ret), KP(this)); - } else if (OB_UNLIKELY(nullptr == buf || buf_size <= 0)) { - ret = common::OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid args", KR(ret), KP(buf), K(buf_size)); - } else if (OB_UNLIKELY(item.get_serialize_size() + header_size_ > buf_size)) { - ret = common::OB_BUF_NOT_ENOUGH; - STORAGE_LOG(WARN, "buf not enough", KR(ret), K(buf_size), K(item.get_serialize_size())); - } else { - // serialize item - if (OB_FAIL(item.serialize(buf, buf_size, pos_))) { - STORAGE_LOG(WARN, "fail to serialize item", KR(ret)); - } - // serialize header - else { - int64_t pos = 0; - data_size = pos_; - header_.data_size_ = data_size; - header_.occupy_size_ = data_size; - header_.checksum_ = ob_crc64_sse42(0, buf + header_size_, data_size - header_size_); - if (OB_FAIL(header_.serialize(buf, buf_size, pos))) { - STORAGE_LOG(WARN, "fail to serialize header", KR(ret)); - } - } - } - return ret; -} - } // namespace storage } // namespace oceanbase diff --git a/src/storage/direct_load/ob_direct_load_data_block_reader.h b/src/storage/direct_load/ob_direct_load_data_block_reader.h index 6701192a7b..086fb0db0f 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_reader.h +++ b/src/storage/direct_load/ob_direct_load_data_block_reader.h @@ -30,7 +30,7 @@ public: virtual ~ObDirectLoadDataBlockReader(); void reuse(); void reset(); - int init(int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type); + int init(int64_t data_block_size, common::ObCompressorType compressor_type); int open(const ObDirectLoadTmpFileHandle &file_handle, int64_t offset, int64_t size); int get_next_item(const T *&item) override; OB_INLINE int64_t get_block_count() const { return block_count_; } @@ -39,8 +39,8 @@ protected: private: int read_next_buffer(); int switch_next_block(); + int realloc_buf(int64_t size); protected: - common::ObArenaAllocator allocator_; int64_t data_block_size_; char *buf_; int64_t buf_capacity_; @@ -60,8 +60,7 @@ protected: template ObDirectLoadDataBlockReader::ObDirectLoadDataBlockReader() - : allocator_("TLD_DBReader"), - data_block_size_(0), + : data_block_size_(0), buf_(nullptr), buf_capacity_(0), buf_size_(0), @@ -99,12 +98,14 @@ template void ObDirectLoadDataBlockReader::reset() { data_block_size_ = 0; - buf_ = nullptr; + if (buf_ != nullptr) { + ob_free(buf_); + buf_ = nullptr; + } buf_capacity_ = 0; buf_size_ = 0; buf_pos_ = 0; io_timeout_ms_ = 0; - allocator_.reset(); data_block_reader_.reset(); file_io_handle_.reset(); curr_item_.reset(); @@ -116,7 +117,7 @@ void ObDirectLoadDataBlockReader::reset() } template -int ObDirectLoadDataBlockReader::init(int64_t data_block_size, int64_t buf_size, +int ObDirectLoadDataBlockReader::init(int64_t data_block_size, common::ObCompressorType compressor_type) { int ret = common::OB_SUCCESS; @@ -124,21 +125,14 @@ int ObDirectLoadDataBlockReader::init(int64_t data_block_size, int64_ ret = common::OB_INIT_TWICE; STORAGE_LOG(WARN, "ObDirectLoadDataBlockReader init twice", KR(ret), KP(this)); } else if (OB_UNLIKELY(data_block_size <= 0 || data_block_size % DIO_ALIGN_SIZE != 0 || - buf_size <= 0 || buf_size % DIO_ALIGN_SIZE != 0 || - data_block_size > buf_size || compressor_type <= common::ObCompressorType::INVALID_COMPRESSOR)) { ret = common::OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(buf_size), K(compressor_type)); + STORAGE_LOG(WARN, "invalid args", KR(ret), K(data_block_size), K(compressor_type)); } else { - allocator_.set_tenant_id(MTL_ID()); - if (OB_ISNULL(buf_ = static_cast(allocator_.alloc(buf_size)))) { - ret = common::OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "fail to allocate memory", KR(ret), K(buf_size)); - } else if (OB_FAIL(data_block_reader_.init(buf_size, compressor_type))) { + if (OB_FAIL(data_block_reader_.init(data_block_size, compressor_type))) { STORAGE_LOG(WARN, "fail to init data block reader", KR(ret)); } else { data_block_size_ = data_block_size; - buf_capacity_ = buf_size; io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS); is_inited_ = true; } @@ -167,7 +161,7 @@ int ObDirectLoadDataBlockReader::open(const ObDirectLoadTmpFileHandle if (OB_FAIL(file_io_handle_.open(file_handle))) { STORAGE_LOG(WARN, "fail to open file handle", KR(ret)); } else if (OB_FAIL(switch_next_block())) { - STORAGE_LOG(WARN, "fail to switch next block", KR(ret)); + STORAGE_LOG(WARN, "fail to switch next block", KR(ret), K(offset), K(size)); } else { is_opened_ = true; } @@ -202,31 +196,70 @@ int ObDirectLoadDataBlockReader::read_next_buffer() return ret; } +template +int ObDirectLoadDataBlockReader::realloc_buf(int64_t size) +{ + int ret = OB_SUCCESS; + int64_t align_size = ALIGN_UP(size, DIO_ALIGN_SIZE); + if (buf_capacity_ != align_size && buf_size_ - buf_pos_ <= align_size) { + char *tmp_buf = (char *)ob_malloc(align_size, ObMemAttr(MTL_ID(), "TLD_DBReader")); + if (tmp_buf == nullptr) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc mem", K(align_size), KR(ret)); + } else { + if (buf_ != nullptr) { + MEMCPY(tmp_buf, buf_ + buf_pos_, buf_size_ - buf_pos_); + ob_free(buf_); + buf_ = nullptr; + } + } + if (OB_SUCC(ret)) { + buf_ = tmp_buf; + buf_size_ = buf_size_ - buf_pos_; + buf_pos_ = 0; + buf_capacity_ = align_size; + } + } + return ret; +} + template int ObDirectLoadDataBlockReader::switch_next_block() { int ret = common::OB_SUCCESS; int64_t data_size = 0; - if (buf_size_ - buf_pos_ < data_block_size_ && OB_FAIL(read_next_buffer())) { + if (OB_FAIL(realloc_buf(data_block_size_))) { + STORAGE_LOG(WARN, "fail to realloc buf", K(data_block_size_), KR(ret)); + } else if (buf_size_ - buf_pos_ < DIO_ALIGN_SIZE && OB_FAIL(read_next_buffer())) { if (OB_UNLIKELY(common::OB_ITER_END != ret)) { STORAGE_LOG(WARN, "fail to read next buffer", KR(ret)); } } else if (OB_FAIL(data_block_reader_.prepare_data_block(buf_ + buf_pos_, buf_size_ - buf_pos_, data_size))) { + if (OB_UNLIKELY(common::OB_BUF_NOT_ENOUGH != ret)) { STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_)); } else { - if (OB_FAIL(read_next_buffer())) { + ret = OB_SUCCESS; + if (data_size > buf_capacity_) { + if (OB_FAIL(realloc_buf(data_size))) { + STORAGE_LOG(WARN, "fail to alloc buf", KR(ret)); + } + } + + if (OB_FAIL(ret)) { + //pass + } else if (OB_FAIL(read_next_buffer())) { STORAGE_LOG(WARN, "fail to read next buffer", KR(ret)); } else if (OB_FAIL(data_block_reader_.prepare_data_block(buf_ + buf_pos_, buf_size_ - buf_pos_, data_size))) { - STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_)); + STORAGE_LOG(WARN, "fail to prepare data block", KR(ret), K(buf_pos_), K(buf_size_), K(data_size)); } } } if (OB_SUCC(ret)) { const int64_t data_block_size = ALIGN_UP(data_size, DIO_ALIGN_SIZE); - buf_pos_ += MAX(data_block_size_, data_block_size); + buf_pos_ += data_block_size; ++block_count_; if (OB_FAIL(prepare_read_block())) { STORAGE_LOG(WARN, "fail to prepare read block", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_data_block_writer.h b/src/storage/direct_load/ob_direct_load_data_block_writer.h index 3b13377be8..61a128a6fc 100644 --- a/src/storage/direct_load/ob_direct_load_data_block_writer.h +++ b/src/storage/direct_load/ob_direct_load_data_block_writer.h @@ -48,7 +48,6 @@ protected: virtual int pre_write_item() { return common::OB_SUCCESS; } virtual int pre_flush_buffer() { return common::OB_SUCCESS; } int flush_buffer(); - int flush_extra_buffer(const T &item); protected: int64_t data_block_size_; char *extra_buf_; @@ -188,14 +187,6 @@ int ObDirectLoadDataBlockWriter::write_item(const T &item) } else if (OB_FAIL(data_block_writer_.write_item(item))) { STORAGE_LOG(WARN, "fail to write item", KR(ret)); } - } else if (common::OB_SIZE_OVERFLOW == ret && nullptr != extra_buf_) { - if (data_block_writer_.has_item() && OB_FAIL(flush_buffer())) { - STORAGE_LOG(WARN, "fail to flush buffer", KR(ret)); - } else if (OB_FAIL(pre_write_item())) { - STORAGE_LOG(WARN, "fail to pre write item", KR(ret)); - } else if (OB_FAIL(flush_extra_buffer(item))) { - STORAGE_LOG(WARN, "fail to flush extra buffer", KR(ret)); - } } else { STORAGE_LOG(WARN, "fail to write item", KR(ret)); } @@ -214,48 +205,20 @@ int ObDirectLoadDataBlockWriter::flush_buffer() } else { char *buf = nullptr; int64_t buf_size = 0; + int64_t align_buf_size = 0; if (OB_FAIL(data_block_writer_.build_data_block(buf, buf_size))) { STORAGE_LOG(WARN, "fail to build data block", KR(ret)); - } else if (OB_FAIL(file_io_handle_.write(buf, data_block_size_))) { + } else if (FALSE_IT(align_buf_size = ALIGN_UP(buf_size, DIO_ALIGN_SIZE))) { + } else if (OB_FAIL(file_io_handle_.aio_write(buf, align_buf_size))) { STORAGE_LOG(WARN, "fail to do aio write tmp file", KR(ret)); - } else if (nullptr != callback_ && OB_FAIL(callback_->write(buf, data_block_size_, offset_))) { + } else if (nullptr != callback_ && OB_FAIL(callback_->write(buf, align_buf_size, offset_))) { STORAGE_LOG(WARN, "fail to callback write", KR(ret)); } else { - OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, data_block_size_); + OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, align_buf_size); data_block_writer_.reuse(); - offset_ += data_block_size_; + offset_ += align_buf_size; ++block_count_; - max_block_size_ = MAX(max_block_size_, data_block_size_); - } - } - return ret; -} - -template -int ObDirectLoadDataBlockWriter::flush_extra_buffer(const T &item) -{ - OB_TABLE_LOAD_STATISTICS_TIME_COST(INFO, external_flush_buffer_time_us); - int ret = common::OB_SUCCESS; - if (OB_FAIL(pre_flush_buffer())) { - STORAGE_LOG(WARN, "fail to pre flush buffer", KR(ret)); - } else { - int64_t data_size = 0; - int64_t data_block_size = 0; - if (OB_FAIL( - data_block_writer_.build_data_block(item, extra_buf_, extra_buf_size_, data_size))) { - STORAGE_LOG(WARN, "fail to build data block", KR(ret)); - } else if (FALSE_IT(data_block_size = ALIGN_UP(data_size, DIO_ALIGN_SIZE))) { - } else if (OB_FAIL(file_io_handle_.write(extra_buf_, data_block_size))) { - STORAGE_LOG(WARN, "fail to do aio write tmp file", KR(ret)); - } else if (nullptr != callback_ && - OB_FAIL(callback_->write(extra_buf_, data_block_size, offset_))) { - STORAGE_LOG(WARN, "fail to callback write", KR(ret)); - } else { - OB_TABLE_LOAD_STATISTICS_INC(external_write_bytes, data_block_size); - data_block_writer_.reuse(); - offset_ += data_block_size; - ++block_count_; - max_block_size_ = MAX(max_block_size_, data_block_size); + max_block_size_ = MAX(max_block_size_, align_buf_size); } } return ret; diff --git a/src/storage/direct_load/ob_direct_load_external_scanner.h b/src/storage/direct_load/ob_direct_load_external_scanner.h index 2535f16d77..01c229eb42 100644 --- a/src/storage/direct_load/ob_direct_load_external_scanner.h +++ b/src/storage/direct_load/ob_direct_load_external_scanner.h @@ -33,7 +33,7 @@ public: virtual ~ObDirectLoadExternalSequentialScanner(); void reuse(); void reset(); - int init(int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type, + int init(int64_t data_block_size, common::ObCompressorType compressor_type, const ObDirectLoadExternalFragmentArray &fragments); int get_next_item(const T *&item) override; private: @@ -74,7 +74,7 @@ void ObDirectLoadExternalSequentialScanner::reset() template int ObDirectLoadExternalSequentialScanner::init( - int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type, + int64_t data_block_size, common::ObCompressorType compressor_type, const ObDirectLoadExternalFragmentArray &fragments) { int ret = common::OB_SUCCESS; @@ -82,16 +82,14 @@ int ObDirectLoadExternalSequentialScanner::init( ret = common::OB_INIT_TWICE; STORAGE_LOG(WARN, "ObDirectLoadExternalSequentialScanner init twice", KR(ret), KP(this)); } else if (OB_UNLIKELY(data_block_size <= 0 || data_block_size % DIO_ALIGN_SIZE != 0 || - buf_size <= 0 || buf_size % DIO_ALIGN_SIZE != 0 || - data_block_size > buf_size || compressor_type <= common::ObCompressorType::INVALID_COMPRESSOR || fragments.empty())) { ret = common::OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", KR(ret), K(buf_size), K(compressor_type), K(fragments)); + STORAGE_LOG(WARN, "invalid argument", KR(ret), K(compressor_type), K(fragments)); } else { if (OB_FAIL(fragments_.assign(fragments))) { STORAGE_LOG(WARN, "fail to assign fragments", KR(ret)); - } else if (OB_FAIL(reader_.init(data_block_size, buf_size, compressor_type))) { + } else if (OB_FAIL(reader_.init(data_block_size, compressor_type))) { STORAGE_LOG(WARN, "fail to init fragment reader", KR(ret)); } else if (OB_FAIL(switch_next_fragment())) { STORAGE_LOG(WARN, "fail to switch next fragment", KR(ret)); @@ -158,7 +156,7 @@ class ObDirectLoadExternalSortScanner : public ObDirectLoadExternalIterator public: ObDirectLoadExternalSortScanner(); virtual ~ObDirectLoadExternalSortScanner(); - int init(int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type, + int init(int64_t data_block_size, common::ObCompressorType compressor_type, const ObDirectLoadExternalFragmentArray &fragments, Compare *compare); int get_next_item(const T *&item) override; void reuse(); @@ -197,7 +195,7 @@ void ObDirectLoadExternalSortScanner::reuse() template int ObDirectLoadExternalSortScanner::init( - int64_t data_block_size, int64_t buf_size, common::ObCompressorType compressor_type, + int64_t data_block_size, common::ObCompressorType compressor_type, const ObDirectLoadExternalFragmentArray &fragments, Compare *compare) { int ret = common::OB_SUCCESS; @@ -205,12 +203,10 @@ int ObDirectLoadExternalSortScanner::init( ret = common::OB_INIT_TWICE; STORAGE_LOG(WARN, "ObDirectLoadExternalSortScanner init twice", KR(ret), KP(this)); } else if (OB_UNLIKELY(data_block_size <= 0 || data_block_size % DIO_ALIGN_SIZE != 0 || - buf_size <= 0 || buf_size % DIO_ALIGN_SIZE != 0 || - data_block_size > buf_size || compressor_type <= common::ObCompressorType::INVALID_COMPRESSOR || fragments.empty() || nullptr == compare)) { ret = common::OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", KR(ret), K(buf_size), K(compressor_type), K(fragments), + STORAGE_LOG(WARN, "invalid argument", KR(ret), K(compressor_type), K(fragments), KP(compare)); } else { allocator_.set_tenant_id(MTL_ID()); @@ -220,7 +216,7 @@ int ObDirectLoadExternalSortScanner::init( if (OB_ISNULL(reader = OB_NEWx(ExternalReader, (&allocator_)))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "fail to new fragment reader", KR(ret)); - } else if (OB_FAIL(reader->init(data_block_size, buf_size, compressor_type))) { + } else if (OB_FAIL(reader->init(data_block_size, compressor_type))) { STORAGE_LOG(WARN, "fail to init fragment reader", KR(ret)); } else if (OB_FAIL(reader->open(fragment.file_handle_, 0, fragment.file_size_))) { STORAGE_LOG(WARN, "fail to open fragment", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_mem_loader.cpp b/src/storage/direct_load/ob_direct_load_mem_loader.cpp index 2006abf7d3..d258099310 100644 --- a/src/storage/direct_load/ob_direct_load_mem_loader.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_loader.cpp @@ -69,7 +69,6 @@ int ObDirectLoadMemLoader::work() ObDirectLoadExternalFragment &fragment = fragments_.at(i); ExternalReader external_reader; if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_, - fragment.max_data_block_size_, mem_ctx_->table_data_desc_.compressor_type_))) { LOG_WARN("fail to init external reader", KR(ret)); } else if (OB_FAIL(external_reader.open(fragment.file_handle_, 0, fragment.file_size_))) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.cpp index 64e27b98d7..358102ccf2 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_block_reader.cpp @@ -30,7 +30,7 @@ ObDirectLoadMultipleHeapTableIndexBlockReader::~ObDirectLoadMultipleHeapTableInd int ObDirectLoadMultipleHeapTableIndexBlockReader::init(int64_t data_block_size, ObCompressorType compressor_type) { - return ParentType::init(data_block_size, data_block_size, compressor_type); + return ParentType::init(data_block_size, compressor_type); } int ObDirectLoadMultipleHeapTableIndexBlockReader::get_next_index( diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_scanner.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_scanner.cpp index 071533dcbf..bd3da37ee9 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_scanner.cpp @@ -49,7 +49,6 @@ int ObDirectLoadMultipleHeapTableTabletWholeScanner::init( if (OB_FAIL(index_scanner_.init(heap_table, tablet_id, table_data_desc))) { LOG_WARN("fail to init index scanner", KR(ret)); } else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_, - heap_table->get_meta().max_data_block_size_, table_data_desc.compressor_type_))) { LOG_WARN("fail to init data block reader", KR(ret)); } else if (OB_FAIL(switch_next_fragment())) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp index 2e821834d7..416d0a3ea2 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp @@ -172,7 +172,6 @@ int ObDirectLoadMultipleHeapTableSorter::work() const ObDirectLoadExternalFragment &fragment = fragments_.at(i); ExternalReader external_reader; if (OB_FAIL(external_reader.init(mem_ctx_->table_data_desc_.external_data_block_size_, - fragment.max_data_block_size_, mem_ctx_->table_data_desc_.compressor_type_))) { LOG_WARN("fail to init external reader", KR(ret)); } else if (OB_FAIL(external_reader.open(fragment.file_handle_, 0, fragment.file_size_))) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_data_block_scanner.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_data_block_scanner.cpp index e8b5cd4603..9d5cf1b4a2 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_data_block_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_data_block_scanner.cpp @@ -63,7 +63,6 @@ int ObDirectLoadMultipleSSTableDataBlockScanner::init( table_data_desc.compressor_type_))) { LOG_WARN("fail to index block reader", KR(ret)); } else if (OB_FAIL(data_block_reader.init(table_data_desc.sstable_data_block_size_, - sstable->get_meta().max_data_block_size_, table_data_desc.compressor_type_))) { LOG_WARN("fail to data block reader", KR(ret)); } else if (OB_FAIL(locate_left_border(index_block_reader, data_block_reader))) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_index_block_meta_scanner.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_index_block_meta_scanner.cpp index 90bed17ed1..9d80fbd19e 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_index_block_meta_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_index_block_meta_scanner.cpp @@ -62,7 +62,6 @@ int ObDirectLoadMultipleSSTableIndexBlockMetaWholeScanner::init( table_data_desc.compressor_type_))) { LOG_WARN("fail to index block reader", KR(ret)); } else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_, - sstable->get_meta().max_data_block_size_, table_data_desc.compressor_type_))) { LOG_WARN("fail to data block reader", KR(ret)); } else { @@ -148,7 +147,6 @@ int ObDirectLoadMultipleSSTableIndexBlockMetaTabletWholeScanner::init( table_data_desc.compressor_type_))) { LOG_WARN("fail to index block reader", KR(ret)); } else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_, - sstable->get_meta().max_data_block_size_, table_data_desc.compressor_type_))) { LOG_WARN("fail to data block reader", KR(ret)); } else if (OB_FAIL(locate_left_border(index_block_reader_, data_block_reader_))) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp index ea55d031b3..c5f3013132 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp @@ -60,7 +60,6 @@ int ObDirectLoadMultipleSSTableScanner::init(ObDirectLoadMultipleSSTable *sstabl if (OB_FAIL(data_block_scanner_.init(sstable, table_data_desc, range, datum_utils))) { LOG_WARN("fail to init data block scanner", KR(ret)); } else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_, - sstable->get_meta().max_data_block_size_, table_data_desc.compressor_type_))) { LOG_WARN("fail to init data block reader", KR(ret)); } else { diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp index 019dc1f407..4d790462de 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp @@ -573,7 +573,6 @@ int ObDirectLoadPartitionHeapTableMergeTask::RowIterator::init( } // init scanner_ else if (OB_FAIL(scanner_.init(merge_param.table_data_desc_.external_data_block_size_, - external_table->get_meta().max_data_block_size_, merge_param.table_data_desc_.compressor_type_, external_table->get_fragments()))) { LOG_WARN("fail to init fragment scanner", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.cpp b/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.cpp index 24969aa892..106074bc39 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_index_block_reader.cpp @@ -31,7 +31,7 @@ ObDirectLoadSSTableIndexBlockReader::~ObDirectLoadSSTableIndexBlockReader() int ObDirectLoadSSTableIndexBlockReader::init(int64_t data_block_size, ObCompressorType compressor_type) { - return ParentType::init(data_block_size, data_block_size, compressor_type); + return ParentType::init(data_block_size, compressor_type); } int ObDirectLoadSSTableIndexBlockReader::get_next_entry(const ObDirectLoadSSTableIndexEntry *&entry)