diff --git a/src/storage/blocksstable/ob_tmp_file.cpp b/src/storage/blocksstable/ob_tmp_file.cpp
index 3d4e2d0c79..bd97e7e05a 100644
--- a/src/storage/blocksstable/ob_tmp_file.cpp
+++ b/src/storage/blocksstable/ob_tmp_file.cpp
@@ -43,17 +43,27 @@ bool ObTmpFileIOInfo::is_valid() const
 }
 
 ObTmpFileIOHandle::ObTmpFileIOHandle()
-    : tmp_file_(NULL), io_handles_(), page_cache_handles_(), block_cache_handles_(), buf_(NULL),
-      size_(0), is_read_(false), has_wait_(false)
-{
-}
+    : tmp_file_(NULL),
+      io_handles_(),
+      page_cache_handles_(),
+      block_cache_handles_(),
+      buf_(NULL),
+      size_(0),
+      is_read_(false),
+      has_wait_(false),
+      expect_read_size_(0),
+      last_read_offset_(-1),
+      io_flag_(),
+      update_offset_in_file_(false)
+{}
 
 ObTmpFileIOHandle::~ObTmpFileIOHandle()
 {
   reset();
 }
 
-int ObTmpFileIOHandle::prepare_read(char* read_buf, ObTmpFile* file)
+int ObTmpFileIOHandle::prepare_read(const int64_t read_size, const int64_t read_offset, const common::ObIODesc &io_flag,
+    char *read_buf, ObTmpFile *file)
 {
   int ret = OB_SUCCESS;
   if (NULL == read_buf || NULL == file) {
@@ -65,6 +75,9 @@ int ObTmpFileIOHandle::prepare_read(char* read_buf, ObTmpFile* file)
     tmp_file_ = file;
     is_read_ = true;
     has_wait_ = false;
+    expect_read_size_ = read_size;
+    last_read_offset_ = read_offset;
+    io_flag_ = io_flag;
   }
   return ret;
 }
@@ -81,6 +94,8 @@ int ObTmpFileIOHandle::prepare_write(char* write_buf, const int64_t write_size,
     tmp_file_ = file;
     is_read_ = false;
     has_wait_ = false;
+    expect_read_size_ = 0;
+    last_read_offset_ = -1;
   }
   return ret;
 }
@@ -91,33 +106,71 @@ int ObTmpFileIOHandle::wait(const int64_t timeout_ms)
   if (OB_UNLIKELY(has_wait_ && is_read_)) {
     ret = OB_ERR_UNEXPECTED;
     STORAGE_LOG(ERROR, "read wait() isn't reentrant interface, shouldn't call again", K(ret));
-  } else {
-    for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) {
-      ObBlockCacheHandle &tmp = block_cache_handles_.at(i);
-      MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
-      tmp.block_handle_.reset();
-    }
-    block_cache_handles_.reset();
-
-    for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) {
-      ObPageCacheHandle &tmp = page_cache_handles_.at(i);
-      MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
-      tmp.page_handle_.reset();
-    }
-    page_cache_handles_.reset();
-
-    for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) {
-      ObIOReadHandle &tmp = io_handles_.at(i);
-      if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) {
-        STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret));
-      } else {
-        MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_);
-        tmp.macro_handle_.reset();
+  } else if (OB_FAIL(do_wait(timeout_ms))) {
+    STORAGE_LOG(WARN, "fail to wait tmp file io", K(ret), K(timeout_ms));
+  } else if (is_read_ && !has_wait_) {
+    if (size_ == expect_read_size_) {
+      // do nothing
+    } else if (OB_UNLIKELY(size_ > expect_read_size_)) {
+      ret = OB_ERR_UNEXPECTED;
+      STORAGE_LOG(WARN, "read size more than expected size", K(ret), K(timeout_ms));
+    } else {
+      ObTmpFileIOInfo io_info;
+      io_info.fd_ = tmp_file_->get_fd();
+      io_info.dir_id_ = tmp_file_->get_dir_id();
+      io_info.tenant_id_ = tmp_file_->get_tenant_id();
+      io_info.size_ = expect_read_size_;
+      io_info.buf_ = buf_;
+      io_info.io_desc_ = io_flag_;
+      // Fewer bytes than expected were read so far: keep reading batch by batch until the expected size is reached.
+      while (OB_SUCC(ret) && size_ < expect_read_size_) {
+        if (OB_FAIL(tmp_file_->once_aio_read_batch(io_info, update_offset_in_file_, last_read_offset_, *this))) {
+          STORAGE_LOG(WARN, "fail to read once batch", K(ret), K(timeout_ms), K(io_info), K(*this));
+        } else if (OB_FAIL(do_wait(timeout_ms))) {
+          STORAGE_LOG(WARN, "fail to wait tmp file io", K(ret), K(timeout_ms));
+        }
       }
     }
-    io_handles_.reset();
-    has_wait_ = true;
   }
