parallel external sort judge if final sort before add item
This commit is contained in:
parent
121dd427e8
commit
b98f09b5bc
@ -1429,9 +1429,7 @@ ObBackupTabletProvider::ObBackupTabletProvider()
|
||||
backup_item_cmp_(sort_ret_),
|
||||
meta_index_store_(),
|
||||
prev_item_(),
|
||||
has_prev_item_(false),
|
||||
supply_count_(0),
|
||||
consume_count_(0)
|
||||
has_prev_item_(false)
|
||||
{}
|
||||
|
||||
ObBackupTabletProvider::~ObBackupTabletProvider()
|
||||
@ -1451,9 +1449,6 @@ int ObBackupTabletProvider::init(const ObLSBackupParam ¶m, const share::ObBa
|
||||
LOG_WARN("get invalid args", K(ret), K(param), K(backup_data_type));
|
||||
} else if (FALSE_IT(backup_item_cmp_.set_backup_data_type(backup_data_type))) {
|
||||
LOG_WARN("failed to set backup data type", K(ret), K(backup_data_type));
|
||||
} else if (OB_FAIL(external_sort_.init(
|
||||
BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) {
|
||||
LOG_WARN("failed to init external sort", K(ret));
|
||||
} else if (OB_FAIL(param_.assign(param))) {
|
||||
LOG_WARN("failed to assign param", K(ret), K(param));
|
||||
} else {
|
||||
@ -1571,16 +1566,6 @@ int ObBackupTabletProvider::inner_get_batch_items_(
|
||||
if (OB_FAIL(external_sort_.get_next_item(next_item))) {
|
||||
if (OB_ITER_END == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
external_sort_.clean_up();
|
||||
if (supply_count_ != consume_count_) {
|
||||
ret = OB_ERR_SYS;
|
||||
LOG_WARN("reinit external sort before all consume", K(ret), K_(meet_end), K_(param), K(supply_count_), K(consume_count_), K_(has_prev_item), K_(prev_item));
|
||||
} else if (OB_FAIL(external_sort_.init(
|
||||
BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) {
|
||||
LOG_WARN("failed to init external sort", K(ret));
|
||||
} else {
|
||||
LOG_INFO("reinit external sort", K(consume_count_), K(supply_count_));
|
||||
}
|
||||
break;
|
||||
} else {
|
||||
LOG_WARN("failed to get next item", K(ret));
|
||||
@ -1595,7 +1580,6 @@ int ObBackupTabletProvider::inner_get_batch_items_(
|
||||
} else {
|
||||
has_prev_item_ = true;
|
||||
prev_item_ = *next_item;
|
||||
consume_count_++;
|
||||
}
|
||||
}
|
||||
LOG_INFO("inner get batch item", K(items), K_(backup_data_type));
|
||||
@ -1606,8 +1590,15 @@ int ObBackupTabletProvider::prepare_batch_tablet_(const uint64_t tenant_id, cons
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t total_count = 0;
|
||||
const bool need_prepare = (consume_count_ == supply_count_);
|
||||
const bool need_prepare = external_sort_.is_all_got();
|
||||
if (need_prepare) {
|
||||
external_sort_.clean_up();
|
||||
if (OB_FAIL(external_sort_.init(
|
||||
BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) {
|
||||
LOG_WARN("failed to init external sort", K(ret));
|
||||
} else {
|
||||
LOG_INFO("reinit external sort", K(tenant_id), K(ls_id));
|
||||
}
|
||||
while (OB_SUCC(ret) && total_count < BATCH_SIZE) {
|
||||
ObTabletID tablet_id;
|
||||
int64_t count = 0;
|
||||
@ -1628,7 +1619,6 @@ int ObBackupTabletProvider::prepare_batch_tablet_(const uint64_t tenant_id, cons
|
||||
} else if (OB_FAIL(ObBackupUtils::check_ls_valid_for_backup(tenant_id, ls_id, ls_backup_ctx_->rebuild_seq_))) {
|
||||
LOG_WARN("failed to check ls valid for backup", K(ret), K(tenant_id), K(ls_id));
|
||||
} else {
|
||||
supply_count_ += count;
|
||||
total_count += count;
|
||||
}
|
||||
}
|
||||
@ -1636,11 +1626,12 @@ int ObBackupTabletProvider::prepare_batch_tablet_(const uint64_t tenant_id, cons
|
||||
if (OB_FAIL(external_sort_.do_sort(true /*final_merge*/))) {
|
||||
LOG_WARN("failed to do external sort", K(ret));
|
||||
} else {
|
||||
LOG_INFO("do sort", K(consume_count_), K(supply_count_));
|
||||
LOG_INFO("do sort");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("no need prepare now", K(tenant_id), K(ls_id), K(consume_count_), K(supply_count_));
|
||||
LOG_INFO("no need prepare now", K(tenant_id), K(ls_id), "is_all_got", external_sort_.is_all_got(),
|
||||
"is_sorted", external_sort_.is_sorted());
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -323,8 +323,6 @@ private:
|
||||
ObBackupMetaIndexStore meta_index_store_;
|
||||
ObBackupProviderItem prev_item_;
|
||||
bool has_prev_item_;
|
||||
int supply_count_;
|
||||
int consume_count_;
|
||||
DISALLOW_COPY_AND_ASSIGN(ObBackupTabletProvider);
|
||||
};
|
||||
|
||||
|
@ -1693,8 +1693,11 @@ public:
|
||||
int add_fragment_iter(ObFragmentIterator<T> *iter);
|
||||
int transfer_final_sorted_fragment_iter(ObExternalSort<T, Compare> &merge_sorter);
|
||||
int get_current_round(ExternalSortRound *&round);
|
||||
bool is_sorted() const { return is_sorted_; }
|
||||
bool is_all_got() const { return add_count_ == get_count_; }
|
||||
void add_count(const int64_t count) { add_count_ += count; }
|
||||
TO_STRING_KV(K(is_inited_), K(file_buf_size_), K(buf_mem_limit_), K(expire_timestamp_),
|
||||
K(merge_count_per_round_), KP(tenant_id_), KP(compare_));
|
||||
K(merge_count_per_round_), KP(tenant_id_), KP(compare_), K_(add_count), K_(get_count));
|
||||
private:
|
||||
static const int64_t EXTERNAL_SORT_ROUND_CNT = 2;
|
||||
bool is_inited_;
|
||||
@ -1709,13 +1712,17 @@ private:
|
||||
ExternalSortRound *next_round_;
|
||||
bool is_empty_;
|
||||
uint64_t tenant_id_;
|
||||
bool is_sorted_;
|
||||
int64_t add_count_;
|
||||
int64_t get_count_;
|
||||
};
|
||||
|
||||
template<typename T, typename Compare>
|
||||
ObExternalSort<T, Compare>::ObExternalSort()
|
||||
: is_inited_(false), file_buf_size_(0), buf_mem_limit_(0), expire_timestamp_(0), merge_count_per_round_(0),
|
||||
compare_(NULL), memory_sort_round_(), curr_round_(NULL), next_round_(NULL),
|
||||
is_empty_(true), tenant_id_(common::OB_INVALID_ID)
|
||||
is_empty_(true), tenant_id_(common::OB_INVALID_ID), is_sorted_(false),
|
||||
add_count_(0), get_count_(0)
|
||||
{
|
||||
}
|
||||
|
||||
@ -1723,7 +1730,8 @@ template<typename T, typename Compare>
|
||||
ObExternalSort<T, Compare>::ObExternalSort(ObIAllocator &allocator)
|
||||
: is_inited_(false), file_buf_size_(0), buf_mem_limit_(0), expire_timestamp_(0), merge_count_per_round_(0),
|
||||
compare_(NULL), memory_sort_round_(allocator), curr_round_(NULL), next_round_(NULL),
|
||||
is_empty_(true), tenant_id_(common::OB_INVALID_ID)
|
||||
is_empty_(true), tenant_id_(common::OB_INVALID_ID), is_sorted_(false),
|
||||
add_count_(0), get_count_(0)
|
||||
{
|
||||
for (int64_t i = 0; i < EXTERNAL_SORT_ROUND_CNT; i++) {
|
||||
new (&sort_rounds_[i]) ExternalSortRound(allocator);
|
||||
@ -1763,6 +1771,9 @@ int ObExternalSort<T, Compare>::init(
|
||||
curr_round_ = &sort_rounds_[0];
|
||||
next_round_ = &sort_rounds_[1];
|
||||
is_empty_ = true;
|
||||
is_sorted_ = false;
|
||||
add_count_ = 0;
|
||||
get_count_ = 0;
|
||||
if (merge_count_per_round_ < ObExternalSortConstant::MIN_MULTIPLE_MERGE_COUNT) {
|
||||
ret = common::OB_INVALID_ARGUMENT;
|
||||
STORAGE_LOG(WARN, "invalid argument, invalid memory limit", K(ret),
|
||||
@ -1787,8 +1798,13 @@ int ObExternalSort<T, Compare>::add_item(const T &item)
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
ret = common::OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObExternalSort has not been inited", K(ret));
|
||||
} else if (OB_UNLIKELY(is_sorted_)) {
|
||||
ret = common::OB_ERR_SYS;
|
||||
STORAGE_LOG(WARN, "should not add item after being sorted", K(ret));
|
||||
} else if (OB_FAIL(memory_sort_round_.add_item(item))) {
|
||||
STORAGE_LOG(WARN, "fail to add item in memory sort round", K(ret));
|
||||
} else {
|
||||
add_count_++;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -1838,6 +1854,9 @@ int ObExternalSort<T, Compare>::do_sort(const bool final_merge)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
is_sorted_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1850,6 +1869,12 @@ int ObExternalSort<T, Compare>::get_next_item(const T *&item)
|
||||
STORAGE_LOG(WARN, "ObExternalSort has not been inited", K(ret));
|
||||
} else if (is_empty_) {
|
||||
ret = common::OB_ITER_END;
|
||||
} else if (OB_UNLIKELY(!is_sorted_)) {
|
||||
ret = common::OB_ERR_SYS;
|
||||
STORAGE_LOG(WARN, "direct get next item before being sorted", K(ret));
|
||||
}else if (OB_UNLIKELY(add_count_ < get_count_)) {
|
||||
ret = common::OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "supply count and consume count not match", K(ret), K_(add_count), K_(get_count));
|
||||
} else if (memory_sort_round_.has_data() && memory_sort_round_.is_in_memory()) {
|
||||
if (OB_FAIL(memory_sort_round_.get_next_item(item))) {
|
||||
if (common::OB_ITER_END != ret) {
|
||||
@ -1861,6 +1886,13 @@ int ObExternalSort<T, Compare>::get_next_item(const T *&item)
|
||||
STORAGE_LOG(WARN, "fail to get next item", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
get_count_++;
|
||||
}
|
||||
if (common::OB_ITER_END == ret && add_count_ != get_count_) {
|
||||
ret = common::OB_ERR_SYS;
|
||||
STORAGE_LOG(WARN, "add count not match get count", K(ret), K_(add_count), K_(get_count));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -1878,6 +1910,9 @@ void ObExternalSort<T, Compare>::clean_up()
|
||||
curr_round_ = NULL;
|
||||
next_round_ = NULL;
|
||||
is_empty_ = true;
|
||||
is_sorted_ = false;
|
||||
add_count_ = 0;
|
||||
get_count_ = 0;
|
||||
STORAGE_LOG(INFO, "do external sort clean up");
|
||||
for (int64_t i = 0; i < EXTERNAL_SORT_ROUND_CNT; ++i) {
|
||||
// ignore ret
|
||||
@ -1946,6 +1981,9 @@ int ObExternalSort<T, Compare>::transfer_final_sorted_fragment_iter(
|
||||
} else {
|
||||
merge_sorter.is_empty_ = false;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
merge_sorter.add_count(add_count_);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user