Enable skip kvcache in temporary file read procedure.
This commit is contained in:
parent
aabbf9b9fb
commit
785702ef23
@ -24,7 +24,9 @@ namespace blocksstable
|
||||
{
|
||||
|
||||
ObTmpFileIOInfo::ObTmpFileIOInfo()
|
||||
: fd_(0), dir_id_(0), size_(0), io_timeout_ms_(DEFAULT_IO_WAIT_TIME_MS), tenant_id_(OB_INVALID_TENANT_ID), buf_(NULL), io_desc_()
|
||||
: fd_(0), dir_id_(0), size_(0), io_timeout_ms_(DEFAULT_IO_WAIT_TIME_MS),
|
||||
tenant_id_(OB_INVALID_TENANT_ID), buf_(NULL), io_desc_(),
|
||||
disable_page_cache_(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -61,6 +63,7 @@ ObTmpFileIOHandle::ObTmpFileIOHandle()
|
||||
is_read_(false),
|
||||
has_wait_(false),
|
||||
is_finished_(false),
|
||||
disable_page_cache_(false),
|
||||
ret_code_(OB_SUCCESS),
|
||||
expect_read_size_(0),
|
||||
last_read_offset_(-1),
|
||||
@ -86,7 +89,8 @@ int ObTmpFileIOHandle::prepare_read(
|
||||
char *read_buf,
|
||||
int64_t fd,
|
||||
int64_t dir_id,
|
||||
uint64_t tenant_id)
|
||||
uint64_t tenant_id,
|
||||
bool disable_page_cache)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(read_buf)) {
|
||||
@ -103,6 +107,7 @@ int ObTmpFileIOHandle::prepare_read(
|
||||
expect_read_size_ = read_size;
|
||||
last_read_offset_ = read_offset;
|
||||
io_flag_ = io_flag;
|
||||
disable_page_cache_ = disable_page_cache;
|
||||
if (last_fd_ != fd_) {
|
||||
last_fd_ = fd_;
|
||||
last_extent_id_ = 0;
|
||||
@ -812,7 +817,8 @@ int ObTmpFile::aio_read_without_lock(const ObTmpFileIOInfo &io_info,
|
||||
io_info.buf_,
|
||||
file_meta_.get_fd(),
|
||||
file_meta_.get_dir_id(),
|
||||
io_info.tenant_id_))){
|
||||
io_info.tenant_id_,
|
||||
io_info.disable_page_cache_))) {
|
||||
STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret), K(io_info), K(offset));
|
||||
} else if (OB_UNLIKELY(io_info.size_ > 0 && offset >= tmp->get_global_end())) {
|
||||
ret = OB_ITER_END;
|
||||
|
@ -43,6 +43,7 @@ public:
|
||||
uint64_t tenant_id_;
|
||||
char *buf_;
|
||||
common::ObIOFlag io_desc_;
|
||||
bool disable_page_cache_;
|
||||
};
|
||||
|
||||
class ObTmpFileIOHandle final
|
||||
@ -97,6 +98,7 @@ public:
|
||||
~ObTmpFileIOHandle();
|
||||
OB_INLINE char *get_buffer() { return buf_; }
|
||||
OB_INLINE int64_t get_data_size() { return size_; }
|
||||
OB_INLINE bool is_disable_page_cache() const { return disable_page_cache_; }
|
||||
int prepare_read(
|
||||
const int64_t read_size,
|
||||
const int64_t read_offset,
|
||||
@ -104,7 +106,8 @@ public:
|
||||
char *read_buf,
|
||||
int64_t fd,
|
||||
int64_t dir_id,
|
||||
uint64_t tenant_id);
|
||||
uint64_t tenant_id,
|
||||
const bool disable_page_cache);
|
||||
int prepare_write(
|
||||
char *write_buf,
|
||||
const int64_t write_size,
|
||||
@ -160,6 +163,7 @@ private:
|
||||
bool is_read_;
|
||||
bool has_wait_;
|
||||
bool is_finished_;
|
||||
bool disable_page_cache_;
|
||||
int ret_code_;
|
||||
int64_t expect_read_size_;
|
||||
int64_t last_read_offset_; // only for more than 8MB read.
|
||||
|
@ -119,6 +119,43 @@ int ObTmpPageCacheValue::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheV
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTmpPageCache::inner_read_io(const ObTmpBlockIOInfo &io_info,
|
||||
ObITmpPageIOCallback *callback,
|
||||
ObMacroBlockHandle ¯o_block_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(read_io(io_info, nullptr, macro_block_handle))) {
|
||||
if (macro_block_handle.get_io_handle().is_empty()) {
|
||||
// TODO: After the continuous IO has been optimized, this should
|
||||
// not happen.
|
||||
if (OB_FAIL(macro_block_handle.wait())) {
|
||||
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
|
||||
} else if (OB_FAIL(read_io(io_info, nullptr, macro_block_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||
}
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||
}
|
||||
}
|
||||
// Avoid double_free with io_handle
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(callback) && OB_NOT_NULL(callback->get_allocator())) {
|
||||
common::ObIAllocator *allocator = callback->get_allocator();
|
||||
callback->~ObITmpPageIOCallback();
|
||||
allocator->free(callback);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTmpPageCache::direct_read(const ObTmpBlockIOInfo &info,
|
||||
ObMacroBlockHandle &mb_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(inner_read_io(info, nullptr, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTmpPageCache::prefetch(
|
||||
const ObTmpPageCacheKey &key,
|
||||
const ObTmpBlockIOInfo &info,
|
||||
@ -142,22 +179,8 @@ int ObTmpPageCache::prefetch(
|
||||
callback->offset_ = info.offset_;
|
||||
callback->allocator_ = &allocator;
|
||||
callback->key_ = key;
|
||||
if (OB_FAIL(read_io(info, *callback, mb_handle))) {
|
||||
if (mb_handle.get_io_handle().is_empty()) {
|
||||
// TODO: After the continuous IO has been optimized, this should
|
||||
// not happen.
|
||||
if (OB_FAIL(mb_handle.wait())) {
|
||||
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
|
||||
} else if (OB_FAIL(read_io(info, *callback, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||
}
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(callback->get_allocator())) { //Avoid double_free with io_handle
|
||||
callback->~ObTmpPageIOCallback();
|
||||
allocator.free(callback);
|
||||
if (OB_FAIL(inner_read_io(info, callback, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -187,22 +210,8 @@ int ObTmpPageCache::prefetch(
|
||||
callback->allocator_ = &allocator;
|
||||
if (OB_FAIL(callback->page_io_infos_.assign(page_io_infos))) {
|
||||
STORAGE_LOG(WARN, "fail to assign page io infos", K(ret), K(page_io_infos.count()), K(info));
|
||||
} else if (OB_FAIL(read_io(info, *callback, mb_handle))) {
|
||||
if (mb_handle.get_io_handle().is_empty()) {
|
||||
// TODO: After the continuous IO has been optimized, this should
|
||||
// not happen.
|
||||
if (OB_FAIL(mb_handle.wait())) {
|
||||
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
|
||||
} else if (OB_FAIL(read_io(info, *callback, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||
}
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(callback->get_allocator())) { //Avoid double_free with io_handle
|
||||
callback->~ObTmpMultiPageIOCallback();
|
||||
allocator.free(callback);
|
||||
} else if (OB_FAIL(inner_read_io(info, callback, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -372,7 +381,7 @@ const char *ObTmpPageCache::ObTmpMultiPageIOCallback::get_data()
|
||||
return data_buf_;
|
||||
}
|
||||
|
||||
int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback &callback,
|
||||
int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback *callback,
|
||||
ObMacroBlockHandle &handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -382,7 +391,7 @@ int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallbac
|
||||
read_info.io_desc_ = io_info.io_desc_;
|
||||
read_info.macro_block_id_ = io_info.macro_block_id_;
|
||||
read_info.io_timeout_ms_ = io_info.io_timeout_ms_;
|
||||
read_info.io_callback_ = &callback;
|
||||
read_info.io_callback_ = callback;
|
||||
read_info.offset_ = io_info.offset_;
|
||||
read_info.size_ = io_info.size_;
|
||||
read_info.io_desc_.set_group_id(ObIOModule::TMP_PAGE_CACHE_IO);
|
||||
|
@ -104,6 +104,7 @@ public:
|
||||
typedef common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue> BasePageCache;
|
||||
static ObTmpPageCache &get_instance();
|
||||
int init(const char *cache_name, const int64_t priority);
|
||||
int direct_read(const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle);
|
||||
int prefetch(
|
||||
const ObTmpPageCacheKey &key,
|
||||
const ObTmpBlockIOInfo &info,
|
||||
@ -142,7 +143,7 @@ public:
|
||||
{
|
||||
public:
|
||||
ObTmpPageIOCallback();
|
||||
~ObTmpPageIOCallback();
|
||||
~ObTmpPageIOCallback() override;
|
||||
int64_t size() const override;
|
||||
int inner_process(const char *data_buffer, const int64_t size) override;
|
||||
const char *get_data() override;
|
||||
@ -156,7 +157,7 @@ public:
|
||||
{
|
||||
public:
|
||||
ObTmpMultiPageIOCallback();
|
||||
~ObTmpMultiPageIOCallback();
|
||||
~ObTmpMultiPageIOCallback() override;
|
||||
int64_t size() const override;
|
||||
int inner_process(const char *data_buffer, const int64_t size) override;
|
||||
const char *get_data() override;
|
||||
@ -169,8 +170,12 @@ public:
|
||||
private:
|
||||
ObTmpPageCache();
|
||||
~ObTmpPageCache();
|
||||
int read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback &callback,
|
||||
ObMacroBlockHandle &handle);
|
||||
int inner_read_io(const ObTmpBlockIOInfo &io_info,
|
||||
ObITmpPageIOCallback *callback,
|
||||
ObMacroBlockHandle &handle);
|
||||
int read_io(const ObTmpBlockIOInfo &io_info,
|
||||
ObITmpPageIOCallback *callback,
|
||||
ObMacroBlockHandle &handle);
|
||||
|
||||
private:
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTmpPageCache);
|
||||
|
@ -1347,7 +1347,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) {
|
||||
if (!handle.is_disable_page_cache() && page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) {
|
||||
// merge multi page io into one.
|
||||
ObMacroBlockHandle mb_handle;
|
||||
ObTmpBlockIOInfo info(io_info);
|
||||
@ -1375,9 +1375,16 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io
|
||||
info.offset_ += ObTmpMacroBlock::get_header_padding();
|
||||
info.size_ = ObTmpMacroBlock::get_default_page_size();
|
||||
info.macro_block_id_ = block->get_macro_block_id();
|
||||
if (OB_FAIL(page_cache_->prefetch(page_io_infos->at(i).key_, info, mb_handle, io_allocator_))) {
|
||||
STORAGE_LOG(WARN, "fail to prefetch tmp page", K(ret));
|
||||
if (handle.is_disable_page_cache()) {
|
||||
if (OB_FAIL(page_cache_->direct_read(info, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to direct read tmp page", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(page_cache_->prefetch(page_io_infos->at(i).key_, info, mb_handle, io_allocator_))) {
|
||||
STORAGE_LOG(WARN, "fail to prefetch tmp page", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
char *buf = io_info.buf_ + ObTmpMacroBlock::calculate_offset(
|
||||
page_io_infos->at(i).key_.get_page_id(), page_io_infos->at(i).offset_) - io_info.offset_;
|
||||
ObTmpFileIOHandle::ObIOReadHandle read_handle(mb_handle, buf, page_io_infos->at(i).offset_,
|
||||
|
@ -724,6 +724,85 @@ TEST_F(TestTmpFile, test_big_file)
|
||||
ObTmpFileManager::get_instance().remove(fd);
|
||||
}
|
||||
|
||||
TEST_F(TestTmpFile, test_big_file_disable_page_cache)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t dir = -1;
|
||||
int64_t fd = -1;
|
||||
const int64_t macro_block_size = OB_SERVER_BLOCK_MGR.get_macro_block_size();
|
||||
ObTmpFileIOInfo io_info;
|
||||
ObTmpFileIOHandle handle;
|
||||
ret = ObTmpFileManager::get_instance().alloc_dir(dir);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ret = ObTmpFileManager::get_instance().open(fd, dir);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
int64_t write_size = macro_block_size * 512;
|
||||
char *write_buf = (char *)malloc(write_size);
|
||||
for (int64_t i = 0; i < write_size; ++i) {
|
||||
write_buf[i] = static_cast<char>(i % 256);
|
||||
}
|
||||
char *read_buf = (char *)malloc(write_size);
|
||||
io_info.fd_ = fd;
|
||||
io_info.tenant_id_ = 1;
|
||||
io_info.io_desc_.set_wait_event(2);
|
||||
io_info.buf_ = write_buf;
|
||||
io_info.size_ = write_size;
|
||||
io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
|
||||
io_info.disable_page_cache_ = true;
|
||||
int64_t write_time = ObTimeUtility::current_time();
|
||||
ret = ObTmpFileManager::get_instance().write(io_info);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
write_time = ObTimeUtility::current_time() - write_time;
|
||||
io_info.buf_ = read_buf;
|
||||
|
||||
io_info.size_ = write_size;
|
||||
ret = ObTmpFileManager::get_instance().aio_read(io_info, handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_TRUE(handle.size_ < handle.expect_read_size_);
|
||||
ASSERT_EQ(OB_SUCCESS, handle.wait());
|
||||
ASSERT_EQ(write_size, handle.get_data_size());
|
||||
int cmp = memcmp(handle.get_buffer(), write_buf, handle.get_data_size());
|
||||
ASSERT_EQ(0, cmp);
|
||||
|
||||
io_info.size_ = macro_block_size;
|
||||
ret = ObTmpFileManager::get_instance().pread(io_info, 100, handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(macro_block_size, handle.get_data_size());
|
||||
cmp = memcmp(handle.get_buffer(), write_buf + 100, handle.get_data_size());
|
||||
ASSERT_EQ(0, cmp);
|
||||
|
||||
io_info.size_ = write_size;
|
||||
int64_t read_time = ObTimeUtility::current_time();
|
||||
ret = ObTmpFileManager::get_instance().pread(io_info, 0, handle);
|
||||
read_time = ObTimeUtility::current_time() - read_time;
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(write_size, handle.get_data_size());
|
||||
cmp = memcmp(handle.get_buffer(), write_buf, write_size);
|
||||
ASSERT_EQ(0, cmp);
|
||||
|
||||
io_info.size_ = 200;
|
||||
ret = ObTmpFileManager::get_instance().pread(io_info, 200, handle);
|
||||
ASSERT_EQ(OB_SUCCESS, ret);
|
||||
ASSERT_EQ(200, handle.get_data_size());
|
||||
cmp = memcmp(handle.get_buffer(), write_buf + 200, 200);
|
||||
ASSERT_EQ(0, cmp);
|
||||
|
||||
free(write_buf);
|
||||
free(read_buf);
|
||||
|
||||
STORAGE_LOG(INFO, "test_big_file");
|
||||
STORAGE_LOG(INFO, "io time", K(write_time), K(read_time));
|
||||
ObTmpTenantFileStoreHandle store_handle;
|
||||
OB_TMP_FILE_STORE.get_store(1, store_handle);
|
||||
store_handle.get_tenant_store()->print_block_usage();
|
||||
ObMallocAllocator::get_instance()->print_tenant_memory_usage(1);
|
||||
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(1);
|
||||
ObMallocAllocator::get_instance()->print_tenant_memory_usage(500);
|
||||
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500);
|
||||
|
||||
ObTmpFileManager::get_instance().remove(fd);
|
||||
}
|
||||
|
||||
TEST_F(TestTmpFile, test_multi_small_file_single_thread_read_write)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
Loading…
x
Reference in New Issue
Block a user