+
+  if (OB_SUCC(ret) || OB_ITER_END == ret) {
+    has_wait_ = true;
+    expect_read_size_ = 0;
+    last_read_offset_ = -1;
+    update_offset_in_file_ = false;
+  }
+  return ret;
+}
+
+// Copy the data referenced by the pending block cache, page cache and macro block io handles into their
+// destination buffers (waiting up to timeout_ms on each macro block io), then clear the three handle lists.
+int ObTmpFileIOHandle::do_wait(const int64_t timeout_ms)
+{
+  int ret = OB_SUCCESS;
+  for (int32_t i = 0; OB_SUCC(ret) && i < block_cache_handles_.count(); i++) {
+    ObBlockCacheHandle &tmp = block_cache_handles_.at(i);
+    MEMCPY(tmp.buf_, tmp.block_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
+    tmp.block_handle_.reset();
+  }
+  block_cache_handles_.reset();
+
+  for (int32_t i = 0; OB_SUCC(ret) && i < page_cache_handles_.count(); i++) {
+    ObPageCacheHandle &tmp = page_cache_handles_.at(i);
+    MEMCPY(tmp.buf_, tmp.page_handle_.value_->get_buffer() + tmp.offset_, tmp.size_);
+    tmp.page_handle_.reset();
+  }
+  page_cache_handles_.reset();
+
+  for (int32_t i = 0; OB_SUCC(ret) && i < io_handles_.count(); i++) {
+    ObIOReadHandle &tmp = io_handles_.at(i);
+    if (OB_FAIL(tmp.macro_handle_.wait(timeout_ms))) {
+      STORAGE_LOG(WARN, "fail to wait tmp read io", K(ret));
+    } else {
+      MEMCPY(tmp.buf_, tmp.macro_handle_.get_buffer() + tmp.offset_, tmp.size_);
+      tmp.macro_handle_.reset();
+    }
+  }
+  io_handles_.reset();
   return ret;
 }
 
@@ -140,6 +193,9 @@ void ObTmpFileIOHandle::reset()
   tmp_file_ = NULL;
   is_read_ = false;
   has_wait_ = false;
+  expect_read_size_ = 0;
+  last_read_offset_ = -1;
+  update_offset_in_file_ = false;
 }
 
 bool ObTmpFileIOHandle::is_valid()
@@ -556,51 +612,102 @@ int64_t ObTmpFile::find_first_extent(const int64_t offset)
   return first_extent;
 }
 
