Bugfix: incorrect data was read from the temporary file in direct reading scenario.
This commit is contained in:
@ -147,11 +147,27 @@ int ObTmpPageCache::inner_read_io(const ObTmpBlockIOInfo &io_info,
|
||||
}
|
||||
|
||||
int ObTmpPageCache::direct_read(const ObTmpBlockIOInfo &info,
|
||||
ObMacroBlockHandle &mb_handle)
|
||||
ObMacroBlockHandle &mb_handle,
|
||||
common::ObIAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(inner_read_io(info, nullptr, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle));
|
||||
void *buf = nullptr;
|
||||
ObTmpDirectReadPageIOCallback *callback = nullptr;
|
||||
if (OB_ISNULL(buf = allocator.alloc(sizeof(ObTmpDirectReadPageIOCallback)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "allocate callback memory failed", K(ret));
|
||||
} else {
|
||||
// fill the callback
|
||||
callback = new (buf) ObTmpDirectReadPageIOCallback;
|
||||
callback->cache_ = this;
|
||||
callback->offset_ = info.offset_;
|
||||
callback->allocator_ = &allocator;
|
||||
if (OB_FAIL(inner_read_io(info, callback, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle));
|
||||
}
|
||||
// There is no need to handle error cases (freeing the memory of the
|
||||
// callback) because inner_read_io will handle error cases and free the
|
||||
// memory of the callback.
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -182,6 +198,9 @@ int ObTmpPageCache::prefetch(
|
||||
if (OB_FAIL(inner_read_io(info, callback, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle));
|
||||
}
|
||||
// There is no need to handle error cases (freeing the memory of the
|
||||
// callback) because inner_read_io will handle error cases and free the
|
||||
// memory of the callback.
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -213,6 +232,9 @@ int ObTmpPageCache::prefetch(
|
||||
} else if (OB_FAIL(inner_read_io(info, callback, mb_handle))) {
|
||||
STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle));
|
||||
}
|
||||
// There is no need to handle error cases (freeing the memory of the
|
||||
// callback) because inner_read_io will handle error cases and free the
|
||||
// memory of the callback.
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -381,6 +403,37 @@ const char *ObTmpPageCache::ObTmpMultiPageIOCallback::get_data()
|
||||
return data_buf_;
|
||||
}
|
||||
|
||||
int64_t ObTmpPageCache::ObTmpDirectReadPageIOCallback::size() const
|
||||
{
|
||||
return sizeof(*this);
|
||||
}
|
||||
|
||||
const char * ObTmpPageCache::ObTmpDirectReadPageIOCallback::get_data()
|
||||
{
|
||||
return data_buf_;
|
||||
}
|
||||
|
||||
int ObTmpPageCache::ObTmpDirectReadPageIOCallback::inner_process(const char *data_buffer, const int64_t size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTimeGuard time_guard("ObTmpDirectReadPageIOCallback", 100000); //100ms
|
||||
if (OB_ISNULL(cache_) || OB_ISNULL(allocator_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Invalid tmp page cache callback allocator", KP_(cache), KP_(allocator), K(ret));
|
||||
} else if (OB_UNLIKELY(size <= 0 || data_buffer == nullptr)) {
|
||||
ret = OB_INVALID_DATA;
|
||||
STORAGE_LOG(WARN, "invalid data buffer size", K(ret), K(size), KP(data_buffer));
|
||||
} else if (OB_FAIL(alloc_data_buf(data_buffer, size))) {
|
||||
STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret), K(size));
|
||||
} else if (FALSE_IT(time_guard.click("alloc_data_buf"))) {
|
||||
}
|
||||
if (OB_FAIL(ret) && NULL != allocator_ && NULL != data_buf_) {
|
||||
allocator_->free(data_buf_);
|
||||
data_buf_ = NULL;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback *callback,
|
||||
ObMacroBlockHandle &handle)
|
||||
{
|
||||
@ -391,11 +444,7 @@ int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallbac
|
||||
read_info.io_desc_ = io_info.io_desc_;
|
||||
read_info.macro_block_id_ = io_info.macro_block_id_;
|
||||
read_info.io_timeout_ms_ = io_info.io_timeout_ms_;
|
||||
if (callback == nullptr) {
|
||||
read_info.buf_ = io_info.buf_;
|
||||
} else {
|
||||
read_info.io_callback_ = callback;
|
||||
}
|
||||
read_info.io_callback_ = callback;
|
||||
read_info.offset_ = io_info.offset_;
|
||||
read_info.size_ = io_info.size_;
|
||||
read_info.io_desc_.set_group_id(ObIOModule::TMP_PAGE_CACHE_IO);
|
||||
|
||||
@ -104,7 +104,7 @@ public:
|
||||
typedef common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue> BasePageCache;
|
||||
static ObTmpPageCache &get_instance();
|
||||
int init(const char *cache_name, const int64_t priority);
|
||||
int direct_read(const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle);
|
||||
int direct_read(const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle, common::ObIAllocator &allocator);
|
||||
int prefetch(
|
||||
const ObTmpPageCacheKey &key,
|
||||
const ObTmpBlockIOInfo &info,
|
||||
@ -167,6 +167,17 @@ public:
|
||||
friend class ObTmpPageCache;
|
||||
common::ObArray<ObTmpPageIOInfo> page_io_infos_;
|
||||
};
|
||||
class ObTmpDirectReadPageIOCallback final : public ObITmpPageIOCallback
|
||||
{
|
||||
public:
|
||||
ObTmpDirectReadPageIOCallback() {}
|
||||
~ObTmpDirectReadPageIOCallback() override {}
|
||||
int64_t size() const override;
|
||||
int inner_process(const char *data_buffer, const int64_t size) override;
|
||||
const char *get_data() override;
|
||||
TO_STRING_KV("callback_type:", "ObTmpDirectReadPageIOCallback", KP_(data_buf));
|
||||
DISALLOW_COPY_AND_ASSIGN(ObTmpDirectReadPageIOCallback);
|
||||
};
|
||||
private:
|
||||
ObTmpPageCache();
|
||||
~ObTmpPageCache();
|
||||
|
||||
@ -1347,18 +1347,25 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (!handle.is_disable_page_cache() && page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) {
|
||||
if (page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) {
|
||||
// merge multi page io into one.
|
||||
ObMacroBlockHandle mb_handle;
|
||||
ObTmpBlockIOInfo info(io_info);
|
||||
int64_t p_offset = common::lower_align(io_info.offset_, ObTmpMacroBlock::get_default_page_size());
|
||||
const int64_t p_offset = common::lower_align(io_info.offset_, ObTmpMacroBlock::get_default_page_size());
|
||||
// just skip header and padding.
|
||||
info.offset_ = p_offset + ObTmpMacroBlock::get_header_padding();
|
||||
info.size_ = page_nums * ObTmpMacroBlock::get_default_page_size();
|
||||
info.macro_block_id_ = block->get_macro_block_id();
|
||||
if (OB_FAIL(page_cache_->prefetch(info, *page_io_infos, mb_handle, io_allocator_))) {
|
||||
STORAGE_LOG(WARN, "fail to prefetch multi tmp page", K(ret));
|
||||
if (handle.is_disable_page_cache()) {
|
||||
if (OB_FAIL(page_cache_->direct_read(info, mb_handle, io_allocator_))) {
|
||||
STORAGE_LOG(WARN, "fail to direct read multi page", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_FAIL(page_cache_->prefetch(info, *page_io_infos, mb_handle, io_allocator_))) {
|
||||
STORAGE_LOG(WARN, "fail to prefetch multi tmp page", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
ObTmpFileIOHandle::ObIOReadHandle read_handle(mb_handle, io_info.buf_,
|
||||
io_info.offset_ - p_offset, io_info.size_);
|
||||
if (OB_FAIL(handle.get_io_handles().push_back(read_handle))) {
|
||||
@ -1376,7 +1383,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io
|
||||
info.size_ = ObTmpMacroBlock::get_default_page_size();
|
||||
info.macro_block_id_ = block->get_macro_block_id();
|
||||
if (handle.is_disable_page_cache()) {
|
||||
if (OB_FAIL(page_cache_->direct_read(info, mb_handle))) {
|
||||
if (OB_FAIL(page_cache_->direct_read(info, mb_handle, io_allocator_))) {
|
||||
STORAGE_LOG(WARN, "fail to direct read tmp page", K(ret));
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -111,7 +111,7 @@ struct ObTmpBlockIOInfo final
|
||||
public:
|
||||
ObTmpBlockIOInfo()
|
||||
: block_id_(0), offset_(0), size_(0), io_timeout_ms_(DEFAULT_IO_WAIT_TIME_MS), tenant_id_(0),
|
||||
buf_(NULL), io_desc_(), macro_block_id_() {}
|
||||
buf_(NULL), io_desc_(), macro_block_id_() {}
|
||||
ObTmpBlockIOInfo(const int64_t block_id, const int64_t offset, const int64_t size,
|
||||
const uint64_t tenant_id, const MacroBlockId macro_block_id, char *buf,
|
||||
const common::ObIOFlag io_desc)
|
||||
|
||||
@ -680,6 +680,7 @@ TEST_F(TestTmpFile, test_big_file)
|
||||
write_time = ObTimeUtility::current_time() - write_time;
|
||||
io_info.buf_ = read_buf;
|
||||
|
||||
// Flush all held block caches to ensure that subsequent read processes will go through I/O.
|
||||
ObKVGlobalCache::get_instance().erase_cache(1, "tmp_block_cache");
|
||||
|
||||
io_info.size_ = write_size;
|
||||
@ -730,7 +731,6 @@ TEST_F(TestTmpFile, test_big_file)
|
||||
ObTmpFileManager::get_instance().remove(fd);
|
||||
}
|
||||
|
||||
/*
|
||||
TEST_F(TestTmpFile, test_big_file_disable_page_cache)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -762,6 +762,7 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache)
|
||||
write_time = ObTimeUtility::current_time() - write_time;
|
||||
io_info.buf_ = read_buf;
|
||||
|
||||
// Flush all held block caches to ensure that subsequent read processes will go through I/O.
|
||||
ObKVGlobalCache::get_instance().erase_cache(1, "tmp_block_cache");
|
||||
|
||||
io_info.size_ = write_size;
|
||||
@ -811,7 +812,6 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache)
|
||||
|
||||
ObTmpFileManager::get_instance().remove(fd);
|
||||
}
|
||||
*/
|
||||
|
||||
TEST_F(TestTmpFile, test_multi_small_file_single_thread_read_write)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user