[CP] Replace ObConcurrentFIFOAllocator with ObFIFOAllocator.
This commit is contained in:
@ -936,7 +936,9 @@ int ObTmpTenantFileStore::init(const uint64_t tenant_id)
|
|||||||
STORAGE_LOG(WARN, "ObTmpTenantFileStore has not been inited", K(ret));
|
STORAGE_LOG(WARN, "ObTmpTenantFileStore has not been inited", K(ret));
|
||||||
} else if (OB_FAIL(allocator_.init(BLOCK_SIZE, ObModIds::OB_TMP_BLOCK_MANAGER, tenant_id, get_memory_limit(tenant_id)))) {
|
} else if (OB_FAIL(allocator_.init(BLOCK_SIZE, ObModIds::OB_TMP_BLOCK_MANAGER, tenant_id, get_memory_limit(tenant_id)))) {
|
||||||
STORAGE_LOG(WARN, "fail to init allocator", K(ret));
|
STORAGE_LOG(WARN, "fail to init allocator", K(ret));
|
||||||
} else if (OB_FAIL(io_allocator_.init(OB_MALLOC_BIG_BLOCK_SIZE, ObModIds::OB_TMP_PAGE_CACHE, tenant_id, IO_LIMIT))) {
|
} else if (OB_FAIL(io_allocator_.init(lib::ObMallocAllocator::get_instance(),
|
||||||
|
OB_MALLOC_MIDDLE_BLOCK_SIZE,
|
||||||
|
ObMemAttr(OB_SERVER_TENANT_ID, ObModIds::OB_TMP_PAGE_CACHE, ObCtxIds::DEFAULT_CTX_ID)))) {
|
||||||
STORAGE_LOG(WARN, "Fail to init io allocator, ", K(ret));
|
STORAGE_LOG(WARN, "Fail to init io allocator, ", K(ret));
|
||||||
} else if (OB_ISNULL(page_cache_ = &ObTmpPageCache::get_instance())) {
|
} else if (OB_ISNULL(page_cache_ = &ObTmpPageCache::get_instance())) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -979,7 +981,7 @@ void ObTmpTenantFileStore::destroy()
|
|||||||
page_cache_ = NULL;
|
page_cache_ = NULL;
|
||||||
}
|
}
|
||||||
allocator_.destroy();
|
allocator_.destroy();
|
||||||
io_allocator_.destroy();
|
io_allocator_.reset();
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
STORAGE_LOG(INFO, "cache num when destroy",
|
STORAGE_LOG(INFO, "cache num when destroy",
|
||||||
K(ATOMIC_LOAD(&page_cache_num_)), K(ATOMIC_LOAD(&block_cache_num_)));
|
K(ATOMIC_LOAD(&page_cache_num_)), K(ATOMIC_LOAD(&block_cache_num_)));
|
||||||
|
|||||||
@ -294,7 +294,7 @@ private:
|
|||||||
ObTmpPageCache *page_cache_;
|
ObTmpPageCache *page_cache_;
|
||||||
common::SpinRWLock lock_;
|
common::SpinRWLock lock_;
|
||||||
common::ObConcurrentFIFOAllocator allocator_;
|
common::ObConcurrentFIFOAllocator allocator_;
|
||||||
common::ObConcurrentFIFOAllocator io_allocator_;
|
common::ObFIFOAllocator io_allocator_;
|
||||||
ObTmpTenantMacroBlockManager tmp_block_manager_;
|
ObTmpTenantMacroBlockManager tmp_block_manager_;
|
||||||
ObTmpTenantMemBlockManager tmp_mem_block_manager_;
|
ObTmpTenantMemBlockManager tmp_mem_block_manager_;
|
||||||
|
|
||||||
|
|||||||
@ -13,6 +13,7 @@
|
|||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#define protected public
|
#define protected public
|
||||||
#define private public
|
#define private public
|
||||||
|
#include "lib/allocator/ob_fifo_allocator.h"
|
||||||
#include "storage/blocksstable/ob_tmp_file.h"
|
#include "storage/blocksstable/ob_tmp_file.h"
|
||||||
#include "storage/blocksstable/ob_tmp_file_store.h"
|
#include "storage/blocksstable/ob_tmp_file_store.h"
|
||||||
#include "storage/blocksstable/ob_tmp_file_cache.h"
|
#include "storage/blocksstable/ob_tmp_file_cache.h"
|
||||||
@ -1720,6 +1721,74 @@ TEST_F(TestTmpFile, test_page_buddy)
|
|||||||
ASSERT_EQ(false, page_buddy_4.is_empty());
|
ASSERT_EQ(false, page_buddy_4.is_empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TestTmpFile, test_page_io_info_unrelease)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
|
ObTmpTenantFileStoreHandle store_handle;
|
||||||
|
OB_TMP_FILE_STORE.get_store(1, store_handle);
|
||||||
|
common::ObFIFOAllocator *fifo_allocator = &(store_handle.get_tenant_store()->io_allocator_);
|
||||||
|
|
||||||
|
// case 1: construct callback
|
||||||
|
{
|
||||||
|
int64_t begin_used = fifo_allocator->used();
|
||||||
|
{
|
||||||
|
ObTmpPageCache::ObTmpMultiPageIOCallback callback;
|
||||||
|
callback.allocator_ = fifo_allocator;
|
||||||
|
callback.cache_ = &(ObTmpPageCache::get_instance());
|
||||||
|
callback.page_io_infos_.assign(common::ObSEArray<ObTmpPageIOInfo, ObTmpFilePageBuddy::MAX_PAGE_NUMS>());
|
||||||
|
|
||||||
|
int64_t final_used = fifo_allocator->used();
|
||||||
|
ASSERT_EQ(final_used, begin_used);
|
||||||
|
}
|
||||||
|
ASSERT_EQ(begin_used, fifo_allocator->used());
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 2: never call alloc_data_buf
|
||||||
|
{
|
||||||
|
int64_t begin_used = fifo_allocator->used();
|
||||||
|
{
|
||||||
|
ObTmpPageCache::ObTmpMultiPageIOCallback callback;
|
||||||
|
callback.allocator_ = fifo_allocator;
|
||||||
|
callback.cache_ = &(ObTmpPageCache::get_instance());
|
||||||
|
callback.page_io_infos_.assign(common::ObSEArray<ObTmpPageIOInfo, ObTmpFilePageBuddy::MAX_PAGE_NUMS>());
|
||||||
|
|
||||||
|
int64_t tmp_buf_size = 4096;
|
||||||
|
char *tmp_buf = static_cast<char*>(fifo_allocator->alloc(tmp_buf_size));
|
||||||
|
ASSERT_EQ(callback.alloc_data_buf(tmp_buf, tmp_buf_size), OB_SUCCESS);
|
||||||
|
fifo_allocator->free(tmp_buf);
|
||||||
|
int64_t after_alloc_io_buf_used = fifo_allocator->used();
|
||||||
|
ASSERT_EQ(after_alloc_io_buf_used, begin_used + tmp_buf_size);
|
||||||
|
|
||||||
|
int64_t after_process_pos = fifo_allocator->used();
|
||||||
|
ASSERT_EQ(after_process_pos, begin_used + tmp_buf_size);
|
||||||
|
ASSERT_EQ(after_alloc_io_buf_used, after_process_pos);
|
||||||
|
}
|
||||||
|
ASSERT_EQ(begin_used, fifo_allocator->used());
|
||||||
|
}
|
||||||
|
|
||||||
|
// case 3: call inner_process
|
||||||
|
{
|
||||||
|
int64_t begin_used = fifo_allocator->used();
|
||||||
|
{
|
||||||
|
ObTmpPageCache::ObTmpMultiPageIOCallback callback;
|
||||||
|
callback.allocator_ = fifo_allocator;
|
||||||
|
callback.cache_ = &(ObTmpPageCache::get_instance());
|
||||||
|
callback.page_io_infos_.assign(common::ObSEArray<ObTmpPageIOInfo, ObTmpFilePageBuddy::MAX_PAGE_NUMS>());
|
||||||
|
|
||||||
|
int64_t tmp_buf_size = 4096;
|
||||||
|
char *tmp_buf = static_cast<char*>(fifo_allocator->alloc(tmp_buf_size));
|
||||||
|
ASSERT_EQ(callback.inner_process(tmp_buf, tmp_buf_size), OB_SUCCESS);
|
||||||
|
int64_t after_process_used = fifo_allocator->used();
|
||||||
|
ASSERT_EQ(after_process_used, begin_used + tmp_buf_size * 2);
|
||||||
|
fifo_allocator->free(tmp_buf);
|
||||||
|
int64_t final_used = fifo_allocator->used();
|
||||||
|
ASSERT_EQ(final_used, begin_used + tmp_buf_size);
|
||||||
|
}
|
||||||
|
ASSERT_EQ(begin_used, fifo_allocator->used());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(TestTmpFile, test_tmp_file_sync)
|
TEST_F(TestTmpFile, test_tmp_file_sync)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|||||||
Reference in New Issue
Block a user