-int ObTmpFile::aio_pread_without_lock(const ObTmpFileIOInfo &io_info,
-    int64_t &offset, ObTmpFileIOHandle &handle)
+int ObTmpFile::aio_read_without_lock(const ObTmpFileIOInfo &io_info, int64_t &offset, ObTmpFileIOHandle &handle)
 {
   int ret = OB_SUCCESS;
-  char* buf = io_info.buf_;
-  int64_t size = io_info.size_;
-  int64_t read_size = 0;
-  ObTmpFileExtent* tmp = file_meta_.get_last_extent();
-  if (NULL == tmp) {
+  ObTmpFileExtent *tmp = nullptr;
+
+  if (OB_ISNULL(tmp = file_meta_.get_last_extent())) {
     ret = OB_BAD_NULL_ERROR;
-    STORAGE_LOG(WARN, "fail to read, because this temporary file is empty", K(ret), K(io_info));
-  } else if (size > 0 && offset >= tmp->get_global_end()) {
+    STORAGE_LOG(WARN, "fail to read, because the tmp file is empty", K(ret), KP(tmp), K(io_info));
+  } else if (OB_UNLIKELY(io_info.size_ > 0 && offset >= tmp->get_global_end())) {
     ret = OB_ITER_END;
-  } else if (OB_FAIL(handle.prepare_read(io_info.buf_, this))) {
-    STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret));
+  } else if (OB_FAIL(handle.prepare_read(io_info.size_, offset, io_info.io_desc_, io_info.buf_, this))) {
+    STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret), K(io_info), K(offset));
+  } else if (OB_FAIL(once_aio_read_batch_without_lock(io_info, offset, handle))) {
+    STORAGE_LOG(WARN, "fail to read one batch", K(ret), K(offset), K(handle));
   } else {
-    int64_t ith_extent = 0;
-    common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
-    if (offset >= last_extent_min_offset_ && offset_ <= last_extent_max_offset_) {
-      ith_extent = last_extent_id_;
-    } else {
-      ith_extent = find_first_extent(offset);
-    }
-    for (; OB_SUCC(ret) && ith_extent < extents.count() && size > 0; ++ith_extent) {
-      tmp = extents.at(ith_extent);
-      if (tmp->get_global_start() <= offset && offset < tmp->get_global_end()) {
-        if (offset + size > tmp->get_global_end()) {
-          read_size = tmp->get_global_end() - offset;
-        } else {
-          read_size = size;
-        }
-        // read from the extent.
-        if (OB_FAIL(tmp->read(io_info, offset - tmp->get_global_start(), read_size, buf, handle))) {
-          STORAGE_LOG(WARN, "fail to read the extent", K(ret), K(io_info), K(buf), KP_(io_info.buf));
-        } else {
-          offset += read_size;
-          size -= read_size;
-          buf += read_size;
-          handle.add_data_size(read_size);
-          last_extent_id_ = ith_extent;
-          last_extent_min_offset_ = tmp->get_global_start();
-          last_extent_max_offset_ = tmp->get_global_end();
-        }
+    handle.set_last_read_offset(offset);
+  }
+  return ret;
+}
+
+// Read one more batch for a read whose handle has not yet collected io_info.size_ bytes.
+// Takes the file write lock; when need_update_offset is set, the file's own offset_ is set to offset on return.
+int ObTmpFile::once_aio_read_batch(
+    const ObTmpFileIOInfo &io_info, const bool need_update_offset, int64_t &offset, ObTmpFileIOHandle &handle)
+{
+  int ret = OB_SUCCESS;
+  ObTmpFileExtent *tmp = nullptr;
+  const int64_t remain_size = io_info.size_ - handle.get_data_size();
+
+  SpinWLockGuard guard(lock_);
+  if (OB_UNLIKELY(!is_inited_)) {
+    ret = OB_NOT_INIT;
+    STORAGE_LOG(WARN, "ObTmpFile has not been initialized", K(ret));
+  } else if (OB_UNLIKELY(offset < 0 || remain_size < 0) || OB_ISNULL(io_info.buf_)) {
+    ret = OB_INVALID_ARGUMENT;
+    STORAGE_LOG(WARN, "invalid argument", K(ret), K(offset), K(remain_size), KP(io_info.buf_));
+  } else if (OB_ISNULL(tmp = file_meta_.get_last_extent())) {
+    ret = OB_ERR_UNEXPECTED;
+    STORAGE_LOG(WARN, "unexpected error, null tmp file extent", K(ret), KP(tmp), K(io_info));
+  } else if (OB_UNLIKELY(remain_size > 0 && offset >= tmp->get_global_end())) {
+    ret = OB_ITER_END;
+  } else if (OB_FAIL(once_aio_read_batch_without_lock(io_info, offset, handle))) {
+    STORAGE_LOG(WARN, "fail to read one batch", K(ret), K(offset), K(handle));
+  } else {
+    handle.set_last_read_offset(offset);
+  }
+
+  if (need_update_offset) {
+    offset_ = offset;
+  }
+  return ret;
+}
+
+// Read from consecutive extents starting at offset until the remaining request size is consumed, the extents
+// run out, or at least READ_SIZE_PER_BATCH bytes were read in this call; offset and handle's data size advance.
+int ObTmpFile::once_aio_read_batch_without_lock(
+    const ObTmpFileIOInfo &io_info, int64_t &offset, ObTmpFileIOHandle &handle)
+{
+  int ret = OB_SUCCESS;
+  int64_t one_batch_read_size = 0;
+  char *buf = io_info.buf_ + handle.get_data_size();
+  int64_t remain_size = io_info.size_ - handle.get_data_size();
+  int64_t read_size = 0;
+  int64_t ith_extent = 0;
+  ObTmpFileExtent *tmp = nullptr;
+  common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
+
+  if (offset >= last_extent_min_offset_ && offset_ <= last_extent_max_offset_) {
+    ith_extent = last_extent_id_;
+  } else {
+    ith_extent = find_first_extent(offset);
+  }
+
+  while (OB_SUCC(ret) && ith_extent < extents.count() && remain_size > 0 && one_batch_read_size < READ_SIZE_PER_BATCH) {
+    tmp = extents.at(ith_extent);
+    if (tmp->get_global_start() <= offset && offset < tmp->get_global_end()) {
+      if (offset + remain_size > tmp->get_global_end()) {
+        read_size = tmp->get_global_end() - offset;
+      } else {
+        read_size = remain_size;
+      }
+      // read from the extent.
+      if (OB_FAIL(tmp->read(io_info, offset - tmp->get_global_start(), read_size, buf, handle))) {
+        STORAGE_LOG(WARN, "fail to read the extent", K(ret), K(io_info), K(buf), KP_(io_info.buf));
+      } else {
+        offset += read_size;
+        remain_size -= read_size;
+        buf += read_size;
+        one_batch_read_size += read_size;
+        handle.add_data_size(read_size);
+        last_extent_id_ = ith_extent;
+        last_extent_min_offset_ = tmp->get_global_start();
+        last_extent_max_offset_ = tmp->get_global_end();
       }
     }
+    ++ith_extent;
   }
   return ret;
 }
@@ -614,10 +721,12 @@ int ObTmpFile::aio_read(const ObTmpFileIOInfo& io_info, ObTmpFileIOHandle& handl
   } else {
     tenant_id_ = io_info.tenant_id_;
     SpinWLockGuard guard(lock_);
-    if (OB_FAIL(aio_pread_without_lock(io_info, offset_, handle))) {
+    if (OB_FAIL(aio_read_without_lock(io_info, offset_, handle))) {
       if (OB_ITER_END != ret) {
-        STORAGE_LOG(WARN, "fail to do aio pread without lock", K(ret));
+        STORAGE_LOG(WARN, "fail to do aio read without lock", K(ret));
       }
+    } else {
+      handle.set_update_offset_in_file();
     }
   }
   return ret;
@@ -631,9 +740,9 @@ int ObTmpFile::aio_pread(const ObTmpFileIOInfo& io_info, const int64_t offset, O
   } else {
     int64_t tmp_offset = offset;
     SpinRLockGuard guard(lock_);
-    if (OB_FAIL(aio_pread_without_lock(io_info, tmp_offset, handle))) {
+    if (OB_FAIL(aio_read_without_lock(io_info, tmp_offset, handle))) {
       if (OB_ITER_END != ret) {
-        STORAGE_LOG(WARN, "fail to do aio pread without lock", K(ret));
+        STORAGE_LOG(WARN, "fail to do aio read without lock", K(ret));
       }
     }
   }
@@ -677,7 +786,9 @@ int ObTmpFile::read(const ObTmpFileIOInfo& io_info, const int64_t timeout_ms, Ob
       }
     }
   } else if (OB_FAIL(handle.wait(timeout_ms))) {
-    STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
+    if (OB_ITER_END != ret) {
+      STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
+    }
   }
   return ret;
 }
