diff --git a/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp b/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp index e9eb74accd..fb9137d0f0 100644 --- a/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp +++ b/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp @@ -1430,6 +1430,91 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache) test_big_file(write_size, wbp_mem_limit, io_info); } +TEST_F(TestTmpFile, test_aio_pread) +{ + int ret = OB_SUCCESS; + const int64_t write_size = 10 * 1024 * 1024; // 10MB + char *write_buf = new char [write_size]; + for (int64_t i = 0; i < write_size;) { + int64_t random_length = generate_random_int(1024, 8 * 1024); + int64_t random_int = generate_random_int(0, 256); + for (int64_t j = 0; j < random_length && i + j < write_size; ++j) { + write_buf[i + j] = random_int; + } + i += random_length; + } + + int64_t dir = -1; + int64_t fd = -1; + ret = MTL(ObTenantTmpFileManager *)->alloc_dir(dir); + ASSERT_EQ(OB_SUCCESS, ret); + ret = MTL(ObTenantTmpFileManager *)->open(fd, dir); + std::cout << "open temporary file: " << fd << std::endl; + ASSERT_EQ(OB_SUCCESS, ret); + ObTmpFileHandle file_handle; + ret = MTL(ObTenantTmpFileManager *)->get_tmp_file(fd, file_handle); + ASSERT_EQ(OB_SUCCESS, ret); + file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY; + + ObTmpFileIOInfo io_info; + io_info.fd_ = fd; + io_info.io_desc_.set_wait_event(2); + io_info.buf_ = write_buf; + io_info.size_ = write_size; + io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; + + // 1. Write data + int64_t write_time = ObTimeUtility::current_time(); + ret = MTL(ObTenantTmpFileManager *)->write(io_info); + write_time = ObTimeUtility::current_time() - write_time; + ASSERT_EQ(OB_SUCCESS, ret); + + // 2. check aio_pread + int64_t read_size = 9 * 1024 * 1024; // 9MB + int64_t read_offset = 0; + char *read_buf = new char [read_size]; + ObTmpFileIOHandle handle; + io_info.buf_ = read_buf; + io_info.size_ = read_size; + ret = MTL(ObTenantTmpFileManager *)->aio_pread(io_info, read_offset, handle); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(0, handle.get_done_size()); + ret = handle.wait(); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(io_info.size_, handle.get_done_size()); + int cmp = memcmp(handle.get_buffer(), write_buf + read_offset, io_info.size_); + ASSERT_EQ(0, cmp); + handle.reset(); + delete[] read_buf; + + // 3. execute two aio_pread, but io_handle doesn't not call wait() + read_size = 5 * 1024 * 1024; // 5MB + read_offset = 0; + read_buf = new char [read_size]; + io_info.buf_ = read_buf; + io_info.size_ = read_size; + ret = MTL(ObTenantTmpFileManager *)->aio_pread(io_info, read_offset, handle); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(0, handle.get_done_size()); + + int read_offset2 = read_offset + read_size; + ret = MTL(ObTenantTmpFileManager *)->aio_pread(io_info, read_offset2, handle); + ASSERT_NE(OB_SUCCESS, ret); + + ret = handle.wait(); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(io_info.size_, handle.get_done_size()); + cmp = memcmp(handle.get_buffer(), write_buf + read_offset, io_info.size_); + ASSERT_EQ(0, cmp); + handle.reset(); + delete[] read_buf; + + file_handle.reset(); + ret = MTL(ObTenantTmpFileManager *)->remove(fd); + ASSERT_EQ(OB_SUCCESS, ret); + + LOG_INFO("test_cached_read"); +} } // namespace oceanbase int main(int argc, char **argv) diff --git a/src/storage/tmp_file/ob_tmp_file_io_ctx.cpp b/src/storage/tmp_file/ob_tmp_file_io_ctx.cpp index bc846f7e78..0930b1dd37 100644 --- a/src/storage/tmp_file/ob_tmp_file_io_ctx.cpp +++ b/src/storage/tmp_file/ob_tmp_file_io_ctx.cpp @@ -94,8 +94,12 @@ void ObTmpFileIOCtx::reuse() for (int32_t i = 0; i < page_cache_handles_.count(); i++) { page_cache_handles_.at(i).page_handle_.reset(); } + for (int32_t i = 0; i < block_cache_handles_.count(); i++) { + block_cache_handles_.at(i).block_handle_.reset(); + } io_handles_.reset(); page_cache_handles_.reset(); + block_cache_handles_.reset(); } void ObTmpFileIOCtx::reset() @@ -264,7 +268,7 @@ int ObTmpFileIOCtx::do_read_wait_() char * read_buf = page_cache_handle.dest_user_read_buf_; if (OB_UNLIKELY(!check_buf_range_valid(read_buf, read_size))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid range", KR(ret), K(read_buf), K(read_size), K(buf_size_)); + LOG_WARN("invalid range", KR(ret), KP(read_buf), KP(buf_), K(read_size), K(buf_size_)); } else if (OB_UNLIKELY(offset_in_page + read_size > ObTmpFileGlobal::PAGE_SIZE)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("read size is over than page range", KR(ret), KPC(this), K(offset_in_page), K(read_size)); @@ -290,7 +294,7 @@ int ObTmpFileIOCtx::do_read_wait_() char * read_buf = block_cache_handle.dest_user_read_buf_; if (OB_UNLIKELY(!check_buf_range_valid(read_buf, read_size))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid range", KR(ret), K(read_buf), K(read_size), K(buf_size_)); + LOG_WARN("invalid range", KR(ret), KP(read_buf), KP(buf_), K(read_size), K(buf_size_)); } else if (OB_UNLIKELY(offset_in_block + read_size > OB_DEFAULT_MACRO_BLOCK_SIZE)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("read size is over than macro block range", KR(ret), KPC(this), K(offset_in_block), K(read_size)); @@ -319,7 +323,7 @@ int ObTmpFileIOCtx::do_read_wait_() char * read_buf = io_handle.dest_user_read_buf_; if (OB_UNLIKELY(!check_buf_range_valid(read_buf, size))) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid range", KR(ret), K(read_buf), K(size), K(buf_size_)); + LOG_WARN("invalid range", KR(ret), KP(read_buf), KP(buf_), K(size), K(buf_size_)); } else { MEMCPY(read_buf, data_buf + offset, size); io_handle.handle_.reset(); diff --git a/src/storage/tmp_file/ob_tmp_file_manager.cpp b/src/storage/tmp_file/ob_tmp_file_manager.cpp index 69f4109a96..790dbfd14f 100644 --- a/src/storage/tmp_file/ob_tmp_file_manager.cpp +++ b/src/storage/tmp_file/ob_tmp_file_manager.cpp @@ -290,7 +290,6 @@ int ObTenantTmpFileManager::aio_read(const ObTmpFileIOInfo &io_info, ObTmpFileIO { int ret = OB_SUCCESS; ObTmpFileHandle tmp_file_handle; - io_handle.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -298,6 +297,10 @@ int ObTenantTmpFileManager::aio_read(const ObTmpFileIOInfo &io_info, ObTmpFileIO } else if (!io_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info)); + } else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle)); + } else if (FALSE_IT(io_handle.reset())) { } else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) { LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info)); } else if (OB_FAIL(io_handle.init_read(io_info, tmp_file_handle))) { @@ -316,7 +319,6 @@ int ObTenantTmpFileManager::aio_pread(const ObTmpFileIOInfo &io_info, { int ret = OB_SUCCESS; ObTmpFileHandle tmp_file_handle; - io_handle.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -324,6 +326,10 @@ int ObTenantTmpFileManager::aio_pread(const ObTmpFileIOInfo &io_info, } else if (!io_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info)); + } else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle)); + } else if (FALSE_IT(io_handle.reset())) { } else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) { LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info)); } else if (OB_FAIL(io_handle.init_pread(io_info, offset, tmp_file_handle))) { @@ -340,7 +346,6 @@ int ObTenantTmpFileManager::read(const ObTmpFileIOInfo &io_info, ObTmpFileIOHand { int ret = OB_SUCCESS; ObTmpFileHandle tmp_file_handle; - io_handle.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -348,6 +353,10 @@ int ObTenantTmpFileManager::read(const ObTmpFileIOInfo &io_info, ObTmpFileIOHand } else if (!io_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info)); + } else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle)); + } else if (FALSE_IT(io_handle.reset())) { } else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) { LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info)); } else if (OB_FAIL(io_handle.init_read(io_info, tmp_file_handle))) { @@ -372,7 +381,6 @@ int ObTenantTmpFileManager::pread(const ObTmpFileIOInfo &io_info, const int64_t { int ret = OB_SUCCESS; ObTmpFileHandle tmp_file_handle; - io_handle.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; @@ -380,6 +388,10 @@ int ObTenantTmpFileManager::pread(const ObTmpFileIOInfo &io_info, const int64_t } else if (!io_info.is_valid()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info)); + } else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle)); + } else if (FALSE_IT(io_handle.reset())) { } else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) { LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info)); } else if (OB_FAIL(io_handle.init_pread(io_info, offset, tmp_file_handle))) {