Split the tmp file flush thread into 4 timers for performance reasons

This commit is contained in:
obdev 2024-09-18 08:15:17 +00:00 committed by ob-robot
parent bc726fd81f
commit 7dfea265d1
12 changed files with 387 additions and 109 deletions

View File

@ -197,4 +197,5 @@ TG_DEF(ObPrivateBlockGCThread, PrivGCThread, QUEUE_THREAD,
TG_DEF(TmpFileSwap, TFSwap, THREAD_POOL, 1)
TG_DEF(TmpFileFlush, TFFlush, TIMER, 1024)
#endif

View File

@ -2505,10 +2505,13 @@ int ObSharedNothingTmpFile::generate_data_flush_info_(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("flush sequence not match",
KR(ret), K(inner_flush_ctx_.flush_seq_), K(flush_task), KPC(this));
} else if (OB_FAIL(copy_flush_data_from_wbp_(flush_task, info, data_flush_context,
copy_begin_page_id, copy_begin_page_virtual_id,
copy_end_page_id,
flush_sequence, need_flush_tail))) {
} else if (ObTmpFileFlushTask::TaskType::META == flush_task.get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("flush task type is unexpected", KR(ret), K(flush_task), K(data_flush_context), KPC(this));
} else if (OB_FAIL(collect_flush_data_page_id_(flush_task, info, data_flush_context,
copy_begin_page_id, copy_begin_page_virtual_id,
copy_end_page_id,
flush_sequence, need_flush_tail))) {
LOG_WARN("fail to copy flush data from wbp", KR(ret),
K(flush_task), K(info), K(data_flush_context), KPC(this));
}
@ -2549,7 +2552,7 @@ int ObSharedNothingTmpFile::generate_data_flush_info(
return ret;
}
int ObSharedNothingTmpFile::copy_flush_data_from_wbp_(
int ObSharedNothingTmpFile::collect_flush_data_page_id_(
ObTmpFileFlushTask &flush_task,
ObTmpFileFlushInfo &info,
ObTmpFileDataFlushContext &data_flush_context,
@ -2571,8 +2574,9 @@ int ObSharedNothingTmpFile::copy_flush_data_from_wbp_(
uint32_t cur_flushed_page_id = ObTmpFileGlobal::INVALID_PAGE_ID;
uint32_t cur_page_id = copy_begin_page_id;
int64_t cur_page_virtual_id = copy_begin_page_virtual_id;
int64_t collected_page_cnt = 0;
if (OB_ISNULL(buf) || OB_UNLIKELY(OB_STORAGE_OBJECT_MGR.get_macro_object_size() <= write_offset)) {
if (OB_UNLIKELY(OB_STORAGE_OBJECT_MGR.get_macro_object_size() <= write_offset)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid buf or write_offset", KR(ret), KP(buf), K(write_offset), K(flush_task), KPC(this));
} else if (OB_FAIL(inner_flush_ctx_.data_flush_infos_.push_back(InnerFlushInfo()))) {
@ -2596,21 +2600,21 @@ int ObSharedNothingTmpFile::copy_flush_data_from_wbp_(
LOG_WARN("fail to read page", KR(ret), K(fd_), K(cur_page_id));
} else if (OB_FAIL(wbp_->notify_write_back(fd_, cur_page_id, ObTmpFilePageUniqKey(cur_page_virtual_id)))) {
LOG_WARN("fail to notify write back", KR(ret), K(fd_), K(cur_page_id));
} else if (OB_UNLIKELY(!flush_task.check_buf_range_valid(buf, ObTmpFileGlobal::PAGE_SIZE))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid buffer range", KR(ret), K(fd_), K(write_offset), KP(buf));
} else if (OB_FAIL(flush_task.get_flush_page_id_arr().push_back(cur_page_id))) {
LOG_ERROR("fail to push back flush page id", KR(ret), K(fd_), K(cur_page_id), KPC(this));
ret = OB_ITER_END; // override error code
} else {
// ObTmpPageCacheKey cache_key(flush_task.get_block_index(),
// write_offset / ObTmpFileGlobal::PAGE_SIZE, tenant_id_);
// ObTmpPageCacheValue cache_value(page_buf);
// ObTmpPageCache::get_instance().try_put_page_to_cache(cache_key, cache_value);
MEMCPY(buf + write_offset, page_buf, ObTmpFileGlobal::PAGE_SIZE);
write_offset += ObTmpFileGlobal::PAGE_SIZE;
flushing_page_num += 1;
cur_flushed_page_id = cur_page_id;
cur_page_id = next_page_id;
cur_page_virtual_id += 1;
collected_page_cnt += 1;
if (original_state_is_dirty) {
write_back_data_page_num_++;
}
@ -2658,11 +2662,16 @@ int ObSharedNothingTmpFile::copy_flush_data_from_wbp_(
data_flush_context.set_has_flushed_last_partially_written_page(true);
}
flush_task.set_data_length(write_offset);
flush_task.set_buffer_pool_ptr(wbp_);
flush_task.set_type(ObTmpFileFlushTask::DATA);
inner_flush_ctx_.flush_seq_ = flush_sequence;
} else {
LOG_WARN("fail to generate data flush info", KR(ret), K(fd_), K(need_flush_tail),
K(flush_sequence), K(data_flush_context), K(info), K(flush_task), KPC(this));
for (int32_t i = 0; i < collected_page_cnt; ++i) {
flush_task.get_flush_page_id_arr().pop_back();
}
if (inner_flush_ctx_.data_flush_infos_.size() == origin_info_num + 1) {
inner_flush_ctx_.data_flush_infos_.pop_back();
}
@ -2697,6 +2706,9 @@ int ObSharedNothingTmpFile::generate_meta_flush_info_(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("flush sequence not match", KR(ret), K(flush_sequence), K(inner_flush_ctx_.flush_seq_),
K(flush_task), KPC(this));
} else if (ObTmpFileFlushTask::TaskType::DATA == flush_task.get_type()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("flush task type is unexpected", KR(ret), K(flush_task), K(meta_flush_context), KPC(this));
} else if (OB_ISNULL(buf) || OB_UNLIKELY(OB_STORAGE_OBJECT_MGR.get_macro_object_size() <= write_offset)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid buf or write_offset", KR(ret), KP(buf), K(write_offset), K(flush_task), KPC(this));
@ -2725,6 +2737,7 @@ int ObSharedNothingTmpFile::generate_meta_flush_info_(
if (OB_SUCC(ret)) {
flush_task.set_data_length(write_offset);
flush_task.set_type(ObTmpFileFlushTask::META);
inner_flush_ctx_.flush_seq_ = flush_sequence;
} else {
LOG_WARN("fail to generate meta flush info", KR(ret), K(fd_), K(flush_task),
@ -2788,9 +2801,6 @@ int ObSharedNothingTmpFile::insert_meta_tree_item(const ObTmpFileFlushInfo &info
last_page_lock_.unlock();
}
if (OB_SUCC(ret)) {
truncate_lock_.unlock();
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("flush info does not contain data info", KR(ret), K(info), KPC(this));
@ -2826,6 +2836,13 @@ int ObSharedNothingTmpFile::insert_meta_tree_item(const ObTmpFileFlushInfo &info
return ret;
}
// Releases truncate_lock_ of this file.
// Called by the flush task on every file referenced by its flush infos after it has
// tried to copy the file's data pages into the block buffer.
// NOTE(review): the matching lock acquisition is not visible here — presumably taken
// while the data flush info is generated; confirm against the caller.
// @return always OB_SUCCESS.
int ObSharedNothingTmpFile::copy_finish()
{
  truncate_lock_.unlock();
  return OB_SUCCESS;
}
int ObSharedNothingTmpFile::cal_next_flush_page_id_from_flush_ctx_or_file_(
const ObTmpFileDataFlushContext &data_flush_context,
uint32_t &next_flush_page_id,

View File

@ -270,6 +270,7 @@ public:
const int64_t flush_sequence,
const bool need_flush_tail);
int insert_meta_tree_item(const ObTmpFileFlushInfo &info, int64_t block_index);
int copy_finish();
public:
int remove_flush_node(const bool is_meta);
int reinsert_flush_node(const bool is_meta);
@ -346,7 +347,7 @@ private:
int64_t &actual_write_size);
int truncate_the_first_wbp_page_();
int copy_flush_data_from_wbp_(ObTmpFileFlushTask &flush_task, ObTmpFileFlushInfo &info,
int collect_flush_data_page_id_(ObTmpFileFlushTask &flush_task, ObTmpFileFlushInfo &info,
ObTmpFileDataFlushContext &data_flush_context,
const uint32_t copy_begin_page_id, const int64_t copy_begin_page_virtual_id,
const uint32_t copy_end_page_id, const int64_t flush_sequence, const bool need_flush_tail);

View File

@ -20,6 +20,23 @@ namespace oceanbase
namespace tmp_file
{
// Binds this timer task to the flush task whose block it writes.
// Only the reference is stored; flush_task is not accessed here.
ObTmpFileWriteBlockTask::ObTmpFileWriteBlockTask(ObTmpFileFlushTask &flush_task)
    : flush_task_(flush_task) {}
// Timer callback: runs write_one_block() on the bound flush task and publishes the
// outcome on the task through its atomic setters.
void ObTmpFileWriteBlockTask::runTimerTask()
{
int ret = OB_SUCCESS;
if (OB_FAIL(flush_task_.write_one_block())) {
STORAGE_LOG(WARN, "fail to async write blocks", KR(ret), K(flush_task_));
}
// Store the return code BEFORE raising the "executed" flag:
// ObTmpFileFlushTG::check_flush_task_io_finished_ tests the flag first and reads
// the return code only afterwards, so the code must already be in place.
flush_task_.atomic_set_write_block_ret_code(ret);
flush_task_.atomic_set_write_block_executed(true);
}
ObTmpFileFlushInfo::ObTmpFileFlushInfo()
: fd_(ObTmpFileGlobal::INVALID_TMP_FILE_FD),
batch_flush_idx_(0),
@ -239,19 +256,23 @@ ObTmpFileFlushTask::ObTmpFileFlushTask()
: inst_handle_(),
kvpair_(nullptr),
block_handle_(),
flush_page_id_arr_(),
write_block_ret_code_(OB_SUCCESS),
ret_code_(OB_SUCCESS),
io_result_ret_code_(OB_SUCCESS),
data_length_(0),
block_index_(-1),
flush_seq_(-1),
create_ts_(-1),
is_write_block_executed_(false),
is_io_finished_(false),
fast_flush_tree_page_(false),
recorded_as_prepare_finished_(false),
type_(TaskType::INVALID),
task_state_(ObTmpFileFlushTaskState::TFFT_INITED),
tmp_file_block_handle_(),
handle_(),
flush_infos_()
flush_infos_(),
flush_write_block_task_(*this)
{
flush_infos_.set_attr(ObMemAttr(MTL_ID(), "TFFlushInfos"));
}
@ -259,17 +280,20 @@ ObTmpFileFlushTask::ObTmpFileFlushTask()
void ObTmpFileFlushTask::destroy()
{
block_handle_.reset();
flush_page_id_arr_.reset();
inst_handle_.reset();
kvpair_ = nullptr;
write_block_ret_code_ = OB_SUCCESS;
ret_code_ = OB_SUCCESS;
io_result_ret_code_ = OB_SUCCESS;
data_length_ = 0;
block_index_ = -1;
flush_seq_ = -1;
create_ts_ = -1;
is_write_block_executed_ = false;
is_io_finished_ = false;
fast_flush_tree_page_ = false;
recorded_as_prepare_finished_ = false;
type_ = TaskType::INVALID;
task_state_ = ObTmpFileFlushTaskState::TFFT_INITED;
flush_infos_.reset();
}
@ -287,27 +311,120 @@ int ObTmpFileFlushTask::prealloc_block_buf()
return ret;
}
// Allocates the block buffer of a DATA flush task on demand and fills it with the
// write buffer pool pages whose ids were recorded in flush_page_id_arr_.
//
// Steps:
//   1. validate the collected page id array against data_length_, and check that
//      flush_infos_ is not empty and wbp_ is set;
//   2. allocate the block buffer via prealloc_block_buf();
//   3. for every flush info, read its pages from wbp_ and copy them into consecutive
//      page slots of the block buffer;
//   4. call copy_finish() on every file in flush_infos_ (done whether or not the copy
//      succeeded, but only when steps 1 and 2 succeeded);
//   5. on success, try to put the block into ObTmpBlockCache (a failure is only logged).
//
// @return OB_SUCCESS on success; OB_ERR_UNEXPECTED / OB_INVALID_ARGUMENT when the task
//         state is inconsistent; otherwise the error returned by prealloc_block_buf()
//         or ObTmpWriteBufferPool::read_page().
int ObTmpFileFlushTask::lazy_alloc_and_fill_block_buf_for_data_page_()
{
  int ret = OB_SUCCESS;
  if (OB_UNLIKELY(flush_page_id_arr_.count() <= 0 ||
                  flush_page_id_arr_.count() > ObTmpFileGlobal::BLOCK_PAGE_NUMS ||
                  flush_page_id_arr_.count() != upper_align(data_length_, ObTmpFileGlobal::PAGE_SIZE) / ObTmpFileGlobal::PAGE_SIZE)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("invalid flush page id array size", KR(ret), K(flush_page_id_arr_.count()), KPC(this));
  } else if (flush_infos_.size() == 0) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("flush_infos_ is empty", KR(ret), KPC(this));
  } else if (OB_ISNULL(wbp_)) {
    ret = OB_ERR_UNEXPECTED;
    LOG_WARN("write buffer pool ptr is nullptr", KR(ret), KPC(this));
  } else if (OB_FAIL(prealloc_block_buf())) {
    LOG_WARN("fail to prealloc block", KR(ret), K(block_index_), KPC(this));
  } else {
    char* page_buf = nullptr;
    // index of the next page slot to fill; also the index into flush_page_id_arr_
    int32_t copy_index = 0;
    for (int32_t i = 0; OB_SUCC(ret) && i < flush_infos_.count(); ++i) {
      ObTmpFileFlushInfo &flush_info = flush_infos_.at(i);
      int64_t cur_info_fd = flush_info.fd_;
      int64_t cur_info_disk_begin_id = flush_info.flush_data_page_disk_begin_id_;
      int64_t cur_info_page_num = flush_info.flush_data_page_num_;
      int64_t cur_info_virtual_page_id = flush_info.flush_virtual_page_id_;
      // each flush info must start exactly where the previous one ended
      if (copy_index != cur_info_disk_begin_id) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("unexpected flush info", KR(ret), K(i), K(copy_index), KPC(this));
      }
      while (OB_SUCC(ret) && copy_index < cur_info_disk_begin_id + cur_info_page_num
             && copy_index < flush_page_id_arr_.count()) {
        uint32_t page_id = flush_page_id_arr_.at(copy_index);
        uint32_t next_page_id = ObTmpFileGlobal::INVALID_PAGE_ID; // UNUSED
        if (OB_FAIL(wbp_->read_page(cur_info_fd, page_id, ObTmpFilePageUniqKey(cur_info_virtual_page_id), page_buf, next_page_id))) {
          LOG_WARN("fail to read page", KR(ret), K(page_id), KPC(this));
        } else if (OB_ISNULL(get_data_buf()) ||
                   OB_UNLIKELY(!check_buf_range_valid(get_data_buf() + copy_index * ObTmpFileGlobal::PAGE_SIZE,
                                                      ObTmpFileGlobal::PAGE_SIZE))) {
          // validate the page slot that is actually written below, not just the
          // start of the block buffer
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("invalid buffer range", KR(ret), KP(get_data_buf()), K(copy_index), KPC(this));
        } else {
          // only copy the size we recorded if we need to flush the last page
          // since we do not hold last_page_lock and the last page may be appended
          int64_t copy_size = ObTmpFileGlobal::PAGE_SIZE;
          if (flush_info.file_size_ != 0) {
            if (cur_info_disk_begin_id + cur_info_page_num - 1 != copy_index) {
              ret = OB_ERR_UNEXPECTED;
              LOG_ERROR("invalid flush info",
                        KR(ret), K(cur_info_disk_begin_id), K(cur_info_page_num), K(copy_index), K(flush_info), KPC(this));
            } else {
              // NOTE(review): yields 0 when file_size_ is a multiple of PAGE_SIZE;
              // presumably file_size_ is only set for a partially written tail page — confirm.
              copy_size = flush_info.file_size_ % ObTmpFileGlobal::PAGE_SIZE;
            }
          }
          if (OB_SUCC(ret)) {
            MEMCPY(get_data_buf() + copy_index * ObTmpFileGlobal::PAGE_SIZE, page_buf, copy_size);
          }
        }
        copy_index += 1;
        cur_info_virtual_page_id += 1;
      }
    }

    if (OB_FAIL(ret)) {
      LOG_ERROR("fail to read page and fill block buf", KR(ret), KPC(this));
    }

    // release truncate_lock regardless of ret
    for (int32_t i = 0; i < flush_infos_.count(); ++i) {
      flush_infos_.at(i).file_handle_.get()->copy_finish();
    }

    if (OB_SUCC(ret)) {
      int tmp_ret = OB_SUCCESS;
      if (TaskType::DATA != type_) {
        ret = OB_ERR_UNEXPECTED;
        LOG_ERROR("invalid task type when copy data in writing block", KR(ret), KPC(this));
      } else if (OB_TMP_FAIL(ObTmpBlockCache::get_instance().put_block(get_inst_handle(),
                                                                        get_kvpair(),
                                                                        get_block_handle()))) {
        // failing to cache the block is not fatal, so ret is left untouched
        LOG_WARN("fail to put block into block cache", KR(tmp_ret), KR(ret), KPC(this));
      }
    }
  }
  return ret;
}
int ObTmpFileFlushTask::write_one_block()
{
int ret = OB_SUCCESS;
handle_.reset();
blocksstable::ObMacroBlockWriteInfo write_info;
write_info.io_desc_.set_wait_event(2); // TODO: 检查是否需要用临时文件自己的event
write_info.io_desc_.set_resource_group_id(THIS_WORKER.get_group_id());
write_info.io_desc_.set_sys_module_id(ObIOModule::TMP_TENANT_MEM_BLOCK_IO);
write_info.buffer_ = get_data_buf();
write_info.size_ = OB_STORAGE_OBJECT_MGR.get_macro_object_size();
write_info.offset_ = 0;
if (FAILEDx(blocksstable::ObBlockManager::async_write_block(write_info, handle_))) {
LOG_ERROR("fail to async write block", KR(ret), K(write_info));
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.update_write_time(handle_.get_macro_id(),
true/*update_to_max_time)*/))){ // update to max time to skip bad block inspect
LOG_WARN("failed to update write time", KR(ret), K(handle_));
if (!block_handle_.is_valid()) {
if (TaskType::DATA != type_) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("block handle is null when writing non data task", KR(ret), KPC(this));
} else if (OB_FAIL(lazy_alloc_and_fill_block_buf_for_data_page_())) {
LOG_WARN("fail to lazy alloc and fill block buf", KR(ret), KPC(this));
}
}
atomic_set_write_block_ret_code(ret);
if (OB_SUCC(ret)) {
blocksstable::ObMacroBlockWriteInfo write_info;
write_info.io_desc_.set_wait_event(2); // TODO: 检查是否需要用临时文件自己的event
write_info.io_desc_.set_resource_group_id(THIS_WORKER.get_group_id());
write_info.io_desc_.set_sys_module_id(ObIOModule::TMP_TENANT_MEM_BLOCK_IO);
write_info.buffer_ = get_data_buf();
write_info.size_ = upper_align(get_data_length(), ObTmpFileGlobal::PAGE_SIZE);
write_info.offset_ = 0;
if (OB_FAIL(blocksstable::ObBlockManager::async_write_block(write_info, handle_))) {
LOG_ERROR("fail to async write block", KR(ret), K(write_info));
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.update_write_time(handle_.get_macro_id(),
true/*update_to_max_time)*/))){ // update to max time to skip bad block inspect
LOG_WARN("failed to update write time", KR(ret), K(handle_));
}
}
return ret;
}

View File

@ -28,6 +28,16 @@ namespace tmp_file
{
class ObTmpFileFlushTG;
// Timer task that writes the block of one flush task.
// runTimerTask() calls ObTmpFileFlushTask::write_one_block() on the bound task and
// records the result on it.
class ObTmpFileWriteBlockTask : public common::ObTimerTask
{
public:
  explicit ObTmpFileWriteBlockTask(ObTmpFileFlushTask &flush_task);
  virtual ~ObTmpFileWriteBlockTask() = default;
  void runTimerTask() override;

private:
  ObTmpFileFlushTask &flush_task_; // flush task this timer task operates on (held by reference)
};
struct ObTmpFileDataFlushContext
{
public:
@ -242,6 +252,12 @@ public:
TFFT_FINISH = 7,
TFFT_ABORT = 8,
};
// Kind of pages a flush task carries.
enum TaskType
{
// value set by the ObTmpFileFlushTask constructor and restored by destroy()
INVALID = -1,
// set by ObSharedNothingTmpFile::collect_flush_data_page_id_ when data flush info is generated
DATA = 0,
// set by ObSharedNothingTmpFile::generate_meta_flush_info_ when meta flush info is generated
META = 1,
};
public:
void destroy();
int prealloc_block_buf();
@ -254,8 +270,11 @@ public:
OB_INLINE bool is_valid() const { return OB_NOT_NULL(get_data_buf()); }
OB_INLINE bool is_full() const { return data_length_ == OB_STORAGE_OBJECT_MGR.get_macro_object_size(); }
OB_INLINE char *get_data_buf() const { return block_handle_.value_ == nullptr ? nullptr : block_handle_.value_->get_buffer(); }
OB_INLINE void atomic_set_ret_code(int ret_code) { ATOMIC_SET(&ret_code_, ret_code); }
OB_INLINE int atomic_get_ret_code() const { return ATOMIC_LOAD(&ret_code_); }
OB_INLINE ObSEArray<uint32_t, ObTmpFileGlobal::BLOCK_PAGE_NUMS>& get_flush_page_id_arr() {
return flush_page_id_arr_;
}
OB_INLINE void atomic_set_ret_code(int ret_code) { ATOMIC_SET(&io_result_ret_code_, ret_code); }
OB_INLINE int atomic_get_ret_code() const { return ATOMIC_LOAD(&io_result_ret_code_); }
OB_INLINE void atomic_set_write_block_ret_code(int write_block_ret_code) {
ATOMIC_SET(&write_block_ret_code_, write_block_ret_code);
}
@ -276,6 +295,8 @@ public:
OB_INLINE bool get_is_fast_flush_tree() const { return fast_flush_tree_page_; }
OB_INLINE void mark_recorded_as_prepare_finished() { recorded_as_prepare_finished_ = true; }
OB_INLINE bool get_recorded_as_prepare_finished() const { return recorded_as_prepare_finished_; }
OB_INLINE void set_type(TaskType type) { type_ = type; }
OB_INLINE TaskType get_type() const { return type_; }
OB_INLINE void set_state(const ObTmpFileFlushTaskState state) { task_state_ = state; }
OB_INLINE ObTmpFileFlushTaskState get_state() const { return task_state_; }
OB_INLINE void set_tmp_file_block_handle(const ObTmpFileBlockHandle &tfb_handle) { tmp_file_block_handle_ = tfb_handle; }
@ -289,26 +310,39 @@ public:
return buffer != nullptr && get_data_buf() != nullptr &&
buffer >= get_data_buf() && buffer + length <= get_data_buf() + OB_STORAGE_OBJECT_MGR.get_macro_object_size();
}
TO_STRING_KV(KP(this), KP(kvpair_), K(write_block_ret_code_), K(ret_code_), K(data_length_),
OB_INLINE void set_buffer_pool_ptr(ObTmpWriteBufferPool *wbp) { wbp_ = wbp; }
OB_INLINE void atomic_set_write_block_executed(const bool executed) { ATOMIC_SET(&is_write_block_executed_, executed); }
OB_INLINE bool atomic_get_write_block_executed() const { return ATOMIC_LOAD(&is_write_block_executed_); }
OB_INLINE ObTmpFileWriteBlockTask& get_flush_write_block_task() { return flush_write_block_task_; }
TO_STRING_KV(KP(this), KP(kvpair_), K(io_result_ret_code_), K(data_length_),
K(block_index_), K(flush_seq_), K(create_ts_), K(is_io_finished_),
K(fast_flush_tree_page_), K(recorded_as_prepare_finished_), K(task_state_), K(tmp_file_block_handle_), K(flush_infos_));
K(fast_flush_tree_page_), K(recorded_as_prepare_finished_), K(type_), K(task_state_), K(tmp_file_block_handle_), K(flush_infos_),
K(is_write_block_executed_), K(write_block_ret_code_), K(flush_write_block_task_), K(flush_page_id_arr_.count()));
private:
int lazy_alloc_and_fill_block_buf_for_data_page_();
private:
ObKVCacheInstHandle inst_handle_;
ObKVCachePair *kvpair_;
ObTmpBlockValueHandle block_handle_;
ObSEArray<uint32_t, ObTmpFileGlobal::BLOCK_PAGE_NUMS> flush_page_id_arr_;
int write_block_ret_code_;
int ret_code_;
int io_result_ret_code_;
int64_t data_length_; // data length (including padding to make length upper align to page size)
int64_t block_index_; // tmp file block logical index in ObTmpFileBlockManager
int64_t flush_seq_; // flush sequence, for verification purpose
int64_t create_ts_;
bool is_io_finished_;
bool is_write_block_executed_; // set to true if task has sent IO
bool is_io_finished_; // set to true if task has finished async IO
bool fast_flush_tree_page_; // indicate the task requires fast flush tree pages
bool recorded_as_prepare_finished_;
bool recorded_as_prepare_finished_; // set to true if this task has been recorded by ObTmpFileBatchFlushContext as prepare_finished
TaskType type_;
ObTmpFileFlushTaskState task_state_;
ObTmpFileBlockHandle tmp_file_block_handle_;// hold a reference to the corresponding tmp file block to prevent it from being released
blocksstable::ObMacroBlockHandle handle_;
ObArray<ObTmpFileFlushInfo> flush_infos_; // multi file flush into one block if size > 0
ObTmpFileWriteBlockTask flush_write_block_task_;
ObTmpWriteBufferPool *wbp_; // use to lazy read data pages
};
} // end namespace tmp_file

View File

@ -43,6 +43,7 @@ int ObTmpFileFlushManager::init()
} else if (OB_FAIL(flush_ctx_.init())) {
STORAGE_LOG(WARN, "fail to init flush ctx", KR(ret));
} else {
cur_flush_timer_idx_ = 0;
is_inited_ = true;
}
return ret;
@ -54,6 +55,13 @@ void ObTmpFileFlushManager::destroy()
flush_ctx_.destroy();
}
void ObTmpFileFlushManager::set_flush_timer_tg_id(int* flush_timer_tg_id, const int64_t timer_cnt)
{
for (int64_t i = 0; i < timer_cnt && i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) {
flush_timer_tg_id_[i] = flush_timer_tg_id[i];
}
}
int ObTmpFileFlushManager::alloc_flush_task(ObTmpFileFlushTask *&flush_task)
{
int ret = OB_SUCCESS;
@ -385,7 +393,9 @@ int ObTmpFileFlushManager::fill_block_buf_(ObTmpFileFlushTask &flush_task)
// but is stuck in TFFT_INSERT_META_TREE state and new task has no meta pages to flush
break;
case FlushCtxState::FSM_F4:
if (!flush_task.is_full() && FlushCtxState::FSM_FINISHED != flush_ctx_.get_state()
if (OB_FAIL(flush_task.prealloc_block_buf())) {
STORAGE_LOG(WARN, "fail to prealloc block buf", KR(ret), K(flush_task));
} else if (!flush_task.is_full() && FlushCtxState::FSM_FINISHED != flush_ctx_.get_state()
&& OB_FAIL(inner_fill_block_buf_(flush_task, flush_ctx_.get_state(),
true/*is_meta*/, false/*flush_tail*/))) {
if (OB_ITER_END != ret) {
@ -707,7 +717,7 @@ int ObTmpFileFlushManager::drive_flush_task_retry_(
return ret;
}
int ObTmpFileFlushManager::drive_flush_task_wait_to_finish_(ObTmpFileFlushTask &flush_task, FlushState &next_state)
int ObTmpFileFlushManager::drive_flush_task_wait_(ObTmpFileFlushTask &flush_task, FlushState &next_state)
{
int ret = OB_SUCCESS;
ObTmpFileFlushTask::ObTmpFileFlushTaskState state = flush_task.get_state();
@ -719,7 +729,7 @@ int ObTmpFileFlushManager::drive_flush_task_wait_to_finish_(ObTmpFileFlushTask &
break;
default:
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected state in drive_flush_task_wait_to_finish_", K(state));
STORAGE_LOG(WARN, "unexpected state in drive_flush_task_wait_", K(state));
break;
}
return ret;
@ -766,7 +776,7 @@ int ObTmpFileFlushManager::io_finished(ObTmpFileFlushTask &flush_task)
{
int ret = OB_SUCCESS;
FlushState next_state = FlushState::TFFT_INITED;
if (OB_FAIL(drive_flush_task_wait_to_finish_(flush_task, next_state))) {
if (OB_FAIL(drive_flush_task_wait_(flush_task, next_state))) {
STORAGE_LOG(WARN, "fail to drive flush state machine to FINISHED", KR(ret), K(flush_task));
} else if (flush_task.get_state() < next_state && OB_FAIL(advance_status_(flush_task, next_state))) {
// if the task encounters an IO error, its status will silently revert to TFFT_ASYNC_WRITE; do not verify status here.
@ -843,14 +853,17 @@ int ObTmpFileFlushManager::handle_create_block_index_(ObTmpFileFlushTask &flush_
return ret;
}
// For performance reasons, memory allocation and data copying for flush data tasks are deferred
// until the TFFT_ASYNC_WRITE stage, where the tasks are distributed across 4 timer threads for execution.
// Flush meta tasks occur less frequently, so their memory is allocated and copied directly in the current thread.
int ObTmpFileFlushManager::handle_fill_block_buf_(ObTmpFileFlushTask &flush_task, FlushState &next_state)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(flush_task.prealloc_block_buf())) {
STORAGE_LOG(WARN, "fail to prealloc block buf", KR(ret), K(flush_task));
} else if (flush_task.get_is_fast_flush_tree()) { // skip flush level, copy meta tree pages directly
if (OB_FAIL(fast_fill_block_buf_with_meta_(flush_task))) {
if (flush_task.get_is_fast_flush_tree()) { // skip flush level, copy meta tree pages directly
if (OB_FAIL(flush_task.prealloc_block_buf())) {
STORAGE_LOG(WARN, "fail to prealloc block buf", KR(ret), K(flush_task));
} else if (OB_FAIL(fast_fill_block_buf_with_meta_(flush_task))) {
STORAGE_LOG(WARN, "fail to fill block buffer with meta", KR(ret), K(flush_task));
}
} else {
@ -874,25 +887,19 @@ int ObTmpFileFlushManager::handle_fill_block_buf_(ObTmpFileFlushTask &flush_task
flush_task.get_flush_infos().at(i).has_meta())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "flush info has both data and meta", KR(ret), K(flush_task));
} else if (OB_UNLIKELY((flush_task.get_flush_infos().at(0).has_data() !=
flush_task.get_flush_infos().at(i).has_data()) ||
(flush_task.get_flush_infos().at(0).has_meta() !=
flush_task.get_flush_infos().at(i).has_meta()))) {
} else if (OB_UNLIKELY(ObTmpFileFlushTask::DATA == flush_task.get_type() &&
flush_task.get_flush_infos().at(i).has_meta())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "flush infos mixed storage meta and data pages", KR(ret), K(flush_task));
STORAGE_LOG(ERROR, "flush infos contain unexpected page type", KR(ret), K(flush_task));
} else if (OB_UNLIKELY(ObTmpFileFlushTask::META == flush_task.get_type() &&
flush_task.get_flush_infos().at(i).has_data())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "flush infos contain unexpected page type", KR(ret), K(flush_task));
}
}
}
if (OB_SUCC(ret)) {
const bool is_whole_data_page = flush_task.get_flush_infos().at(0).has_data();
if (is_whole_data_page &&
OB_TMP_FAIL(ObTmpBlockCache::get_instance().put_block(flush_task.get_inst_handle(),
flush_task.get_kvpair(),
flush_task.get_block_handle()))) {
STORAGE_LOG(WARN, "fail to put block into block cache", KR(tmp_ret), K(flush_task));
}
int64_t used_page_num = flush_task.get_total_page_num();
int64_t unused_page_id = used_page_num;
int64_t unused_page_num = ObTmpFileGlobal::BLOCK_PAGE_NUMS - used_page_num;
@ -927,21 +934,25 @@ int ObTmpFileFlushManager::handle_insert_meta_tree_(ObTmpFileFlushTask &flush_ta
int ObTmpFileFlushManager::handle_async_write_(ObTmpFileFlushTask &flush_task, FlushState &next_state)
{
int ret = OB_SUCCESS;
if (OB_FAIL(flush_task.write_one_block())) {
STORAGE_LOG(WARN, "fail to async write blocks", KR(ret), K(flush_task));
cur_flush_timer_idx_ = (cur_flush_timer_idx_ + 1) % ObTmpFileGlobal::FLUSH_TIMER_CNT;
if (OB_FAIL(TG_SCHEDULE(flush_timer_tg_id_[cur_flush_timer_idx_], flush_task.get_flush_write_block_task(), 0/*delay*/, false/*repeat*/))) {
LOG_WARN("TG_SCHEDULE tmp file write block task failed", KR(ret), K(flush_timer_tg_id_[cur_flush_timer_idx_]), K(cur_flush_timer_idx_), K(flush_task));
} else {
next_state = FlushState::TFFT_WAIT;
}
return ret;
}
int ObTmpFileFlushManager::handle_wait_(ObTmpFileFlushTask &flush_task, FlushState &next_state)
{
int ret = OB_SUCCESS;
int write_block_ret_code = flush_task.atomic_get_write_block_ret_code();
int task_ret_code = flush_task.atomic_get_ret_code();
if (OB_SUCCESS != task_ret_code) {
if (OB_SUCCESS != write_block_ret_code || OB_SUCCESS != task_ret_code) {
// rollback the status to TFFT_ASYNC_WRITE if IO failed, and re-send the I/O in the retry process.
STORAGE_LOG(INFO, "flush_task io fail, retry it later", KR(task_ret_code), K(flush_task));
STORAGE_LOG(INFO, "flush_task io fail, retry it later",
KR(write_block_ret_code), KR(task_ret_code), K(flush_task));
flush_task.set_state(FlushState::TFFT_ASYNC_WRITE);
} else if (OB_FAIL(tmp_file_block_mgr_.write_back_succ(flush_task.get_block_index(),
flush_task.get_macro_block_handle().get_macro_id()))) {

View File

@ -67,6 +67,7 @@ public:
~ObTmpFileFlushManager() {}
int init();
void destroy();
void set_flush_timer_tg_id(int* flush_timer_tg_id, const int64_t timer_cnt);
TO_STRING_KV(K(is_inited_), K(flush_ctx_));
public:
@ -99,7 +100,7 @@ private:
int advance_status_(ObTmpFileFlushTask &flush_task, const FlushState &state);
int drive_flush_task_prepare_(ObTmpFileFlushTask &flush_task, const FlushState state, FlushState &next_state);
int drive_flush_task_retry_(ObTmpFileFlushTask &flush_task, const FlushState state, FlushState &next_state);
int drive_flush_task_wait_to_finish_(ObTmpFileFlushTask &flush_task, FlushState &next_state);
int drive_flush_task_wait_(ObTmpFileFlushTask &flush_task, FlushState &next_state);
int handle_alloc_flush_task_(const bool fast_flush_meta, ObTmpFileFlushTask *&flush_task);
int handle_create_block_index_(ObTmpFileFlushTask &flush_task, FlushState &next_state);
int handle_fill_block_buf_(ObTmpFileFlushTask &flush_task, FlushState &next_state);
@ -125,6 +126,8 @@ private:
ObTmpWriteBufferPool &write_buffer_pool_;
ObTmpFileEvictionManager &evict_mgr_;
ObTmpFileFlushPriorityManager &flush_priority_mgr_;
int32_t cur_flush_timer_idx_;
int flush_timer_tg_id_[ObTmpFileGlobal::FLUSH_TIMER_CNT];
};
} // end namespace tmp_file

View File

@ -51,6 +51,7 @@ struct ObTmpFileGlobal final
FSM_FINISHED = 5
};
static const int64_t INVALID_FLUSH_SEQUENCE = -1;
static const int32_t FLUSH_TIMER_CNT = 4;
};

View File

@ -51,6 +51,8 @@ int ObTmpFilePageCacheController::start()
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
STORAGE_LOG(WARN, "tmp file page cache controller is not inited");
} else if (OB_FAIL(flush_tg_.start())) {
STORAGE_LOG(WARN, "fail to start swap thread", KR(ret));
} else if (OB_FAIL(swap_tg_.start())) {
STORAGE_LOG(WARN, "fail to start swap thread", KR(ret));
}
@ -65,6 +67,7 @@ void ObTmpFilePageCacheController::stop()
} else {
// stop background threads should follow the order 'swap' -> 'flush' because 'swap' holds ref to 'flush'
swap_tg_.stop();
flush_tg_.stop();
}
}
@ -75,6 +78,7 @@ void ObTmpFilePageCacheController::wait()
STORAGE_LOG(WARN, "tmp file page cache controller is not inited");
} else {
swap_tg_.wait();
flush_tg_.wait();
}
}

View File

@ -40,6 +40,7 @@ public:
}
~ObTmpFilePageCacheController() {}
public:
static const int64_t FLUSH_TIMER_CNT = 4;
static const int64_t FLUSH_FAST_INTERVAL = 5; // 5ms
static const int64_t FLUSH_INTERVAL = 1000; // 1s
static const int64_t SWAP_FAST_INTERVAL = 5; // 5ms

View File

@ -51,6 +51,9 @@ ObTmpFileFlushTG::ObTmpFileFlushTG(
fast_loop_cnt_(0),
fast_idle_loop_cnt_(0)
{
for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) {
flush_timer_tg_id_[i] = -1;
}
}
int ObTmpFileFlushTG::init()
@ -60,48 +63,97 @@ int ObTmpFileFlushTG::init()
ret = OB_INIT_TWICE;
STORAGE_LOG(WARN, "ObTmpFileSwapTG init twice");
} else {
is_inited_ = true;
mode_ = RUNNING_MODE::NORMAL;
last_flush_timestamp_ = 0;
flush_io_finished_ret_ = OB_SUCCESS;
flush_io_finished_round_ = 0;
flushing_block_num_ = 0;
for (int32_t i = 0; OB_SUCC(ret) && i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) {
if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TmpFileFlush, flush_timer_tg_id_[i]))) {
STORAGE_LOG(WARN, "fail to create flush timer thread", KR(ret));
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
mode_ = RUNNING_MODE::NORMAL;
last_flush_timestamp_ = 0;
flush_io_finished_ret_ = OB_SUCCESS;
flush_io_finished_round_ = 0;
flushing_block_num_ = 0;
fast_flush_meta_task_cnt_ = 0;
wait_list_size_ = 0;
retry_list_size_ = 0;
finished_list_size_ = 0;
fast_flush_meta_task_cnt_ = 0;
wait_list_size_ = 0;
retry_list_size_ = 0;
finished_list_size_ = 0;
normal_loop_cnt_ = 0;
normal_idle_loop_cnt_ = 0;
fast_loop_cnt_ = 0;
fast_idle_loop_cnt_ = 0;
normal_loop_cnt_ = 0;
normal_idle_loop_cnt_ = 0;
fast_loop_cnt_ = 0;
fast_idle_loop_cnt_ = 0;
}
}
if (OB_FAIL(ret)) {
destroy();
}
return ret;
}
// Starts every flush timer thread group and, once all of them are running, hands
// their ids to the flush manager so it can schedule write-block tasks on them.
// @return OB_NOT_INIT if init() has not succeeded; the TG_START error if a timer
//         fails to start (remaining timers are then not started and the flush
//         manager is not updated); OB_SUCCESS otherwise.
int ObTmpFileFlushTG::start()
{
  int ret = OB_SUCCESS;
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    // this class is ObTmpFileFlushTG; the message previously named ObTmpFileSwapTG
    STORAGE_LOG(WARN, "ObTmpFileFlushTG not init", KR(ret));
  } else {
    for (int32_t i = 0; OB_SUCC(ret) && i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) {
      if (OB_FAIL(TG_START(flush_timer_tg_id_[i]))) {
        LOG_WARN("TG_START flush_timer_tg_id_ failed", KR(ret), K(flush_timer_tg_id_[i]));
      }
    }
    if (OB_SUCC(ret)) {
      flush_mgr_.set_flush_timer_tg_id(flush_timer_tg_id_, ObTmpFileGlobal::FLUSH_TIMER_CNT);
    }
  }
  return ret;
}
void ObTmpFileFlushTG::stop()
{
for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) {
TG_STOP(flush_timer_tg_id_[i]);
}
}
void ObTmpFileFlushTG::wait()
{
for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) {
TG_WAIT(flush_timer_tg_id_[i]);
}
}
void ObTmpFileFlushTG::destroy()
{
if (IS_INIT) {
clean_up_lists();
mode_ = RUNNING_MODE::INVALID;
last_flush_timestamp_ = 0;
flush_io_finished_ret_ = OB_SUCCESS;
flush_io_finished_round_ = 0;
flushing_block_num_ = 0;
clean_up_lists();
mode_ = RUNNING_MODE::INVALID;
last_flush_timestamp_ = 0;
flush_io_finished_ret_ = OB_SUCCESS;
flush_io_finished_round_ = 0;
flushing_block_num_ = 0;
is_fast_flush_meta_ = false;
fast_flush_meta_task_cnt_ = 0;
wait_list_size_ = 0;
retry_list_size_ = 0;
finished_list_size_ = 0;
is_fast_flush_meta_ = false;
fast_flush_meta_task_cnt_ = 0;
wait_list_size_ = 0;
retry_list_size_ = 0;
finished_list_size_ = 0;
normal_loop_cnt_ = 0;
normal_idle_loop_cnt_ = 0;
fast_loop_cnt_ = 0;
fast_idle_loop_cnt_ = 0;
normal_loop_cnt_ = 0;
normal_idle_loop_cnt_ = 0;
fast_loop_cnt_ = 0;
fast_idle_loop_cnt_ = 0;
is_inited_ = false;
is_inited_ = false;
for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) {
if (-1 != flush_timer_tg_id_[i]) {
TG_DESTROY(flush_timer_tg_id_[i]);
flush_timer_tg_id_[i] = -1;
}
}
}
@ -260,19 +312,27 @@ void ObTmpFileFlushTG::flush_fast_()
{
int ret = OB_SUCCESS;
int64_t BLOCK_SIZE = OB_STORAGE_OBJECT_MGR.get_macro_object_size();
int64_t flush_size = min(get_fast_flush_size_(), get_flushing_block_num_threshold_() * BLOCK_SIZE);
if (OB_FAIL(check_flush_task_io_finished_())) {
STORAGE_LOG(WARN, "fail to check flush task io finished", KR(ret));
}
if (OB_FAIL(retry_task_())) {
STORAGE_LOG(WARN, "fail to retry task", KR(ret));
}
if (flush_size > 0) {
if (OB_FAIL(wash_(flush_size, RUNNING_MODE::FAST))) {
STORAGE_LOG(WARN, "fail to flush fast", KR(ret), KPC(this), K(flush_size));
}
int64_t flushing_block_num = ATOMIC_LOAD(&flushing_block_num_);
if (flushing_block_num >= get_flushing_block_num_threshold_()) {
STORAGE_LOG(DEBUG, "reach flushing block num threshold, skip flush", KPC(this));
} else {
STORAGE_LOG(DEBUG, "current expect flush size is 0, skip flush", K(flush_size), K(this));
int64_t max_flushing_block_num_cur_round = get_flushing_block_num_threshold_() - flushing_block_num;
int64_t flush_size = min(get_fast_flush_size_(), max_flushing_block_num_cur_round * BLOCK_SIZE);
if (flush_size > 0) {
if (OB_FAIL(wash_(flush_size, RUNNING_MODE::FAST))) {
STORAGE_LOG(WARN, "fail to flush fast", KR(ret), KPC(this), K(flush_size));
}
} else {
STORAGE_LOG(DEBUG, "current expect flush size is 0, skip flush", K(flush_size), KPC(this));
}
}
}
@ -481,12 +541,36 @@ int ObTmpFileFlushTG::check_flush_task_io_finished_()
if (OB_ISNULL(flush_task)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "flush task is nullptr", KR(ret));
}
bool write_block_success = false;
if (OB_FAIL(ret)) {
} else if (!flush_task->atomic_get_write_block_executed()) {
push_wait_list_(flush_task); // not send IO yet, continue waiting
ret = OB_SUCCESS;
} else if (flush_task->atomic_get_write_block_ret_code() != OB_SUCCESS) {
if (flush_task->atomic_get_write_block_ret_code() == OB_SERVER_OUTOF_DISK_SPACE) {
signal_io_finish(OB_SERVER_OUTOF_DISK_SPACE);
}
// rollback to TFFT_ASYNC_WRITE and re-send IO
if (OB_FAIL(flush_mgr_.io_finished(*flush_task))) {
STORAGE_LOG(WARN, "fail to handle flush task finished", KR(ret), KPC(flush_task));
} else if (FlushState::TFFT_ASYNC_WRITE == flush_task->get_state()) {
push_retry_list_(flush_task);
STORAGE_LOG(DEBUG, "write block failure flush task push to retry list", KPC(flush_task));
} else {
STORAGE_LOG(ERROR, "unexpected flush task state", KR(ret), KPC(flush_task));
}
} else {
write_block_success = true;
}
if (OB_FAIL(ret) || !write_block_success) {
} else if (OB_FAIL(flush_task->wait_macro_block_handle())) {
if (OB_EAGAIN == ret) {
push_wait_list_(flush_task); // IO is not completed, continue waiting
ret = OB_SUCCESS;
} else {
STORAGE_LOG(ERROR, "unexpected error in waiting flush task finished", KR(ret), KPC(this));
STORAGE_LOG(WARN, "unexpected error in waiting flush task finished", KR(ret), KPC(this));
}
} else if (!flush_task->atomic_get_io_finished()) {
ret = OB_ERR_UNEXPECTED;
@ -634,22 +718,21 @@ int ObTmpFileFlushTG::pop_finished_list_(ObTmpFileFlushTask *&flush_task)
return ret;
}
// Fast-mode flush size (same unit as the macro object size, presumably bytes):
// 10% of the write buffer pool memory limit passed through
// upper_align(..., BLOCK_SIZE), capped at MAX_FLUSHING_BLOCK_NUM blocks and
// never smaller than one block, i.e.
//   max(BLOCK_SIZE, min(MAX_FLUSHING_BLOCK_NUM * BLOCK_SIZE, aligned 10% of limit)).
// NOTE(review): the return type is int while the value is computed as int64_t;
// confirm the result always fits before relying on larger limits.
int ObTmpFileFlushTG::get_fast_flush_size_()
{
  // TODO: move to page cache controller
  const int64_t BLOCK_SIZE = OB_STORAGE_OBJECT_MGR.get_macro_object_size();
  const int64_t wbp_mem_limit = wbp_.get_memory_limit();
  // Single declaration: the stale 5% variant that used to precede this line
  // redeclared flush_size in the same scope.
  int64_t flush_size = max(BLOCK_SIZE, min(MAX_FLUSHING_BLOCK_NUM * BLOCK_SIZE, upper_align(0.1 * wbp_mem_limit, BLOCK_SIZE)));
  return flush_size;
}
// flushing threshold is MIN(20MB, (20% * tmp_file_memory))
int ObTmpFileFlushTG::get_flushing_block_num_threshold_()
int64_t ObTmpFileFlushTG::get_flushing_block_num_threshold_()
{
const int64_t BLOCK_SIZE = OB_STORAGE_OBJECT_MGR.get_macro_object_size();
int64_t wbp_mem_limit = wbp_.get_memory_limit();
int64_t flush_threshold = max(BLOCK_SIZE, min(MAX_FLUSHING_BLOCK_NUM, static_cast<int64_t>(0.2 * wbp_mem_limit / BLOCK_SIZE)));
int64_t flush_threshold =
max(1, min(MAX_FLUSHING_BLOCK_NUM, static_cast<int64_t>(0.2 * wbp_mem_limit / BLOCK_SIZE)));
return flush_threshold;
}