@@ -700,7 +811,9 @@ int ObTmpFile::pread(
       }
     }
   } else if (OB_FAIL(handle.wait(timeout_ms))) {
-    STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
+    if (OB_ITER_END != ret) {
+      STORAGE_LOG(WARN, "fail to wait io finish", K(ret), K(timeout_ms));
+    }
   }
   return ret;
 }
@@ -811,7 +924,7 @@ uint64_t ObTmpFile::get_tenant_id() const
   return tenant_id_;
 }
 
-int64_t ObTmpFile::get_fd()
+int64_t ObTmpFile::get_fd() const
 {
   return file_meta_.get_fd();
 }
diff --git a/src/storage/blocksstable/ob_tmp_file.h b/src/storage/blocksstable/ob_tmp_file.h
index a5a0c939f3..e8b2e347fe 100644
--- a/src/storage/blocksstable/ob_tmp_file.h
+++ b/src/storage/blocksstable/ob_tmp_file.h
@@ -91,8 +91,9 @@ public:
   {
     return size_;
   }
-  int prepare_read(char* read_buf, ObTmpFile* file);
-  int prepare_write(char* write_buf, const int64_t write_size, ObTmpFile* file);
+  int prepare_read(const int64_t read_size, const int64_t read_offset, const common::ObIODesc &io_flag, char *read_buf,
+      ObTmpFile *file);
+  int prepare_write(char *write_buf, const int64_t write_size, ObTmpFile *file);
   OB_INLINE void add_data_size(const int64_t size)
   {
     size_ += size;
@@ -101,6 +102,14 @@ public:
   {
     size_ -= size;
   }
+  OB_INLINE void set_update_offset_in_file()
+  {
+    update_offset_in_file_ = true;
+  }
+  OB_INLINE void set_last_read_offset(const int64_t last_read_offset)
+  {
+    last_read_offset_ = last_read_offset;
+  }
   int wait(const int64_t timeout_ms);
   void reset();
   bool is_valid();
@@ -116,7 +125,16 @@ public:
   {
     return block_cache_handles_;
   }
-  TO_STRING_KV(KP_(buf), K_(size), K_(is_read));
+  OB_INLINE int64_t get_last_read_offset() const
+  {
+    return last_read_offset_;
+  }
+
+  TO_STRING_KV(KP_(buf), K_(size), K_(is_read), K_(has_wait), K_(expect_read_size), K_(last_read_offset), K_(io_flag),
+      K_(update_offset_in_file));
+
+private:
+  int do_wait(const int64_t timeout_ms);
 
 private:
   ObTmpFile* tmp_file_;
@@ -127,6 +145,10 @@ private:
   int64_t size_;  // has read or to write size.
   bool is_read_;
   bool has_wait_;
+  int64_t expect_read_size_;
+  int64_t last_read_offset_;  // only for more than 8MB read.
+  common::ObIODesc io_flag_;
+  bool update_offset_in_file_;
   DISALLOW_COPY_AND_ASSIGN(ObTmpFileIOHandle);
 };
 
