[Tmp.File] fix prefetch handle issue
This commit is contained in:
@ -374,7 +374,7 @@ private:
|
|||||||
: tenant_id_(tenant_id), fd_list_(fd_list)
|
: tenant_id_(tenant_id), fd_list_(fd_list)
|
||||||
{}
|
{}
|
||||||
~RmTenantTmpFileOp() = default;
|
~RmTenantTmpFileOp() = default;
|
||||||
bool operator()(common::hash::HashMapPair<int64_t, ObTmpFile*>& entry)
|
int operator()(common::hash::HashMapPair<int64_t, ObTmpFile *> &entry)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObTmpFile* tmp_file = entry.second;
|
ObTmpFile* tmp_file = entry.second;
|
||||||
@ -386,7 +386,7 @@ private:
|
|||||||
STORAGE_LOG(WARN, "fd_list_ push back failed", K(ret));
|
STORAGE_LOG(WARN, "fd_list_ push back failed", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return OB_SUCCESS == ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -103,13 +103,12 @@ int ObTmpPageCacheValue::deep_copy(char* buf, const int64_t buf_len, ObIKVCacheV
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTmpPageCache::prefetch(const ObTmpPageCacheKey& key, const ObTmpBlockIOInfo& info, ObTmpFileIOHandle& handle,
|
int ObTmpPageCache::prefetch(const ObTmpPageCacheKey &key, const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle)
|
||||||
ObMacroBlockHandle& mb_handle)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(!key.is_valid() || !handle.is_valid())) {
|
if (OB_UNLIKELY(!key.is_valid())) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
STORAGE_LOG(WARN, "Invalid arguments", K(ret), K(key), K(handle));
|
STORAGE_LOG(WARN, "Invalid arguments", K(ret), K(key));
|
||||||
} else {
|
} else {
|
||||||
// fill the callback
|
// fill the callback
|
||||||
ObTmpPageIOCallback callback;
|
ObTmpPageIOCallback callback;
|
||||||
@ -122,7 +121,7 @@ int ObTmpPageCache::prefetch(const ObTmpPageCacheKey& key, const ObTmpBlockIOInf
|
|||||||
if (mb_handle.get_io_handle().is_empty()) {
|
if (mb_handle.get_io_handle().is_empty()) {
|
||||||
// TODO: After the continuous IO has been optimized, this should
|
// TODO: After the continuous IO has been optimized, this should
|
||||||
// not happen.
|
// not happen.
|
||||||
if (OB_FAIL(handle.wait(DEFAULT_IO_WAIT_TIME_MS))) {
|
if (OB_FAIL(mb_handle.wait(DEFAULT_IO_WAIT_TIME_MS))) {
|
||||||
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
|
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
|
||||||
} else if (OB_FAIL(read_io(info, callback, mb_handle))) {
|
} else if (OB_FAIL(read_io(info, callback, mb_handle))) {
|
||||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||||
@ -135,13 +134,13 @@ int ObTmpPageCache::prefetch(const ObTmpPageCacheKey& key, const ObTmpBlockIOInf
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTmpPageCache::prefetch(const ObTmpBlockIOInfo& info, const common::ObIArray<ObTmpPageIOInfo>& page_io_infos,
|
int ObTmpPageCache::prefetch(
|
||||||
ObTmpFileIOHandle& handle, ObMacroBlockHandle& mb_handle)
|
const ObTmpBlockIOInfo &info, const common::ObIArray<ObTmpPageIOInfo> &page_io_infos, ObMacroBlockHandle &mb_handle)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_UNLIKELY(page_io_infos.count() <= 0 || !handle.is_valid())) {
|
if (OB_UNLIKELY(page_io_infos.count() <= 0)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
STORAGE_LOG(WARN, "Invalid arguments", K(ret), K(page_io_infos.count()), K(info), K(handle));
|
STORAGE_LOG(WARN, "Invalid arguments", K(ret), K(page_io_infos.count()), K(info));
|
||||||
} else {
|
} else {
|
||||||
ObTmpMultiPageIOCallback callback;
|
ObTmpMultiPageIOCallback callback;
|
||||||
callback.cache_ = this;
|
callback.cache_ = this;
|
||||||
@ -159,7 +158,7 @@ int ObTmpPageCache::prefetch(const ObTmpBlockIOInfo& info, const common::ObIArra
|
|||||||
if (mb_handle.get_io_handle().is_empty()) {
|
if (mb_handle.get_io_handle().is_empty()) {
|
||||||
// TODO: After the continuous IO has been optimized, this should
|
// TODO: After the continuous IO has been optimized, this should
|
||||||
// not happen.
|
// not happen.
|
||||||
if (OB_FAIL(handle.wait(DEFAULT_IO_WAIT_TIME_MS))) {
|
if (OB_FAIL(mb_handle.wait(DEFAULT_IO_WAIT_TIME_MS))) {
|
||||||
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
|
STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret));
|
||||||
} else if (OB_FAIL(read_io(info, callback, mb_handle))) {
|
} else if (OB_FAIL(read_io(info, callback, mb_handle))) {
|
||||||
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret));
|
||||||
|
|||||||
@ -108,16 +108,15 @@ public:
|
|||||||
class ObTmpPageCache : public common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue> {
|
class ObTmpPageCache : public common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue> {
|
||||||
public:
|
public:
|
||||||
typedef common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue> BasePageCache;
|
typedef common::ObKVCache<ObTmpPageCacheKey, ObTmpPageCacheValue> BasePageCache;
|
||||||
static ObTmpPageCache& get_instance();
|
static ObTmpPageCache &get_instance();
|
||||||
int init(const char* cache_name, const int64_t priority, const ObStorageFileHandle& file_handle);
|
int init(const char *cache_name, const int64_t priority, const ObStorageFileHandle &file_handle);
|
||||||
int prefetch(const ObTmpPageCacheKey& key, const ObTmpBlockIOInfo& info, ObTmpFileIOHandle& handle,
|
int prefetch(const ObTmpPageCacheKey &key, const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle);
|
||||||
ObMacroBlockHandle& mb_handle);
|
|
||||||
// multi page prefetch
|
// multi page prefetch
|
||||||
int prefetch(const ObTmpBlockIOInfo& info, const common::ObIArray<ObTmpPageIOInfo>& page_io_infos,
|
int prefetch(const ObTmpBlockIOInfo &info, const common::ObIArray<ObTmpPageIOInfo> &page_io_infos,
|
||||||
ObTmpFileIOHandle& handle, ObMacroBlockHandle& mb_handle);
|
ObMacroBlockHandle &mb_handle);
|
||||||
int get_cache_page(const ObTmpPageCacheKey& key, ObTmpPageValueHandle& handle);
|
int get_cache_page(const ObTmpPageCacheKey &key, ObTmpPageValueHandle &handle);
|
||||||
int get_page(const ObTmpPageCacheKey& key, ObTmpPageValueHandle& handle);
|
int get_page(const ObTmpPageCacheKey &key, ObTmpPageValueHandle &handle);
|
||||||
int put_page(const ObTmpPageCacheKey& key, const ObTmpPageCacheValue& value);
|
int put_page(const ObTmpPageCacheKey &key, const ObTmpPageCacheValue &value);
|
||||||
void destroy();
|
void destroy();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
|
|||||||
@ -1059,7 +1059,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock* block, ObTmpBlockIOInfo& io
|
|||||||
info.offset_ = common::lower_align(io_info.offset_, ObTmpMacroBlock::get_default_page_size());
|
info.offset_ = common::lower_align(io_info.offset_, ObTmpMacroBlock::get_default_page_size());
|
||||||
info.size_ = page_nums * ObTmpMacroBlock::get_default_page_size();
|
info.size_ = page_nums * ObTmpMacroBlock::get_default_page_size();
|
||||||
info.macro_block_id_ = block->get_macro_block_id();
|
info.macro_block_id_ = block->get_macro_block_id();
|
||||||
if (OB_FAIL(page_cache_->prefetch(info, page_io_infos, handle, mb_handle))) {
|
if (OB_FAIL(page_cache_->prefetch(info, page_io_infos, mb_handle))) {
|
||||||
STORAGE_LOG(WARN, "fail to prefetch multi tmp page", K(ret));
|
STORAGE_LOG(WARN, "fail to prefetch multi tmp page", K(ret));
|
||||||
} else {
|
} else {
|
||||||
ObTmpFileIOHandle::ObIOReadHandle read_handle(
|
ObTmpFileIOHandle::ObIOReadHandle read_handle(
|
||||||
@ -1076,7 +1076,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock* block, ObTmpBlockIOInfo& io
|
|||||||
info.offset_ = page_io_infos.at(i).key_.get_page_id() * ObTmpMacroBlock::get_default_page_size();
|
info.offset_ = page_io_infos.at(i).key_.get_page_id() * ObTmpMacroBlock::get_default_page_size();
|
||||||
info.size_ = ObTmpMacroBlock::get_default_page_size();
|
info.size_ = ObTmpMacroBlock::get_default_page_size();
|
||||||
info.macro_block_id_ = block->get_macro_block_id();
|
info.macro_block_id_ = block->get_macro_block_id();
|
||||||
if (OB_FAIL(page_cache_->prefetch(page_io_infos.at(i).key_, info, handle, mb_handle))) {
|
if (OB_FAIL(page_cache_->prefetch(page_io_infos.at(i).key_, info, mb_handle))) {
|
||||||
STORAGE_LOG(WARN, "fail to prefetch tmp page", K(ret));
|
STORAGE_LOG(WARN, "fail to prefetch tmp page", K(ret));
|
||||||
} else {
|
} else {
|
||||||
char* buf =
|
char* buf =
|
||||||
|
|||||||
Reference in New Issue
Block a user