fix tablet/table compaction hashmap about bucket_num

This commit is contained in:
obdev 2023-03-06 09:14:02 +00:00 committed by ob-robot
parent 3a9f993f37
commit 63b0121241
3 changed files with 64 additions and 61 deletions

View File

@ -174,20 +174,21 @@ int ObChecksumValidatorBase::remove_not_exist_table(
ret = OB_NOT_INIT;
LOG_WARN("not init", KR(ret), K_(tenant_id));
} else {
ObArray<uint64_t> removed_table_ids; // record the table_id which will be removed
hash::ObHashMap<uint64_t, ObTableCompactionInfo>::iterator iter = table_compaction_map.begin();
for (;OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) {
const uint64_t cur_table_id = iter->first;
if (!exist_in_table_array(cur_table_id, table_ids)) {
if (OB_FAIL(removed_table_ids.push_back(cur_table_id))) {
LOG_WARN("fail to push back", KR(ret), K(cur_table_id));
SMART_VAR(ObArray<uint64_t>, removed_table_ids) { // record the table_id which will be removed
hash::ObHashMap<uint64_t, ObTableCompactionInfo>::iterator iter = table_compaction_map.begin();
for (;OB_SUCC(ret) && (iter != table_compaction_map.end()); ++iter) {
const uint64_t cur_table_id = iter->first;
if (!exist_in_table_array(cur_table_id, table_ids)) {
if (OB_FAIL(removed_table_ids.push_back(cur_table_id))) {
LOG_WARN("fail to push back", KR(ret), K(cur_table_id));
}
}
} /*end for iter*/
for (int64_t i = 0; (OB_SUCC(ret) && (i < removed_table_ids.count())); ++i) {
const uint64_t table_id = removed_table_ids.at(i);
if (OB_FAIL(table_compaction_map.erase_refactored(table_id))) {
LOG_WARN("fail to erase refactored", KR(ret), K(i), K(table_id));
}
}
} /*end for iter*/
for (int64_t i = 0; (OB_SUCC(ret) && (i < removed_table_ids.count())); ++i) {
const uint64_t table_id = removed_table_ids.at(i);
if (OB_FAIL(table_compaction_map.erase_refactored(table_id))) {
LOG_WARN("fail to erase refactored", KR(ret), K(i), K(table_id));
}
}
}
@ -976,60 +977,61 @@ int ObCrossClusterTabletChecksumValidator::try_update_tablet_checksum_items(
// we need to insert it into __all_tablet_checksum table at last. In this case, if we get this
// tablet's checksum item in table, we can ensure all checksum items have already been inserted.
ObTabletChecksumItem mark_end_item;
ObArray<ObTabletChecksumItem> tablet_checksum_items;
const int64_t item_cnt = items.count();
for (int64_t i = 0; !stop && OB_SUCC(ret) && (i < item_cnt); ++i) {
curr_replica_item.reset();
if (OB_FAIL(curr_replica_item.assign(items.at(i)))) {
LOG_WARN("fail to assign tablet replica checksum item", KR(ret), K(i), "item", items.at(i));
} else if (!curr_replica_item.is_key_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet replica checksum is not valid", KR(ret), K(curr_replica_item));
} else {
if (curr_replica_item.is_same_tablet(prev_replica_item)) { // write one checksum_item per tablet
SMART_VAR(ObArray<ObTabletChecksumItem>, tablet_checksum_items) {
const int64_t item_cnt = items.count();
for (int64_t i = 0; !stop && OB_SUCC(ret) && (i < item_cnt); ++i) {
curr_replica_item.reset();
if (OB_FAIL(curr_replica_item.assign(items.at(i)))) {
LOG_WARN("fail to assign tablet replica checksum item", KR(ret), K(i), "item", items.at(i));
} else if (!curr_replica_item.is_key_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tablet replica checksum is not valid", KR(ret), K(curr_replica_item));
} else {
if (is_first_tablet_in_sys_ls(curr_replica_item)) {
contain_first_tablet_in_sys_ls = true;
if (OB_FAIL(mark_end_item.assign(curr_replica_item))) {
LOG_WARN("fail to assign tablet replica checksum item", KR(ret), K(curr_replica_item));
}
if (curr_replica_item.is_same_tablet(prev_replica_item)) { // write one checksum_item per tablet
} else {
if (OB_FAIL(tmp_checksum_item.assign(curr_replica_item))) {
LOG_WARN("fail to assign tablet replica checksum item", KR(ret), K(curr_replica_item));
} else if (OB_FAIL(tablet_checksum_items.push_back(tmp_checksum_item))) {
LOG_WARN("fail to push back tablet checksum item", KR(ret), K(tmp_checksum_item));
if (is_first_tablet_in_sys_ls(curr_replica_item)) {
contain_first_tablet_in_sys_ls = true;
if (OB_FAIL(mark_end_item.assign(curr_replica_item))) {
LOG_WARN("fail to assign tablet replica checksum item", KR(ret), K(curr_replica_item));
}
} else {
if (OB_FAIL(tmp_checksum_item.assign(curr_replica_item))) {
LOG_WARN("fail to assign tablet replica checksum item", KR(ret), K(curr_replica_item));
} else if (OB_FAIL(tablet_checksum_items.push_back(tmp_checksum_item))) {
LOG_WARN("fail to push back tablet checksum item", KR(ret), K(tmp_checksum_item));
}
}
if (FAILEDx(prev_replica_item.assign_key(curr_replica_item))) {
LOG_WARN("fail to assign key of tablet replica checksum item", KR(ret), K(curr_replica_item));
}
}
if (FAILEDx(prev_replica_item.assign_key(curr_replica_item))) {
LOG_WARN("fail to assign key of tablet replica checksum item", KR(ret), K(curr_replica_item));
}
}
}
if ((item_cnt - 1) == i) { // already iterate all tablet checksum items
if (contain_first_tablet_in_sys_ls) {
if (OB_UNLIKELY(!mark_end_item.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err about mark_end_item", KR(ret), K(mark_end_item));
} else if (FAILEDx(tablet_checksum_items.push_back(mark_end_item))) {
LOG_WARN("fail to push back tablet checksum item", KR(ret), K(mark_end_item));
if ((item_cnt - 1) == i) { // already iterate all tablet checksum items
if (contain_first_tablet_in_sys_ls) {
if (OB_UNLIKELY(!mark_end_item.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected err about mark_end_item", KR(ret), K(mark_end_item));
} else if (FAILEDx(tablet_checksum_items.push_back(mark_end_item))) {
LOG_WARN("fail to push back tablet checksum item", KR(ret), K(mark_end_item));
}
}
}
if (tablet_checksum_items.count() > 0) {
if (tablet_checksum_items.count() > 0) {
FREEZE_TIME_GUARD;
if (FAILEDx(ObTabletChecksumOperator::update_tablet_checksum_items(*sql_proxy_,
tenant_id_, tablet_checksum_items))) {
LOG_WARN("fail to try update tablet checksum items", KR(ret), K_(tenant_id));
}
}
} else if (tablet_checksum_items.count() >= MAX_BATCH_INSERT_COUNT) {
FREEZE_TIME_GUARD;
if (FAILEDx(ObTabletChecksumOperator::update_tablet_checksum_items(*sql_proxy_,
tenant_id_, tablet_checksum_items))) {
LOG_WARN("fail to try update tablet checksum items", KR(ret), K_(tenant_id));
} else {
tablet_checksum_items.reuse();
}
}
} else if (tablet_checksum_items.count() >= MAX_BATCH_INSERT_COUNT) {
FREEZE_TIME_GUARD;
if (FAILEDx(ObTabletChecksumOperator::update_tablet_checksum_items(*sql_proxy_,
tenant_id_, tablet_checksum_items))) {
LOG_WARN("fail to try update tablet checksum items", KR(ret), K_(tenant_id));
} else {
tablet_checksum_items.reuse();
}
}
}
}
@ -1469,7 +1471,7 @@ int ObIndexChecksumValidator::try_print_first_unverified_info(
if (OB_FAIL(table_schema->get_simple_index_infos(simple_index_infos_array, true))) {
LOG_WARN("fail to get simple index infos array", KR(ret), KPC(table_schema));
} else {
for (int i = 0; OB_SUCC(ret) && (i < simple_index_infos_array.count()); ++i) {
for (int64_t i = 0; OB_SUCC(ret) && (i < simple_index_infos_array.count()); ++i) {
ObTableCompactionInfo index_compaction_info;
const uint64_t index_table_id = simple_index_infos_array.at(i).table_id_;
if (OB_FAIL(table_compaction_map.get_refactored(index_table_id, index_compaction_info))) {

View File

@ -51,16 +51,15 @@ int ObMajorMergeProgressChecker::init(
ObIServerTrace &server_trace)
{
int ret = OB_SUCCESS;
const int64_t DEFAULT_TABLET_CNT = 8;
const int64_t DEFAULT_TABLE_CNT = 128;
const int64_t DEFAULT_MAP_BUCKET_CNT = 10000;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", KR(ret));
} else if (OB_FAIL(tablet_compaction_map_.create(DEFAULT_TABLET_CNT, "MFTatCompactMap",
} else if (OB_FAIL(tablet_compaction_map_.create(DEFAULT_MAP_BUCKET_CNT, "MFTatCompactMap",
"MFTatCompactMap", tenant_id))) {
LOG_WARN("fail to create tablet compaction status map", KR(ret), K(tenant_id), K(DEFAULT_TABLET_CNT));
} else if (OB_FAIL(table_compaction_map_.create(DEFAULT_TABLE_CNT, "MFTbCompMap", "MFTbCompMap", tenant_id))) {
LOG_WARN("fail to create table compaction status map", KR(ret), K(tenant_id), K(DEFAULT_TABLE_CNT));
LOG_WARN("fail to create tablet compaction status map", KR(ret), K(tenant_id), K(DEFAULT_MAP_BUCKET_CNT));
} else if (OB_FAIL(table_compaction_map_.create(DEFAULT_MAP_BUCKET_CNT, "MFTbCompMap", "MFTbCompMap", tenant_id))) {
LOG_WARN("fail to create table compaction status map", KR(ret), K(tenant_id), K(DEFAULT_MAP_BUCKET_CNT));
} else if (OB_FAIL(tablet_validator_.init(tenant_id, is_primary_service, sql_proxy, zone_merge_mgr))) {
LOG_WARN("fail to init tablet validator", KR(ret), K(tenant_id));
} else if (OB_FAIL(index_validator_.init(tenant_id, is_primary_service, sql_proxy, zone_merge_mgr))) {

View File

@ -706,6 +706,8 @@ int ObMajorMergeScheduler::try_update_global_merged_scn(const int64_t expected_e
K(global_broadcast_scn_val));
} else if (OB_FAIL(zone_merge_mgr_->try_update_global_last_merged_scn(expected_epoch))) {
LOG_WARN("try update global last_merged_scn failed", KR(ret), K(expected_epoch));
} else if (OB_FAIL(progress_checker_.prepare_handle())) { // free memory of compaction_map
LOG_WARN("fail to do prepare handle of progress checker", KR(ret), K(expected_epoch));
} else {
ROOTSERVICE_EVENT_ADD("daily_merge", "global_merged", K_(tenant_id),
"global_broadcast_scn", global_broadcast_scn_val);