@@ -280,15 +302,21 @@ public:
   int clear();
   int64_t get_dir_id() const;
   uint64_t get_tenant_id() const;
-  int64_t get_fd();
+  int64_t get_fd() const;
   int sync(const int64_t timeout_ms);
   int deep_copy(char* buf, const int64_t buf_len, ObTmpFile*& value) const;
   inline int64_t get_deep_copy_size() const;
+  void get_file_size(int64_t &file_size);
+  // only for ObTmpFileIOHandle, once more than READ_SIZE_PER_BATCH read.
+  int once_aio_read_batch(
+      const ObTmpFileIOInfo &io_info, const bool need_update_offset, int64_t &offset, ObTmpFileIOHandle &handle);
+
   TO_STRING_KV(K_(file_meta), K_(is_big), K_(tenant_id), K_(is_inited));
 
 private:
-  int write_file_extent(const ObTmpFileIOInfo& io_info, ObTmpFileExtent* file_extent, int64_t& size, char*& buf);
-  int aio_pread_without_lock(const ObTmpFileIOInfo& io_info, int64_t& offset, ObTmpFileIOHandle& handle);
+  int write_file_extent(const ObTmpFileIOInfo &io_info, ObTmpFileExtent *file_extent, int64_t &size, char *&buf);
+  int aio_read_without_lock(const ObTmpFileIOInfo &io_info, int64_t &offset, ObTmpFileIOHandle &handle);
+  int once_aio_read_batch_without_lock(const ObTmpFileIOInfo &io_info, int64_t &offset, ObTmpFileIOHandle &handle);
   int64_t small_file_prealloc_size();
   int64_t big_file_prealloc_size();
   int64_t find_first_extent(const int64_t offset);
