maintain supply and consume count to ensure all item consume before external sort
This commit is contained in:
@ -1424,7 +1424,9 @@ ObBackupTabletProvider::ObBackupTabletProvider()
|
|||||||
backup_item_cmp_(sort_ret_),
|
backup_item_cmp_(sort_ret_),
|
||||||
meta_index_store_(),
|
meta_index_store_(),
|
||||||
prev_item_(),
|
prev_item_(),
|
||||||
has_prev_item_(false)
|
has_prev_item_(false),
|
||||||
|
supply_count_(0),
|
||||||
|
consume_count_(0)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
ObBackupTabletProvider::~ObBackupTabletProvider()
|
ObBackupTabletProvider::~ObBackupTabletProvider()
|
||||||
@ -1565,9 +1567,14 @@ int ObBackupTabletProvider::inner_get_batch_items_(
|
|||||||
if (OB_ITER_END == ret) {
|
if (OB_ITER_END == ret) {
|
||||||
ret = OB_SUCCESS;
|
ret = OB_SUCCESS;
|
||||||
external_sort_.clean_up();
|
external_sort_.clean_up();
|
||||||
if (OB_FAIL(external_sort_.init(
|
if (supply_count_ != consume_count_) {
|
||||||
|
ret = OB_ERR_SYS;
|
||||||
|
LOG_WARN("reinit external sort before all consume", K(ret), K_(meet_end), K_(param), K(supply_count_), K(consume_count_), K_(has_prev_item), K_(prev_item));
|
||||||
|
} else if (OB_FAIL(external_sort_.init(
|
||||||
BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) {
|
BUF_MEM_LIMIT, FILE_BUF_SIZE, EXPIRE_TIMESTAMP, OB_SYS_TENANT_ID, &backup_item_cmp_))) {
|
||||||
LOG_WARN("failed to init external sort", K(ret));
|
LOG_WARN("failed to init external sort", K(ret));
|
||||||
|
} else {
|
||||||
|
LOG_INFO("reinit external sort", K(consume_count_), K(supply_count_));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
@ -1583,7 +1590,7 @@ int ObBackupTabletProvider::inner_get_batch_items_(
|
|||||||
} else {
|
} else {
|
||||||
has_prev_item_ = true;
|
has_prev_item_ = true;
|
||||||
prev_item_ = *next_item;
|
prev_item_ = *next_item;
|
||||||
|
consume_count_++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG_INFO("inner get batch item", K(items), K_(backup_data_type));
|
LOG_INFO("inner get batch item", K(items), K_(backup_data_type));
|
||||||
@ -1594,33 +1601,41 @@ int ObBackupTabletProvider::prepare_batch_tablet_(const uint64_t tenant_id, cons
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t total_count = 0;
|
int64_t total_count = 0;
|
||||||
while (OB_SUCC(ret) && total_count < BATCH_SIZE) {
|
const bool need_prepare = (consume_count_ == supply_count_);
|
||||||
ObTabletID tablet_id;
|
if (need_prepare) {
|
||||||
int64_t count = 0;
|
while (OB_SUCC(ret) && total_count < BATCH_SIZE) {
|
||||||
if (OB_ISNULL(ls_backup_ctx_)) {
|
ObTabletID tablet_id;
|
||||||
ret = OB_ERR_UNEXPECTED;
|
int64_t count = 0;
|
||||||
LOG_WARN("log stream backup ctx should not be null", K(ret));
|
if (OB_ISNULL(ls_backup_ctx_)) {
|
||||||
} else if (OB_FAIL(ls_backup_ctx_->next(tablet_id))) {
|
ret = OB_ERR_UNEXPECTED;
|
||||||
if (OB_ITER_END == ret) {
|
LOG_WARN("log stream backup ctx should not be null", K(ret));
|
||||||
meet_end_ = true;
|
} else if (OB_FAIL(ls_backup_ctx_->next(tablet_id))) {
|
||||||
LOG_INFO("tablet meet end", K(ret), K(tenant_id), K(ls_id), K_(backup_data_type));
|
if (OB_ITER_END == ret) {
|
||||||
ret = OB_SUCCESS;
|
meet_end_ = true;
|
||||||
break;
|
LOG_INFO("tablet meet end", K(ret), K(tenant_id), K(ls_id), K_(backup_data_type));
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
LOG_WARN("failed to get next tablet", K(ret));
|
||||||
|
}
|
||||||
|
} else if (OB_FAIL(prepare_tablet_(tenant_id, ls_id, tablet_id, backup_data_type_, count))) {
|
||||||
|
LOG_WARN("failed to prepare tablet", K(ret), K(tenant_id), K(ls_id), K(tablet_id));
|
||||||
|
} else if (OB_FAIL(ObBackupUtils::check_ls_valid_for_backup(tenant_id, ls_id, ls_backup_ctx_->rebuild_seq_))) {
|
||||||
|
LOG_WARN("failed to check ls valid for backup", K(ret), K(tenant_id), K(ls_id));
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN("failed to get next tablet", K(ret));
|
supply_count_ += count;
|
||||||
|
total_count += count;
|
||||||
}
|
}
|
||||||
} else if (OB_FAIL(prepare_tablet_(tenant_id, ls_id, tablet_id, backup_data_type_, count))) {
|
|
||||||
LOG_WARN("failed to prepare tablet", K(ret), K(tenant_id), K(ls_id), K(tablet_id));
|
|
||||||
} else if (OB_FAIL(ObBackupUtils::check_ls_valid_for_backup(tenant_id, ls_id, ls_backup_ctx_->rebuild_seq_))) {
|
|
||||||
LOG_WARN("failed to check ls valid for backup", K(ret), K(tenant_id), K(ls_id));
|
|
||||||
} else {
|
|
||||||
total_count += count;
|
|
||||||
}
|
}
|
||||||
}
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_FAIL(external_sort_.do_sort(true /*final_merge*/))) {
|
||||||
if (OB_FAIL(external_sort_.do_sort(true /*final_merge*/))) {
|
LOG_WARN("failed to do external sort", K(ret));
|
||||||
LOG_WARN("failed to do external sort", K(ret));
|
} else {
|
||||||
|
LOG_INFO("do sort", K(consume_count_), K(supply_count_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
LOG_INFO("no need prepare now", K(tenant_id), K(ls_id), K(consume_count_), K(supply_count_));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -1921,10 +1936,11 @@ int ObBackupTabletProvider::fetch_all_logic_macro_block_id_(const common::ObTabl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(add_macro_block_id_item_list_(tablet_id, table_key, id_array))) {
|
int64_t added_count = 0;
|
||||||
|
if (OB_FAIL(add_macro_block_id_item_list_(tablet_id, table_key, id_array, added_count))) {
|
||||||
LOG_WARN("failed to add macro block id list", K(ret), K(tablet_id), K(table_key), K(id_array));
|
LOG_WARN("failed to add macro block id list", K(ret), K(tablet_id), K(table_key), K(id_array));
|
||||||
} else if (id_array.count() > 0) {
|
} else if (id_array.count() > 0) {
|
||||||
total_count += id_array.count();
|
total_count += added_count;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -1938,9 +1954,10 @@ int ObBackupTabletProvider::fetch_all_logic_macro_block_id_(const common::ObTabl
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObBackupTabletProvider::add_macro_block_id_item_list_(const common::ObTabletID &tablet_id,
|
int ObBackupTabletProvider::add_macro_block_id_item_list_(const common::ObTabletID &tablet_id,
|
||||||
const ObITable::TableKey &table_key, const common::ObIArray<ObBackupMacroBlockId> &list)
|
const ObITable::TableKey &table_key, const common::ObIArray<ObBackupMacroBlockId> &list, int64_t &added_count)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
added_count = 0;
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < list.count(); ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < list.count(); ++i) {
|
||||||
const ObBackupMacroBlockId ¯o_id = list.at(i);
|
const ObBackupMacroBlockId ¯o_id = list.at(i);
|
||||||
ObBackupProviderItem item;
|
ObBackupProviderItem item;
|
||||||
@ -1957,6 +1974,7 @@ int ObBackupTabletProvider::add_macro_block_id_item_list_(const common::ObTablet
|
|||||||
} else if (OB_FAIL(external_sort_.add_item(item))) {
|
} else if (OB_FAIL(external_sort_.add_item(item))) {
|
||||||
LOG_WARN("failed to add item", KR(ret), K(item));
|
LOG_WARN("failed to add item", KR(ret), K(item));
|
||||||
} else {
|
} else {
|
||||||
|
added_count += 1;
|
||||||
LOG_INFO("add macro block id", K(tablet_id), K(table_key), K(macro_id));
|
LOG_INFO("add macro block id", K(tablet_id), K(table_key), K(macro_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -198,7 +198,7 @@ public:
|
|||||||
int deep_copy(const ObBackupProviderItem &src, char *buf, int64_t len, int64_t &pos);
|
int deep_copy(const ObBackupProviderItem &src, char *buf, int64_t len, int64_t &pos);
|
||||||
bool is_valid() const;
|
bool is_valid() const;
|
||||||
void reset();
|
void reset();
|
||||||
TO_STRING_KV(K_(item_type), K_(logic_id), K_(table_key), K_(tablet_id), K_(nested_offset), K_(nested_size));
|
TO_STRING_KV(K_(item_type), K_(logic_id), K_(table_key), K_(tablet_id), K_(nested_offset), K_(nested_size), K_(timestamp));
|
||||||
NEED_SERIALIZE_AND_DESERIALIZE;
|
NEED_SERIALIZE_AND_DESERIALIZE;
|
||||||
private:
|
private:
|
||||||
// for parallel external sort serialization restriction
|
// for parallel external sort serialization restriction
|
||||||
@ -214,6 +214,7 @@ private:
|
|||||||
common::ObTabletID tablet_id_; // logic_id_.tablet_id_ may not equal to tablet_id_
|
common::ObTabletID tablet_id_; // logic_id_.tablet_id_ may not equal to tablet_id_
|
||||||
int64_t nested_offset_;
|
int64_t nested_offset_;
|
||||||
int64_t nested_size_;
|
int64_t nested_size_;
|
||||||
|
int64_t timestamp_;
|
||||||
};
|
};
|
||||||
|
|
||||||
class ObBackupProviderItemCompare {
|
class ObBackupProviderItemCompare {
|
||||||
@ -282,7 +283,7 @@ private:
|
|||||||
int fetch_all_logic_macro_block_id_(const common::ObTabletID &tablet_id, const storage::ObTabletHandle &tablet_handle,
|
int fetch_all_logic_macro_block_id_(const common::ObTabletID &tablet_id, const storage::ObTabletHandle &tablet_handle,
|
||||||
const storage::ObITable::TableKey &table_key, const blocksstable::ObSSTable &sstable, int64_t &total_count);
|
const storage::ObITable::TableKey &table_key, const blocksstable::ObSSTable &sstable, int64_t &total_count);
|
||||||
int add_macro_block_id_item_list_(const common::ObTabletID &tablet_id, const storage::ObITable::TableKey &table_key,
|
int add_macro_block_id_item_list_(const common::ObTabletID &tablet_id, const storage::ObITable::TableKey &table_key,
|
||||||
const common::ObIArray<ObBackupMacroBlockId> &list);
|
const common::ObIArray<ObBackupMacroBlockId> &list, int64_t &added_count);
|
||||||
int add_sstable_item_(const common::ObTabletID &tablet_id);
|
int add_sstable_item_(const common::ObTabletID &tablet_id);
|
||||||
int add_tablet_item_(const common::ObTabletID &tablet_id);
|
int add_tablet_item_(const common::ObTabletID &tablet_id);
|
||||||
int remove_duplicates_(common::ObIArray<ObBackupProviderItem> &array);
|
int remove_duplicates_(common::ObIArray<ObBackupProviderItem> &array);
|
||||||
@ -322,6 +323,8 @@ private:
|
|||||||
ObBackupMetaIndexStore meta_index_store_;
|
ObBackupMetaIndexStore meta_index_store_;
|
||||||
ObBackupProviderItem prev_item_;
|
ObBackupProviderItem prev_item_;
|
||||||
bool has_prev_item_;
|
bool has_prev_item_;
|
||||||
|
int supply_count_;
|
||||||
|
int consume_count_;
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObBackupTabletProvider);
|
DISALLOW_COPY_AND_ASSIGN(ObBackupTabletProvider);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user