optimize read performance of tmp file
This commit is contained in:
@ -471,8 +471,18 @@ int ObTmpFileMeta::clear()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
ObTmpFile::ObTmpFile() : file_meta_(), is_big_(false), tenant_id_(-1), offset_(0), allocator_(NULL), is_inited_(false)
|
ObTmpFile::ObTmpFile()
|
||||||
{}
|
: file_meta_(),
|
||||||
|
is_big_(false),
|
||||||
|
tenant_id_(-1),
|
||||||
|
offset_(0),
|
||||||
|
allocator_(NULL),
|
||||||
|
last_extent_id_(0),
|
||||||
|
last_extent_min_offset_(0),
|
||||||
|
last_extent_max_offset_(INT64_MAX),
|
||||||
|
is_inited_(false)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
ObTmpFile::~ObTmpFile()
|
ObTmpFile::~ObTmpFile()
|
||||||
{
|
{
|
||||||
@ -489,6 +499,9 @@ int ObTmpFile::clear()
|
|||||||
is_big_ = false;
|
is_big_ = false;
|
||||||
tenant_id_ = -1;
|
tenant_id_ = -1;
|
||||||
offset_ = 0;
|
offset_ = 0;
|
||||||
|
last_extent_id_ = 0;
|
||||||
|
last_extent_min_offset_ = 0;
|
||||||
|
last_extent_max_offset_ = INT64_MAX;
|
||||||
allocator_ = NULL;
|
allocator_ = NULL;
|
||||||
is_inited_ = false;
|
is_inited_ = false;
|
||||||
}
|
}
|
||||||
@ -518,7 +531,30 @@ int ObTmpFile::init(const int64_t fd, const int64_t dir_id, common::ObIAllocator
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObTmpFile::aio_pread_without_lock(const ObTmpFileIOInfo& io_info, int64_t& offset, ObTmpFileIOHandle& handle)
|
int64_t ObTmpFile::find_first_extent(const int64_t offset)
|
||||||
|
{
|
||||||
|
common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
|
||||||
|
int64_t first_extent = 0;
|
||||||
|
int64_t left = 0;
|
||||||
|
int64_t right = extents.count() - 1;
|
||||||
|
ObTmpFileExtent *tmp = nullptr;
|
||||||
|
while(left < right) {
|
||||||
|
int64_t mid = (left + right) / 2;
|
||||||
|
tmp = extents.at(mid);
|
||||||
|
if (tmp->get_global_start() <= offset && offset < tmp->get_global_end()) {
|
||||||
|
first_extent = mid;
|
||||||
|
break;
|
||||||
|
} else if(tmp->get_global_start() > offset) {
|
||||||
|
right = mid - 1;
|
||||||
|
} else if(tmp->get_global_end() <= offset) {
|
||||||
|
left = mid + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return first_extent;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTmpFile::aio_pread_without_lock(const ObTmpFileIOInfo &io_info,
|
||||||
|
int64_t &offset, ObTmpFileIOHandle &handle)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
char* buf = io_info.buf_;
|
char* buf = io_info.buf_;
|
||||||
@ -533,9 +569,15 @@ int ObTmpFile::aio_pread_without_lock(const ObTmpFileIOInfo& io_info, int64_t& o
|
|||||||
} else if (OB_FAIL(handle.prepare_read(io_info.buf_, this))) {
|
} else if (OB_FAIL(handle.prepare_read(io_info.buf_, this))) {
|
||||||
STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret));
|
STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret));
|
||||||
} else {
|
} else {
|
||||||
common::ObIArray<ObTmpFileExtent*>& extents = file_meta_.get_extents();
|
int64_t ith_extent = 0;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < extents.count() && size > 0; ++i) {
|
common::ObIArray<ObTmpFileExtent *> &extents = file_meta_.get_extents();
|
||||||
tmp = extents.at(i);
|
if (offset >= last_extent_min_offset_ && offset_ <= last_extent_max_offset_) {
|
||||||
|
ith_extent = last_extent_id_;
|
||||||
|
} else {
|
||||||
|
ith_extent = find_first_extent(offset);
|
||||||
|
}
|
||||||
|
for (; OB_SUCC(ret) && ith_extent < extents.count() && size > 0; ++ith_extent) {
|
||||||
|
tmp = extents.at(ith_extent);
|
||||||
if (tmp->get_global_start() <= offset && offset < tmp->get_global_end()) {
|
if (tmp->get_global_start() <= offset && offset < tmp->get_global_end()) {
|
||||||
if (offset + size > tmp->get_global_end()) {
|
if (offset + size > tmp->get_global_end()) {
|
||||||
read_size = tmp->get_global_end() - offset;
|
read_size = tmp->get_global_end() - offset;
|
||||||
@ -550,6 +592,9 @@ int ObTmpFile::aio_pread_without_lock(const ObTmpFileIOInfo& io_info, int64_t& o
|
|||||||
size -= read_size;
|
size -= read_size;
|
||||||
buf += read_size;
|
buf += read_size;
|
||||||
handle.add_data_size(read_size);
|
handle.add_data_size(read_size);
|
||||||
|
last_extent_id_ = ith_extent;
|
||||||
|
last_extent_min_offset_ = tmp->get_global_start();
|
||||||
|
last_extent_max_offset_ = tmp->get_global_end();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -291,6 +291,7 @@ private:
|
|||||||
int aio_pread_without_lock(const ObTmpFileIOInfo& io_info, int64_t& offset, ObTmpFileIOHandle& handle);
|
int aio_pread_without_lock(const ObTmpFileIOInfo& io_info, int64_t& offset, ObTmpFileIOHandle& handle);
|
||||||
int64_t small_file_prealloc_size();
|
int64_t small_file_prealloc_size();
|
||||||
int64_t big_file_prealloc_size();
|
int64_t big_file_prealloc_size();
|
||||||
|
int64_t find_first_extent(const int64_t offset);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// NOTE:
|
// NOTE:
|
||||||
@ -303,7 +304,10 @@ private:
|
|||||||
bool is_big_;
|
bool is_big_;
|
||||||
uint64_t tenant_id_;
|
uint64_t tenant_id_;
|
||||||
int64_t offset_; // read offset
|
int64_t offset_; // read offset
|
||||||
common::ObIAllocator* allocator_;
|
common::ObIAllocator *allocator_;
|
||||||
|
int64_t last_extent_id_;
|
||||||
|
int64_t last_extent_min_offset_;
|
||||||
|
int64_t last_extent_max_offset_;
|
||||||
common::SpinRWLock lock_;
|
common::SpinRWLock lock_;
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
|
|
||||||
|
|||||||
@ -1476,6 +1476,104 @@ TEST_F(TestTmpFile, test_handle_double_wait)
|
|||||||
ObTmpFileManager::get_instance().remove(fd);
|
ObTmpFileManager::get_instance().remove(fd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TestTmpFile, test_sql_workload)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
int64_t dir = -1;
|
||||||
|
int64_t fd = -1;
|
||||||
|
const int64_t macro_block_size = OB_FILE_SYSTEM.get_macro_block_size();
|
||||||
|
ObTmpFileIOInfo io_info;
|
||||||
|
ObTmpFileIOHandle handle;
|
||||||
|
ret = ObTmpFileManager::get_instance().alloc_dir(dir);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ret = ObTmpFileManager::get_instance().open(fd, dir);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
const int64_t blk_cnt = 16;
|
||||||
|
int64_t write_size = macro_block_size * blk_cnt;
|
||||||
|
char *write_buf = (char *)malloc(write_size);
|
||||||
|
for (int64_t i = 0; i < write_size; ++i) {
|
||||||
|
write_buf[i] = static_cast<char>(i % 256);
|
||||||
|
}
|
||||||
|
char *read_buf = (char *)malloc(write_size);
|
||||||
|
|
||||||
|
|
||||||
|
io_info.fd_ = fd;
|
||||||
|
io_info.tenant_id_ = 1;
|
||||||
|
io_info.io_desc_.category_ = USER_IO;
|
||||||
|
io_info.io_desc_.wait_event_no_ = 2;
|
||||||
|
io_info.buf_ = write_buf;
|
||||||
|
io_info.size_ = write_size;
|
||||||
|
const int64_t timeout_ms = 5000;
|
||||||
|
int64_t write_time = ObTimeUtility::current_time();
|
||||||
|
|
||||||
|
const int cnt = 1;
|
||||||
|
const int64_t sql_read_size = 64 * 1024;
|
||||||
|
const int64_t sql_cnt = write_size / sql_read_size;
|
||||||
|
|
||||||
|
for (int i = 0; i < cnt; i++) {
|
||||||
|
for (int64_t j = 0; j < sql_cnt; j++) {
|
||||||
|
io_info.size_ = sql_read_size;
|
||||||
|
io_info.buf_ = write_buf + j * sql_read_size;
|
||||||
|
ret = ObTmpFileManager::get_instance().write(io_info, timeout_ms);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
write_time = ObTimeUtility::current_time() - write_time;
|
||||||
|
|
||||||
|
|
||||||
|
io_info.buf_ = read_buf;
|
||||||
|
|
||||||
|
io_info.size_ = macro_block_size;
|
||||||
|
ret = ObTmpFileManager::get_instance().pread(io_info, 100, timeout_ms, handle);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(macro_block_size, handle.get_data_size());
|
||||||
|
int cmp = memcmp(handle.get_buffer(), write_buf + 100, handle.get_data_size());
|
||||||
|
ASSERT_EQ(0, cmp);
|
||||||
|
|
||||||
|
|
||||||
|
io_info.size_ = write_size;
|
||||||
|
int64_t read_time = ObTimeUtility::current_time();
|
||||||
|
|
||||||
|
ret = ObTmpFileManager::get_instance().seek(fd, 0, ObTmpFile::SET_SEEK);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
|
||||||
|
for (int i = 0; i < cnt; i++) {
|
||||||
|
for (int64_t j = 0; j < sql_cnt; j++) {
|
||||||
|
io_info.size_ = sql_read_size;
|
||||||
|
io_info.buf_ = read_buf + j * sql_read_size;
|
||||||
|
ret = ObTmpFileManager::get_instance().read(io_info, timeout_ms, handle);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(sql_read_size, handle.get_data_size());
|
||||||
|
cmp = memcmp(handle.get_buffer(), write_buf + j * sql_read_size, sql_read_size);
|
||||||
|
ASSERT_EQ(0, cmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
read_time = ObTimeUtility::current_time() - read_time;
|
||||||
|
|
||||||
|
io_info.size_ = 200;
|
||||||
|
ret = ObTmpFileManager::get_instance().pread(io_info, 200, timeout_ms, handle);
|
||||||
|
ASSERT_EQ(OB_SUCCESS, ret);
|
||||||
|
ASSERT_EQ(200, handle.get_data_size());
|
||||||
|
cmp = memcmp(handle.get_buffer(), write_buf + 200, 200);
|
||||||
|
ASSERT_EQ(0, cmp);
|
||||||
|
|
||||||
|
free(write_buf);
|
||||||
|
free(read_buf);
|
||||||
|
|
||||||
|
|
||||||
|
STORAGE_LOG(INFO, "test_sql_workload");
|
||||||
|
STORAGE_LOG(INFO, "io time", K((write_size * cnt) / (1024*1024*1024)), K(write_time), K(read_time));
|
||||||
|
ObTmpTenantFileStore *store = NULL;
|
||||||
|
OB_TMP_FILE_STORE.get_store(1, store);
|
||||||
|
store->print_block_usage();
|
||||||
|
ObMallocAllocator::get_instance()->print_tenant_memory_usage(1);
|
||||||
|
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(1);
|
||||||
|
ObMallocAllocator::get_instance()->print_tenant_memory_usage(500);
|
||||||
|
ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500);
|
||||||
|
|
||||||
|
ObTmpFileManager::get_instance().remove(fd);
|
||||||
|
}
|
||||||
|
|
||||||
} // end namespace unittest
|
} // end namespace unittest
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user