@@ -299,6 +327,7 @@ private:
   // SMALL_FILE_MAX_THRESHOLD < BIG_FILE_PREALLOC_EXTENT_SIZE < block size
   static const int64_t SMALL_FILE_MAX_THRESHOLD = 4;
   static const int64_t BIG_FILE_PREALLOC_EXTENT_SIZE = 8;
+  static const int64_t READ_SIZE_PER_BATCH = 8 * 1024 * 1024;  // 8MB
 
   ObTmpFileMeta file_meta_;
   bool is_big_;
diff --git a/unittest/storage/blocksstable/test_tmp_file.cpp b/unittest/storage/blocksstable/test_tmp_file.cpp
index 1beecc8e94..05b88c5534 100644
--- a/unittest/storage/blocksstable/test_tmp_file.cpp
+++ b/unittest/storage/blocksstable/test_tmp_file.cpp
@@ -585,11 +585,20 @@ TEST_F(TestTmpFile, test_big_file)
   ASSERT_EQ(OB_SUCCESS, ret);
 
   io_info.buf_ = read_buf;
+  io_info.size_ = write_size;
+  ret = ObTmpFileManager::get_instance().aio_read(io_info, handle);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_TRUE(handle.size_ < handle.expect_read_size_);
+  ASSERT_EQ(OB_SUCCESS, handle.wait(timeout_ms));
+  ASSERT_EQ(write_size, handle.get_data_size());
+  int cmp = memcmp(handle.get_buffer(), write_buf, handle.get_data_size());
+  ASSERT_EQ(0, cmp);
+
   io_info.size_ = macro_block_size;
   ret = ObTmpFileManager::get_instance().pread(io_info, 100, timeout_ms, handle);
   ASSERT_EQ(OB_SUCCESS, ret);
   ASSERT_EQ(macro_block_size, handle.get_data_size());
-  int cmp = memcmp(handle.get_buffer(), write_buf + 100, handle.get_data_size());
+  cmp = memcmp(handle.get_buffer(), write_buf + 100, handle.get_data_size());
   ASSERT_EQ(0, cmp);
 
   io_info.size_ = write_size;
@@ -1068,7 +1077,7 @@ TEST_F(TestTmpFile, test_write_less_than_macro_block_size)
   ASSERT_EQ(0, cmp);
 
   ret = ObTmpFileManager::get_instance().pread(io_info, 20, timeout_ms, handle);
-  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(OB_ITER_END, ret);
   ASSERT_EQ(256 - 20, handle.get_data_size());
   cmp = memcmp(handle.get_buffer(), write_buf + 20, 256 - 20);
   ASSERT_EQ(0, cmp);
@@ -1157,7 +1166,7 @@ TEST_F(TestTmpFile, test_write_more_than_one_macro_block)
 
   io_info.size_ = macro_block_size;
   ret = ObTmpFileManager::get_instance().pread(io_info, 400, timeout_ms, handle);
-  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(OB_ITER_END, ret);
   ASSERT_EQ(macro_block_size + 256 - 400, handle.get_data_size());
   cmp = memcmp(handle.get_buffer(), write_buf + 400, macro_block_size + 256 - 400);
   ASSERT_EQ(0, cmp);
@@ -1178,7 +1187,7 @@ TEST_F(TestTmpFile, test_write_more_than_one_macro_block)
 
   io_info.size_ = 200;
   ret = ObTmpFileManager::get_instance().pread(io_info, macro_block_size + 100, timeout_ms, handle);
-  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(OB_ITER_END, ret);
   ASSERT_EQ(156, handle.get_data_size());
   cmp = memcmp(handle.get_buffer(), write_buf + macro_block_size + 100, handle.get_data_size());
   ASSERT_EQ(0, cmp);