fix tmp file not being inserted into the flush priority manager when some flush tasks have not completed

This commit is contained in:
dongb0
2024-08-20 13:18:59 +00:00
committed by ob-robot
parent 6bb31f7239
commit 5178396599
6 changed files with 245 additions and 19 deletions

View File

@ -1008,6 +1008,201 @@ TEST_F(TestTmpFile, test_tmp_file_truncate)
LOG_INFO("test_tmp_file_truncate");
}
// Exercises generate_data_flush_info() when truncate() and write() are
// interleaved with the assembly of a single flush task:
//   1. write 4MB + 12KB, wait for the background flush, truncate to 4MB;
//   2. copy one page into a hand-built flush task;
//   3. truncate again into the middle of the next page and append 8KB;
//   4. copy two more pages and check that the task's data length grows by one
//      page each time and the recorded virtual page ids are consecutive.
TEST_F(TestTmpFile, test_truncate_to_flushed_page_id)
{
  int ret = OB_SUCCESS;
  const int64_t write_size = 4 * 1024 * 1024 + 12 * 1024;
  // NOTE(review): wbp_mem_limit is not referenced below; kept as in the original.
  const int64_t wbp_mem_limit = MTL(ObTenantTmpFileManager *)->page_cache_controller_.write_buffer_pool_.get_memory_limit();
  char *write_buf = new char [write_size];
  // Fill the buffer with runs of random length, each run holding one random value.
  for (int64_t i = 0; i < write_size;) {
    int64_t random_length = generate_random_int(1024, 8 * 1024);
    int64_t random_int = generate_random_int(0, 256);
    for (int64_t j = 0; j < random_length && i + j < write_size; ++j) {
      write_buf[i + j] = random_int;
    }
    i += random_length;
  }

  int64_t dir = -1;
  int64_t fd = -1;
  ret = MTL(ObTenantTmpFileManager *)->alloc_dir(dir);
  ASSERT_EQ(OB_SUCCESS, ret);
  ret = MTL(ObTenantTmpFileManager *)->open(fd, dir);
  std::cout << "open temporary file: " << fd << std::endl;
  ASSERT_EQ(OB_SUCCESS, ret);
  ObTmpFileHandle file_handle;
  ret = MTL(ObTenantTmpFileManager *)->get_tmp_file(fd, file_handle);
  ASSERT_EQ(OB_SUCCESS, ret);
  file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY;

  ObTmpFileIOInfo io_info;
  io_info.fd_ = fd;
  io_info.io_desc_.set_wait_event(2);
  io_info.buf_ = write_buf;
  io_info.size_ = write_size;
  io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;

  // Write data
  ret = MTL(ObTenantTmpFileManager *)->write(io_info);
  ASSERT_EQ(OB_SUCCESS, ret);

  sleep(2); // waits for flushing 4MB data pages, 12KB(2 pages) left

  ret = MTL(ObTenantTmpFileManager *)->truncate(fd, 4 * 1024 * 1024);
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(4 * 1024 * 1024, file_handle.get()->truncated_offset_);

  int64_t block_index = -1;
  ret = MTL(ObTenantTmpFileManager *)->tmp_file_block_manager_.create_tmp_file_block(0/*begin_page_id*/, ObTmpFileGlobal::BLOCK_PAGE_NUMS, block_index);
  // Stop here if no block could be created; block_index would be invalid below.
  ASSERT_EQ(OB_SUCCESS, ret);
  int64_t flush_sequence = MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_mgr_.flush_ctx_.get_flush_sequence();
  ObTmpFileDataFlushContext data_flush_ctx;

  // flush_task lives in its own scope so that it is destroyed exactly once,
  // before the file is removed, instead of calling its destructor by hand on
  // an automatic object (which would run the destructor a second time at
  // function exit).
  {
    ObTmpFileFlushTask flush_task;
    flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
    flush_task.set_block_index(block_index);
    ret = flush_task.prealloc_block_buf();
    EXPECT_EQ(OB_SUCCESS, ret);

    // copy first page after flush and truncate
    int64_t last_info_idx = flush_task.get_flush_infos().count() - 1;
    ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
                                                      data_flush_ctx, flush_sequence, false/*flush tail*/);
    ASSERT_EQ(OB_SUCCESS, ret);
    LOG_INFO("checking flush task", K(flush_task));
    LOG_INFO("checking data flush ctx", K(data_flush_ctx));

    const int64_t PAGE_SIZE = 8 * 1024;
    EXPECT_EQ(PAGE_SIZE, flush_task.get_data_length());

    // Simulate that one task has been pushed to TFFT_INSERT_META_TREE and the
    // truncate lock has been released, so that another truncate operation can come in.
    file_handle.get()->truncate_lock_.unlock();

    // truncate again, into the middle of the second remaining page
    int64_t truncate_offset = 4 * 1024 * 1024 + PAGE_SIZE + PAGE_SIZE / 2;
    ret = MTL(ObTenantTmpFileManager *)->truncate(fd, truncate_offset);
    ASSERT_EQ(OB_SUCCESS, ret);
    ASSERT_EQ(truncate_offset, file_handle.get()->truncated_offset_);

    // append 8KB, 12KB left in wbp
    io_info.size_ = PAGE_SIZE;
    ret = MTL(ObTenantTmpFileManager *)->write(io_info);
    ASSERT_EQ(OB_SUCCESS, ret);
    ASSERT_EQ(4 * 1024 * 1024 + 20 * 1024, file_handle.get()->file_size_);

    // copy second page
    flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
    last_info_idx = flush_task.get_flush_infos().count() - 1;
    ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
                                                      data_flush_ctx, flush_sequence, false/*flush tail*/);
    LOG_INFO("checking flush task", K(flush_task));
    LOG_INFO("checking data flush ctx", K(data_flush_ctx));
    EXPECT_EQ(OB_SUCCESS, ret);
    EXPECT_EQ(PAGE_SIZE * 2, flush_task.get_data_length());

    // copy third page (the tail page this time)
    flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
    last_info_idx = flush_task.get_flush_infos().count() - 1;
    ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
                                                      data_flush_ctx, flush_sequence, true/*flush tail*/);
    LOG_INFO("checking flush task", K(flush_task));
    LOG_INFO("checking data flush ctx", K(data_flush_ctx));
    EXPECT_EQ(OB_SUCCESS, ret);
    EXPECT_EQ(PAGE_SIZE * 3, flush_task.get_data_length());

    // The three flush infos must describe consecutive virtual pages.
    int64_t first_virtual_page_id = flush_task.get_flush_infos().at(0).flush_virtual_page_id_;
    int64_t second_virtual_page_id = flush_task.get_flush_infos().at(1).flush_virtual_page_id_;
    int64_t third_virtual_page_id = flush_task.get_flush_infos().at(2).flush_virtual_page_id_;
    EXPECT_EQ(first_virtual_page_id + 1, second_virtual_page_id);
    EXPECT_EQ(second_virtual_page_id + 1, third_virtual_page_id);

    file_handle.reset();
  } // flush_task is destroyed here

  // write_buf was allocated with new[], so it must be released with delete[].
  delete [] write_buf;

  ret = MTL(ObTenantTmpFileManager *)->remove(fd);
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_priority_mgr_.get_file_size());
  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.evict_mgr_.get_file_size());
  LOG_INFO("test_truncate_offset_and_flushed_page_id_defensive_check");
}
// Exercises appending to a file while a hand-built flush of its tail page is
// still "in flight":
//   1. write 64KB + 100 bytes;
//   2. build a flush task covering the data pages including the tail page and
//      insert the corresponding meta tree item;
//   3. write again before the flush is reported complete;
//   4. report completion through update_meta_after_flush() and check the
//      flushed / write-back page counters.
TEST_F(TestTmpFile, test_write_last_page_during_flush)
{
  int ret = OB_SUCCESS;
  const int64_t write_size = 64 * 1024 + 100;
  char *write_buf = new char [write_size];
  // Fill the buffer with runs of random length, each run holding one random value.
  for (int64_t i = 0; i < write_size;) {
    int64_t random_length = generate_random_int(1024, 8 * 1024);
    int64_t random_int = generate_random_int(0, 256);
    for (int64_t j = 0; j < random_length && i + j < write_size; ++j) {
      write_buf[i + j] = random_int;
    }
    i += random_length;
  }

  int64_t dir = -1;
  int64_t fd = -1;
  ret = MTL(ObTenantTmpFileManager *)->alloc_dir(dir);
  ASSERT_EQ(OB_SUCCESS, ret);
  ret = MTL(ObTenantTmpFileManager *)->open(fd, dir);
  std::cout << "open temporary file: " << fd << std::endl;
  ASSERT_EQ(OB_SUCCESS, ret);
  ObTmpFileHandle file_handle;
  ret = MTL(ObTenantTmpFileManager *)->get_tmp_file(fd, file_handle);
  ASSERT_EQ(OB_SUCCESS, ret);
  file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY;

  ObTmpFileIOInfo io_info;
  io_info.fd_ = fd;
  io_info.io_desc_.set_wait_event(2);
  io_info.buf_ = write_buf;
  io_info.size_ = write_size;
  io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;

  ret = MTL(ObTenantTmpFileManager *)->write(io_info);
  ASSERT_EQ(OB_SUCCESS, ret);

  printf("generate_data_flush_info\n");
  // hard code generate_data_flush_info to flush last page
  int64_t block_index = -1;
  ret = MTL(ObTenantTmpFileManager *)->tmp_file_block_manager_.create_tmp_file_block(0/*begin_page_id*/, ObTmpFileGlobal::BLOCK_PAGE_NUMS, block_index);
  ASSERT_EQ(OB_SUCCESS, ret);
  int64_t flush_sequence = MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_mgr_.flush_ctx_.get_flush_sequence();
  ObTmpFileDataFlushContext data_flush_ctx;

  // flush_task lives in its own scope so that it is destroyed exactly once,
  // before the file handle is reset and the file removed, instead of calling
  // its destructor by hand on an automatic object (which would run the
  // destructor a second time at function exit).
  {
    ObTmpFileFlushTask flush_task;
    flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
    flush_task.set_block_index(block_index);
    ret = flush_task.prealloc_block_buf();
    EXPECT_EQ(OB_SUCCESS, ret);

    // generate the flush info for the written data, tail page included
    int64_t last_info_idx = flush_task.get_flush_infos().count() - 1;
    ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
                                                      data_flush_ctx, flush_sequence, true/*flush tail*/);
    ASSERT_EQ(OB_SUCCESS, ret);

    // insert_meta_tree
    ret = file_handle.get()->insert_meta_tree_item(flush_task.get_flush_infos().at(0), block_index);
    ASSERT_EQ(OB_SUCCESS, ret);

    // write before IO complete
    ret = MTL(ObTenantTmpFileManager *)->write(io_info);
    ASSERT_EQ(OB_SUCCESS, ret);

    // assume io complete, update file meta
    bool reset_ctx = false;
    ret = file_handle.get()->update_meta_after_flush(flush_task.get_flush_infos().at(0).batch_flush_idx_, false/*is_meta*/, reset_ctx);
    ASSERT_EQ(OB_SUCCESS, ret);
    ASSERT_EQ(8, file_handle.get()->flushed_data_page_num_);
    ASSERT_EQ(0, file_handle.get()->write_back_data_page_num_);
  } // flush_task is destroyed here

  file_handle.reset();
  ret = MTL(ObTenantTmpFileManager *)->remove(fd);
  ASSERT_EQ(OB_SUCCESS, ret);
  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_priority_mgr_.get_file_size());
  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.evict_mgr_.get_file_size());

  delete [] write_buf;
  LOG_INFO("test_write_last_page_during_flush");
}
// generate 750MB random data.
// this test will trigger flush and evict logic for both data and meta pages.
void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpFileIOInfo io_info)
@ -1380,7 +1575,7 @@ TEST_F(TestTmpFile, test_multi_file_multi_thread_read_write_with_block_cache)
TEST_F(TestTmpFile, test_more_files_more_threads_read_write)
{
int ret = OB_SUCCESS;
const int64_t read_thread_cnt = 1;
const int64_t read_thread_cnt = 2;
const int64_t file_cnt = 128;
const int64_t batch_size = 3 * 1024 * 1024;
const int64_t batch_num = 2; // total 128 * 3MB * 2 = 768MB
@ -1400,6 +1595,29 @@ TEST_F(TestTmpFile, test_more_files_more_threads_read_write)
STORAGE_LOG(INFO, "io time", K(io_time));
}
// Stress run over many small files: 256 files, each written in 3 batches of
// 10KB, read back by 2 reader threads, with the block cache disabled.
// Logs the elapsed time of the whole run.
TEST_F(TestTmpFile, test_multiple_small_files)
{
  const int64_t reader_threads = 2;
  const int64_t small_file_cnt = 256;
  const int64_t bytes_per_batch = 10 * 1024; // 10KB
  const int64_t batches_per_file = 3;
  const bool no_block_cache = true;

  TestMultiTmpFileStress stress(MTL_CTX());

  int64_t dir_id = -1;
  ASSERT_EQ(OB_SUCCESS, MTL(ObTenantTmpFileManager *)->alloc_dir(dir_id));
  ASSERT_EQ(OB_SUCCESS, stress.init(small_file_cnt, dir_id, reader_threads,
                                    bytes_per_batch, batches_per_file, no_block_cache));

  // Time the start-to-finish span of the stress threads.
  const int64_t begin_ts = ObTimeUtility::current_time();
  stress.start();
  stress.wait();
  const int64_t io_time = ObTimeUtility::current_time() - begin_ts;

  STORAGE_LOG(INFO, "test_multiple_small_files");
  STORAGE_LOG(INFO, "io time", K(io_time));
}
TEST_F(TestTmpFile, test_big_file)
{
const int64_t write_size = 750 * 1024 * 1024; // write 750MB data

View File

@ -900,9 +900,7 @@ int ObSharedNothingTmpFile::aio_write(ObTmpFileIOCtx &io_ctx)
if (OB_FAIL(inner_write_(io_ctx))) {
if (OB_ALLOCATE_TMP_FILE_PAGE_FAILED == ret) {
ret = OB_SUCCESS;
if (TC_REACH_COUNT_INTERVAL(10)) {
LOG_INFO("alloc mem failed, try to evict pages", K(fd_), K(file_size_), K(io_ctx));
}
LOG_INFO("alloc mem failed, try to evict pages", K(fd_), K(file_size_), K(io_ctx), KPC(this));
if (OB_FAIL(page_cache_controller_->invoke_swap_and_wait(
MIN(io_ctx.get_todo_size(), ObTmpFileGlobal::TMP_FILE_WRITE_BATCH_PAGE_NUM * ObTmpFileGlobal::PAGE_SIZE),
io_ctx.get_io_timeout_ms()))) {
@ -1998,12 +1996,20 @@ int ObSharedNothingTmpFile::update_meta_after_flush(const int64_t info_idx, cons
if (FAILEDx(inner_flush_ctx_.update_finished_continuous_flush_info_num(is_meta, end_pos))) {
LOG_WARN("fail to update finished continuous flush info num", KR(ret), K(start_pos), K(end_pos), KPC(this));
} else if (inner_flush_ctx_.is_all_finished()) {
if (OB_ISNULL(data_flush_node_.get_next()) && OB_FAIL(reinsert_flush_node_(false /*is_meta*/))) {
LOG_ERROR("fail to reinsert data flush node", KR(ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
} else if (OB_ISNULL(meta_flush_node_.get_next()) && OB_FAIL(reinsert_flush_node_(true /*is_meta*/))) {
LOG_ERROR("fail to reinsert meta flush node", KR(ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
} else {
} else {
int tmp_ret = OB_SUCCESS;
if (inner_flush_ctx_.is_data_finished()) {
if (OB_ISNULL(data_flush_node_.get_next()) && OB_TMP_FAIL(reinsert_flush_node_(false /*is_meta*/))) {
LOG_ERROR("fail to reinsert data flush node", KR(tmp_ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
}
}
if (inner_flush_ctx_.is_meta_finished()) {
if (OB_ISNULL(meta_flush_node_.get_next()) && OB_TMP_FAIL(reinsert_flush_node_(true /*is_meta*/))) {
LOG_ERROR("fail to reinsert meta flush node", KR(tmp_ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
}
}
if (inner_flush_ctx_.is_all_finished()) {
inner_flush_ctx_.reset();
}
}

View File

@ -85,10 +85,14 @@ public:
}
void reset();
int update_finished_continuous_flush_info_num(const bool is_meta, const int64_t end_pos);
bool is_all_finished() const
bool is_all_finished() const { return is_data_finished() && is_meta_finished(); }
bool is_data_finished() const
{
return data_flush_infos_.size() == data_finished_continuous_flush_info_num_ &&
meta_flush_infos_.size() == meta_finished_continuous_flush_info_num_;
return data_flush_infos_.size() == data_finished_continuous_flush_info_num_;
}
// Returns true when the finished-continuous counter for meta flush infos has
// reached the number of meta flush infos currently recorded.
bool is_meta_finished() const
{
  const bool all_meta_infos_done =
      (meta_finished_continuous_flush_info_num_ == meta_flush_infos_.size());
  return all_meta_infos_done;
}
bool is_flushing() const
{

View File

@ -218,7 +218,7 @@ void ObTmpFileBatchFlushContext::record_flush_task(const int64_t data_length)
// -------------- ObTmpFileFlushTask --------------- //
ObTmpFileFlushTask::ObTmpFileFlushTask(ObIAllocator &task_allocator)
ObTmpFileFlushTask::ObTmpFileFlushTask()
: inst_handle_(),
kvpair_(nullptr),
block_handle_(),
@ -232,8 +232,7 @@ ObTmpFileFlushTask::ObTmpFileFlushTask(ObIAllocator &task_allocator)
task_state_(ObTmpFileFlushTaskState::TFFT_INITED),
tmp_file_block_handle_(),
handle_(),
flush_infos_(),
task_allocator_(task_allocator)
flush_infos_()
{
flush_infos_.set_attr(ObMemAttr(MTL_ID(), "TFFlushInfos"));
}

View File

@ -227,7 +227,7 @@ public:
struct ObTmpFileFlushTask : public common::ObSpLinkQueue::Link
{
public:
ObTmpFileFlushTask(ObIAllocator &task_allocator);
ObTmpFileFlushTask();
~ObTmpFileFlushTask() { destroy(); }
enum ObTmpFileFlushTaskState
{
@ -297,7 +297,6 @@ private:
ObTmpFileBlockHandle tmp_file_block_handle_;// hold a reference to the corresponding tmp file block to prevent it from being released
blocksstable::ObMacroBlockHandle handle_;
ObArray<ObTmpFileFlushInfo> flush_infos_; // multi file flush into one block if size > 0
ObIAllocator &task_allocator_; // ref to ObTmpFilePageCacheController::task_allocator_, used to free data_buf_
};
} // end namespace tmp_file

View File

@ -65,7 +65,7 @@ int ObTmpFileFlushManager::alloc_flush_task(ObTmpFileFlushTask *&flush_task)
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to allocate memory for flush callback", KR(ret));
} else {
flush_task = new (task_buf) ObTmpFileFlushTask(task_allocator_);
flush_task = new (task_buf) ObTmpFileFlushTask();
}
return ret;
}