From b98f09b5bc39ef05b46d31b6ed03175c73beb81b Mon Sep 17 00:00:00 2001 From: oceanoverflow Date: Wed, 22 Mar 2023 04:45:47 +0000 Subject: [PATCH] parallel external sort judge if final sort before add item --- src/storage/backup/ob_backup_utils.cpp | 33 +++++++------------ src/storage/backup/ob_backup_utils.h | 2 -- src/storage/ob_parallel_external_sort.h | 44 +++++++++++++++++++++++-- 3 files changed, 53 insertions(+), 26 deletions(-) diff --git a/src/storage/backup/ob_backup_utils.cpp b/src/storage/backup/ob_backup_utils.cpp index 68b3c8828..0bd6f3442 100644 --- a/src/storage/backup/ob_backup_utils.cpp +++ b/src/storage/backup/ob_backup_utils.cpp @@ -1429,9 +1429,7 @@ ObBackupTabletProvider::ObBackupTabletProvider() backup_item_cmp_(sort_ret_), meta_index_store_(), prev_item_(), - has_prev_item_(false), - supply_count_(0), - consume_count_(0) + has_prev_item_(false) {} ObBackupTabletProvider::~ObBackupTabletProvider() @@ -1451,9 +1449,6 @@ int ObBackupTabletProvider::init(const ObLSBackupParam ¶m, const share::ObBa LOG_WARN("get invalid args", K(ret), K(param), K(backup_data_type)); } else if (FALSE_IT(backup_item_cmp_.set_backup_data_type(backup_data_type))) { LOG_WARN("failed to set backup data type", K(ret), K(backup_data_type)); - } else if (OB_FAIL(external_sort_.init( - BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) { - LOG_WARN("failed to init external sort", K(ret)); } else if (OB_FAIL(param_.assign(param))) { LOG_WARN("failed to assign param", K(ret), K(param)); } else { @@ -1571,16 +1566,6 @@ int ObBackupTabletProvider::inner_get_batch_items_( if (OB_FAIL(external_sort_.get_next_item(next_item))) { if (OB_ITER_END == ret) { ret = OB_SUCCESS; - external_sort_.clean_up(); - if (supply_count_ != consume_count_) { - ret = OB_ERR_SYS; - LOG_WARN("reinit external sort before all consume", K(ret), K_(meet_end), K_(param), K(supply_count_), K(consume_count_), K_(has_prev_item), K_(prev_item)); - } else if (OB_FAIL(external_sort_.init( - BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) { - LOG_WARN("failed to init external sort", K(ret)); - } else { - LOG_INFO("reinit external sort", K(consume_count_), K(supply_count_)); - } break; } else { LOG_WARN("failed to get next item", K(ret)); @@ -1595,7 +1580,6 @@ int ObBackupTabletProvider::inner_get_batch_items_( } else { has_prev_item_ = true; prev_item_ = *next_item; - consume_count_++; } } LOG_INFO("inner get batch item", K(items), K_(backup_data_type)); @@ -1606,8 +1590,15 @@ int ObBackupTabletProvider::prepare_batch_tablet_(const uint64_t tenant_id, cons { int ret = OB_SUCCESS; int64_t total_count = 0; - const bool need_prepare = (consume_count_ == supply_count_); + const bool need_prepare = external_sort_.is_all_got(); if (need_prepare) { + external_sort_.clean_up(); + if (OB_FAIL(external_sort_.init( + BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) { + LOG_WARN("failed to init external sort", K(ret)); + } else { + LOG_INFO("reinit external sort", K(tenant_id), K(ls_id)); + } while (OB_SUCC(ret) && total_count < BATCH_SIZE) { ObTabletID tablet_id; int64_t count = 0; @@ -1628,7 +1619,6 @@ int ObBackupTabletProvider::prepare_batch_tablet_(const uint64_t tenant_id, cons } else if (OB_FAIL(ObBackupUtils::check_ls_valid_for_backup(tenant_id, ls_id, ls_backup_ctx_->rebuild_seq_))) { LOG_WARN("failed to check ls valid for backup", K(ret), K(tenant_id), K(ls_id)); } else { - supply_count_ += count; total_count += count; } } @@ -1636,11 +1626,12 @@ int ObBackupTabletProvider::prepare_batch_tablet_(const uint64_t tenant_id, cons if (OB_FAIL(external_sort_.do_sort(true /*final_merge*/))) { LOG_WARN("failed to do external sort", K(ret)); } else { - LOG_INFO("do sort", K(consume_count_), K(supply_count_)); + LOG_INFO("do sort"); } } } else { - LOG_INFO("no need prepare now", K(tenant_id), K(ls_id), K(consume_count_), K(supply_count_)); + LOG_INFO("no need prepare now", K(tenant_id), K(ls_id), "is_all_got", external_sort_.is_all_got(), + "is_sorted", external_sort_.is_sorted()); } return ret; } diff --git a/src/storage/backup/ob_backup_utils.h b/src/storage/backup/ob_backup_utils.h index 0e61d047b..3c6aeed27 100644 --- a/src/storage/backup/ob_backup_utils.h +++ b/src/storage/backup/ob_backup_utils.h @@ -323,8 +323,6 @@ private: ObBackupMetaIndexStore meta_index_store_; ObBackupProviderItem prev_item_; bool has_prev_item_; - int supply_count_; - int consume_count_; DISALLOW_COPY_AND_ASSIGN(ObBackupTabletProvider); }; diff --git a/src/storage/ob_parallel_external_sort.h b/src/storage/ob_parallel_external_sort.h index d91738532..dcadd2acb 100644 --- a/src/storage/ob_parallel_external_sort.h +++ b/src/storage/ob_parallel_external_sort.h @@ -1693,8 +1693,11 @@ public: int add_fragment_iter(ObFragmentIterator *iter); int transfer_final_sorted_fragment_iter(ObExternalSort &merge_sorter); int get_current_round(ExternalSortRound *&round); + bool is_sorted() const { return is_sorted_; } + bool is_all_got() const { return add_count_ == get_count_; } + void add_count(const int64_t count) { add_count_ += count; } TO_STRING_KV(K(is_inited_), K(file_buf_size_), K(buf_mem_limit_), K(expire_timestamp_), - K(merge_count_per_round_), KP(tenant_id_), KP(compare_)); + K(merge_count_per_round_), KP(tenant_id_), KP(compare_), K_(add_count), K_(get_count)); private: static const int64_t EXTERNAL_SORT_ROUND_CNT = 2; bool is_inited_; @@ -1709,13 +1712,17 @@ private: ExternalSortRound *next_round_; bool is_empty_; uint64_t tenant_id_; + bool is_sorted_; + int64_t add_count_; + int64_t get_count_; }; template ObExternalSort::ObExternalSort() : is_inited_(false), file_buf_size_(0), buf_mem_limit_(0), expire_timestamp_(0), merge_count_per_round_(0), compare_(NULL), memory_sort_round_(), curr_round_(NULL), next_round_(NULL), - is_empty_(true), tenant_id_(common::OB_INVALID_ID) + is_empty_(true), tenant_id_(common::OB_INVALID_ID), is_sorted_(false), + add_count_(0), get_count_(0) { } @@ -1723,7 +1730,8 @@ template ObExternalSort::ObExternalSort(ObIAllocator &allocator) : is_inited_(false), file_buf_size_(0), buf_mem_limit_(0), expire_timestamp_(0), merge_count_per_round_(0), compare_(NULL), memory_sort_round_(allocator), curr_round_(NULL), next_round_(NULL), - is_empty_(true), tenant_id_(common::OB_INVALID_ID) + is_empty_(true), tenant_id_(common::OB_INVALID_ID), is_sorted_(false), + add_count_(0), get_count_(0) { for (int64_t i = 0; i < EXTERNAL_SORT_ROUND_CNT; i++) { new (&sort_rounds_[i]) ExternalSortRound(allocator); @@ -1763,6 +1771,9 @@ int ObExternalSort::init( curr_round_ = &sort_rounds_[0]; next_round_ = &sort_rounds_[1]; is_empty_ = true; + is_sorted_ = false; + add_count_ = 0; + get_count_ = 0; if (merge_count_per_round_ < ObExternalSortConstant::MIN_MULTIPLE_MERGE_COUNT) { ret = common::OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument, invalid memory limit", K(ret), @@ -1787,8 +1798,13 @@ int ObExternalSort::add_item(const T &item) if (OB_UNLIKELY(!is_inited_)) { ret = common::OB_NOT_INIT; STORAGE_LOG(WARN, "ObExternalSort has not been inited", K(ret)); + } else if (OB_UNLIKELY(is_sorted_)) { + ret = common::OB_ERR_SYS; + STORAGE_LOG(WARN, "should not add item after being sorted", K(ret)); } else if (OB_FAIL(memory_sort_round_.add_item(item))) { STORAGE_LOG(WARN, "fail to add item in memory sort round", K(ret)); + } else { + add_count_++; } return ret; } @@ -1838,6 +1854,9 @@ int ObExternalSort::do_sort(const bool final_merge) } } } + if (OB_SUCC(ret)) { + is_sorted_ = true; + } return ret; } @@ -1850,6 +1869,12 @@ int ObExternalSort::get_next_item(const T *&item) STORAGE_LOG(WARN, "ObExternalSort has not been inited", K(ret)); } else if (is_empty_) { ret = common::OB_ITER_END; + } else if (OB_UNLIKELY(!is_sorted_)) { + ret = common::OB_ERR_SYS; + STORAGE_LOG(WARN, "direct get next item before being sorted", K(ret)); + }else if (OB_UNLIKELY(add_count_ < get_count_)) { + ret = common::OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "supply count and consume count not match", K(ret), K_(add_count), K_(get_count)); } else if (memory_sort_round_.has_data() && memory_sort_round_.is_in_memory()) { if (OB_FAIL(memory_sort_round_.get_next_item(item))) { if (common::OB_ITER_END != ret) { @@ -1861,6 +1886,13 @@ int ObExternalSort::get_next_item(const T *&item) STORAGE_LOG(WARN, "fail to get next item", K(ret)); } } + if (OB_SUCC(ret)) { + get_count_++; + } + if (common::OB_ITER_END == ret && add_count_ != get_count_) { + ret = common::OB_ERR_SYS; + STORAGE_LOG(WARN, "add count not match get count", K(ret), K_(add_count), K_(get_count)); + } return ret; } @@ -1878,6 +1910,9 @@ void ObExternalSort::clean_up() curr_round_ = NULL; next_round_ = NULL; is_empty_ = true; + is_sorted_ = false; + add_count_ = 0; + get_count_ = 0; STORAGE_LOG(INFO, "do external sort clean up"); for (int64_t i = 0; i < EXTERNAL_SORT_ROUND_CNT; ++i) { // ignore ret @@ -1946,6 +1981,9 @@ int ObExternalSort::transfer_final_sorted_fragment_iter( } else { merge_sorter.is_empty_ = false; } + if (OB_SUCC(ret)) { + merge_sorter.add_count(add_count_); + } return ret; }