fix reset bug of tmp file ctx
This commit is contained in:
@ -1430,6 +1430,91 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache)
|
|||||||
test_big_file(write_size, wbp_mem_limit, io_info);
|
test_big_file(write_size, wbp_mem_limit, io_info);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TestTmpFile, test_aio_pread)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t write_size = 10 * 1024 * 1024; // 10MB
|
||||||
|
char *write_buf = new char [write_size];
|
||||||
|
for (int64_t i = 0; i < write_size;) {
|
||||||
|
int64_t random_length = generate_random_int(1024, 8 * 1024);
|
||||||
|
int64_t random_int = generate_random_int(0, 256);
|
||||||
|
for (int64_t j = 0; j < random_length && i + j < write_size; ++j) {
|
||||||
|
write_buf[i + j] = random_int;
|
||||||
|
}
|
||||||
|
i += random_length;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t dir = -1;
|
||||||
|
int64_t fd = -1;
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->alloc_dir(dir);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->open(fd, dir);
|
||||||
|
std::cout << "open temporary file: " << fd << std::endl;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ObTmpFileHandle file_handle;
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->get_tmp_file(fd, file_handle);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY;
|
||||||
|
|
||||||
|
ObTmpFileIOInfo io_info;
|
||||||
|
io_info.fd_ = fd;
|
||||||
|
io_info.io_desc_.set_wait_event(2);
|
||||||
|
io_info.buf_ = write_buf;
|
||||||
|
io_info.size_ = write_size;
|
||||||
|
io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
||||||
|
|
||||||
|
// 1. Write data
|
||||||
|
int64_t write_time = ObTimeUtility::current_time();
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->write(io_info);
|
||||||
|
write_time = ObTimeUtility::current_time() - write_time;
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
// 2. check aio_pread
|
||||||
|
int64_t read_size = 9 * 1024 * 1024; // 9MB
|
||||||
|
int64_t read_offset = 0;
|
||||||
|
char *read_buf = new char [read_size];
|
||||||
|
ObTmpFileIOHandle handle;
|
||||||
|
io_info.buf_ = read_buf;
|
||||||
|
io_info.size_ = read_size;
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->aio_pread(io_info, read_offset, handle);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(0, handle.get_done_size());
|
||||||
|
ret = handle.wait();
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(io_info.size_, handle.get_done_size());
|
||||||
|
int cmp = memcmp(handle.get_buffer(), write_buf + read_offset, io_info.size_);
|
||||||
|
ASSERT_EQ(0, cmp);
|
||||||
|
handle.reset();
|
||||||
|
delete[] read_buf;
|
||||||
|
|
||||||
|
// 3. execute two aio_pread, but io_handle doesn't not call wait()
|
||||||
|
read_size = 5 * 1024 * 1024; // 5MB
|
||||||
|
read_offset = 0;
|
||||||
|
read_buf = new char [read_size];
|
||||||
|
io_info.buf_ = read_buf;
|
||||||
|
io_info.size_ = read_size;
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->aio_pread(io_info, read_offset, handle);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(0, handle.get_done_size());
|
||||||
|
|
||||||
|
int read_offset2 = read_offset + read_size;
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->aio_pread(io_info, read_offset2, handle);
|
||||||
|
ASSERT_NE(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
ret = handle.wait();
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(io_info.size_, handle.get_done_size());
|
||||||
|
cmp = memcmp(handle.get_buffer(), write_buf + read_offset, io_info.size_);
|
||||||
|
ASSERT_EQ(0, cmp);
|
||||||
|
handle.reset();
|
||||||
|
delete[] read_buf;
|
||||||
|
|
||||||
|
file_handle.reset();
|
||||||
|
ret = MTL(ObTenantTmpFileManager *)->remove(fd);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
LOG_INFO("test_cached_read");
|
||||||
|
}
|
||||||
} // namespace oceanbase
|
} // namespace oceanbase
|
||||||
|
|
||||||
int main(int argc, char **argv)
|
int main(int argc, char **argv)
|
||||||
|
|||||||
@ -94,8 +94,12 @@ void ObTmpFileIOCtx::reuse()
|
|||||||
for (int32_t i = 0; i < page_cache_handles_.count(); i++) {
|
for (int32_t i = 0; i < page_cache_handles_.count(); i++) {
|
||||||
page_cache_handles_.at(i).page_handle_.reset();
|
page_cache_handles_.at(i).page_handle_.reset();
|
||||||
}
|
}
|
||||||
|
for (int32_t i = 0; i < block_cache_handles_.count(); i++) {
|
||||||
|
block_cache_handles_.at(i).block_handle_.reset();
|
||||||
|
}
|
||||||
io_handles_.reset();
|
io_handles_.reset();
|
||||||
page_cache_handles_.reset();
|
page_cache_handles_.reset();
|
||||||
|
block_cache_handles_.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObTmpFileIOCtx::reset()
|
void ObTmpFileIOCtx::reset()
|
||||||
@ -264,7 +268,7 @@ int ObTmpFileIOCtx::do_read_wait_()
|
|||||||
char * read_buf = page_cache_handle.dest_user_read_buf_;
|
char * read_buf = page_cache_handle.dest_user_read_buf_;
|
||||||
if (OB_UNLIKELY(!check_buf_range_valid(read_buf, read_size))) {
|
if (OB_UNLIKELY(!check_buf_range_valid(read_buf, read_size))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("invalid range", KR(ret), K(read_buf), K(read_size), K(buf_size_));
|
LOG_WARN("invalid range", KR(ret), KP(read_buf), KP(buf_), K(read_size), K(buf_size_));
|
||||||
} else if (OB_UNLIKELY(offset_in_page + read_size > ObTmpFileGlobal::PAGE_SIZE)) {
|
} else if (OB_UNLIKELY(offset_in_page + read_size > ObTmpFileGlobal::PAGE_SIZE)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("read size is over than page range", KR(ret), KPC(this), K(offset_in_page), K(read_size));
|
LOG_WARN("read size is over than page range", KR(ret), KPC(this), K(offset_in_page), K(read_size));
|
||||||
@ -290,7 +294,7 @@ int ObTmpFileIOCtx::do_read_wait_()
|
|||||||
char * read_buf = block_cache_handle.dest_user_read_buf_;
|
char * read_buf = block_cache_handle.dest_user_read_buf_;
|
||||||
if (OB_UNLIKELY(!check_buf_range_valid(read_buf, read_size))) {
|
if (OB_UNLIKELY(!check_buf_range_valid(read_buf, read_size))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("invalid range", KR(ret), K(read_buf), K(read_size), K(buf_size_));
|
LOG_WARN("invalid range", KR(ret), KP(read_buf), KP(buf_), K(read_size), K(buf_size_));
|
||||||
} else if (OB_UNLIKELY(offset_in_block + read_size > OB_DEFAULT_MACRO_BLOCK_SIZE)) {
|
} else if (OB_UNLIKELY(offset_in_block + read_size > OB_DEFAULT_MACRO_BLOCK_SIZE)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("read size is over than macro block range", KR(ret), KPC(this), K(offset_in_block), K(read_size));
|
LOG_WARN("read size is over than macro block range", KR(ret), KPC(this), K(offset_in_block), K(read_size));
|
||||||
@ -319,7 +323,7 @@ int ObTmpFileIOCtx::do_read_wait_()
|
|||||||
char * read_buf = io_handle.dest_user_read_buf_;
|
char * read_buf = io_handle.dest_user_read_buf_;
|
||||||
if (OB_UNLIKELY(!check_buf_range_valid(read_buf, size))) {
|
if (OB_UNLIKELY(!check_buf_range_valid(read_buf, size))) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("invalid range", KR(ret), K(read_buf), K(size), K(buf_size_));
|
LOG_WARN("invalid range", KR(ret), KP(read_buf), KP(buf_), K(size), K(buf_size_));
|
||||||
} else {
|
} else {
|
||||||
MEMCPY(read_buf, data_buf + offset, size);
|
MEMCPY(read_buf, data_buf + offset, size);
|
||||||
io_handle.handle_.reset();
|
io_handle.handle_.reset();
|
||||||
|
|||||||
@ -290,7 +290,6 @@ int ObTenantTmpFileManager::aio_read(const ObTmpFileIOInfo &io_info, ObTmpFileIO
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTmpFileHandle tmp_file_handle;
|
ObTmpFileHandle tmp_file_handle;
|
||||||
io_handle.reset();
|
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -298,6 +297,10 @@ int ObTenantTmpFileManager::aio_read(const ObTmpFileIOInfo &io_info, ObTmpFileIO
|
|||||||
} else if (!io_info.is_valid()) {
|
} else if (!io_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
||||||
|
} else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle));
|
||||||
|
} else if (FALSE_IT(io_handle.reset())) {
|
||||||
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
||||||
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
||||||
} else if (OB_FAIL(io_handle.init_read(io_info, tmp_file_handle))) {
|
} else if (OB_FAIL(io_handle.init_read(io_info, tmp_file_handle))) {
|
||||||
@ -316,7 +319,6 @@ int ObTenantTmpFileManager::aio_pread(const ObTmpFileIOInfo &io_info,
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTmpFileHandle tmp_file_handle;
|
ObTmpFileHandle tmp_file_handle;
|
||||||
io_handle.reset();
|
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -324,6 +326,10 @@ int ObTenantTmpFileManager::aio_pread(const ObTmpFileIOInfo &io_info,
|
|||||||
} else if (!io_info.is_valid()) {
|
} else if (!io_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
||||||
|
} else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle));
|
||||||
|
} else if (FALSE_IT(io_handle.reset())) {
|
||||||
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
||||||
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
||||||
} else if (OB_FAIL(io_handle.init_pread(io_info, offset, tmp_file_handle))) {
|
} else if (OB_FAIL(io_handle.init_pread(io_info, offset, tmp_file_handle))) {
|
||||||
@ -340,7 +346,6 @@ int ObTenantTmpFileManager::read(const ObTmpFileIOInfo &io_info, ObTmpFileIOHand
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTmpFileHandle tmp_file_handle;
|
ObTmpFileHandle tmp_file_handle;
|
||||||
io_handle.reset();
|
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -348,6 +353,10 @@ int ObTenantTmpFileManager::read(const ObTmpFileIOInfo &io_info, ObTmpFileIOHand
|
|||||||
} else if (!io_info.is_valid()) {
|
} else if (!io_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
||||||
|
} else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle));
|
||||||
|
} else if (FALSE_IT(io_handle.reset())) {
|
||||||
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
||||||
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
||||||
} else if (OB_FAIL(io_handle.init_read(io_info, tmp_file_handle))) {
|
} else if (OB_FAIL(io_handle.init_read(io_info, tmp_file_handle))) {
|
||||||
@ -372,7 +381,6 @@ int ObTenantTmpFileManager::pread(const ObTmpFileIOInfo &io_info, const int64_t
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTmpFileHandle tmp_file_handle;
|
ObTmpFileHandle tmp_file_handle;
|
||||||
io_handle.reset();
|
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -380,6 +388,10 @@ int ObTenantTmpFileManager::pread(const ObTmpFileIOInfo &io_info, const int64_t
|
|||||||
} else if (!io_info.is_valid()) {
|
} else if (!io_info.is_valid()) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
LOG_WARN("fail to aio read, invalid argument", KR(ret), K(io_info));
|
||||||
|
} else if (OB_UNLIKELY(io_handle.is_valid() && !io_handle.is_finished())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("tmp file io handle has remain data need to be waited", KR(ret), K(io_info), K(io_handle));
|
||||||
|
} else if (FALSE_IT(io_handle.reset())) {
|
||||||
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
} else if (OB_FAIL(get_tmp_file(io_info.fd_, tmp_file_handle))) {
|
||||||
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
LOG_WARN("fail to get tmp file io handle", KR(ret), K(io_info));
|
||||||
} else if (OB_FAIL(io_handle.init_pread(io_info, offset, tmp_file_handle))) {
|
} else if (OB_FAIL(io_handle.init_pread(io_info, offset, tmp_file_handle))) {
|
||||||
|
|||||||
Reference in New Issue
Block a user