View File

@ -33,7 +33,7 @@ class ObTmpFileFlushTG
{
public:
typedef ObTmpFileFlushTask::ObTmpFileFlushTaskState FlushState;
static const int64_t MAX_FLUSHING_BLOCK_NUM = 50;
static const int64_t MAX_FLUSHING_BLOCK_NUM = 200;
enum RUNNING_MODE {
INVALID = 0,
NORMAL = 1,
@ -45,6 +45,9 @@ public:
ObIAllocator &allocator,
ObTmpFileBlockManager &tmp_file_block_mgr);
int init();
int start();
void stop();
void wait();
void destroy();
int try_work();
@ -71,7 +74,7 @@ private:
void flush_fast_();
void flush_normal_();
int get_fast_flush_size_();
int get_flushing_block_num_threshold_();
int64_t get_flushing_block_num_threshold_();
int push_wait_list_(ObTmpFileFlushTask *flush_task);
int pop_wait_list_(ObTmpFileFlushTask *&flush_task);
int push_retry_list_(ObTmpFileFlushTask *flush_task);
@ -104,6 +107,8 @@ private:
int64_t normal_idle_loop_cnt_;
int64_t fast_loop_cnt_;
int64_t fast_idle_loop_cnt_;
int flush_timer_tg_id_[ObTmpFileGlobal::FLUSH_TIMER_CNT];
};
class ObTmpFileSwapTG : public lib::TGRunnable