fix ObTabletReplicaChecksumOperator::batch_get() function
This commit is contained in:
@ -112,7 +112,7 @@ int ObTabletReplicaChecksumIterator::fetch_next_batch()
|
|||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
checksum_items_.reuse();
|
checksum_items_.reuse();
|
||||||
if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_get(tenant_id_, start_pair,
|
if (OB_FAIL(ObTabletReplicaChecksumOperator::batch_get(tenant_id_, start_pair,
|
||||||
BATCH_FETCH_COUNT, compaction_scn_, *sql_proxy_, checksum_items_))) {
|
compaction_scn_, *sql_proxy_, checksum_items_))) {
|
||||||
LOG_WARN("fail to get batch checksums", KR(ret), K_(tenant_id), K(start_pair), K_(compaction_scn));
|
LOG_WARN("fail to get batch checksums", KR(ret), K_(tenant_id), K(start_pair), K_(compaction_scn));
|
||||||
} else if (OB_UNLIKELY(0 == checksum_items_.count())) {
|
} else if (OB_UNLIKELY(0 == checksum_items_.count())) {
|
||||||
ret = OB_ITER_END;
|
ret = OB_ITER_END;
|
||||||
|
|||||||
@ -477,76 +477,45 @@ int ObTabletReplicaChecksumOperator::remove_residual_checksum(
|
|||||||
int ObTabletReplicaChecksumOperator::batch_get(
|
int ObTabletReplicaChecksumOperator::batch_get(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const ObTabletLSPair &start_pair,
|
const ObTabletLSPair &start_pair,
|
||||||
const int64_t batch_cnt,
|
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
ObISQLClient &sql_proxy,
|
ObISQLClient &sql_proxy,
|
||||||
ObIArray<ObTabletReplicaChecksumItem> &items)
|
ObIArray<ObTabletReplicaChecksumItem> &items)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
ObSqlString sql;
|
ObSqlString sql;
|
||||||
int64_t remain_cnt = batch_cnt;
|
if (OB_FAIL(construct_batch_get_sql_str_(tenant_id, start_pair, MAX_BATCH_COUNT,
|
||||||
int64_t ori_items_cnt = 0;
|
compaction_scn, sql))) {
|
||||||
|
LOG_WARN("fail to construct load sql", KR(ret), K(tenant_id), K(start_pair), K(compaction_scn));
|
||||||
while (OB_SUCC(ret) && (remain_cnt > 0)) {
|
} else if (OB_FAIL(batch_get(tenant_id, sql, sql_proxy, items))) {
|
||||||
sql.reuse();
|
LOG_WARN("fail to batch get tablet replica checksum items", KR(ret), K(tenant_id), K(sql));
|
||||||
const int64_t limit_cnt = ((remain_cnt >= MAX_BATCH_COUNT) ? MAX_BATCH_COUNT : remain_cnt);
|
} else {
|
||||||
ori_items_cnt = items.count();
|
const int64_t items_cnt = items.count();
|
||||||
ObTabletLSPair last_pair;
|
if (MAX_BATCH_COUNT == items_cnt) {
|
||||||
if (remain_cnt == batch_cnt) {
|
// in case the checksum of three replica belong to one tablet, were split into two batch-get,
|
||||||
last_pair = start_pair;
|
// we will remove the last several item which belong to one tablet
|
||||||
} else {
|
// if current round item's count less than limit_cnt, it means already to the end, no need to handle.
|
||||||
if (ori_items_cnt > 0) {
|
int64_t tmp_items_cnt = items_cnt;
|
||||||
const ObTabletReplicaChecksumItem &last_item = items.at(ori_items_cnt - 1);
|
ObTabletReplicaChecksumItem tmp_item;
|
||||||
if (OB_FAIL(last_pair.init(last_item.tablet_id_, last_item.ls_id_))) {
|
if (OB_FAIL(tmp_item.assign_key(items.at(tmp_items_cnt - 1)))) {
|
||||||
LOG_WARN("fail to init last tablet_ls_pair", KR(ret), K(last_item));
|
LOG_WARN("fail to assign key", KR(ret), "tmp_item", items.at(tmp_items_cnt - 1));
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("err unexpected, about tablet replica items count", KR(ret), K(tenant_id), K(start_pair),
|
|
||||||
K(batch_cnt), K(remain_cnt), K(ori_items_cnt));
|
|
||||||
}
|
}
|
||||||
}
|
while (OB_SUCC(ret) && (tmp_items_cnt > 0)) {
|
||||||
|
if (tmp_item.is_same_tablet(items.at(tmp_items_cnt - 1))) {
|
||||||
if (FAILEDx(construct_batch_get_sql_str_(tenant_id, last_pair, limit_cnt, compaction_scn, sql))) {
|
if (OB_FAIL(items.remove(tmp_items_cnt - 1))) {
|
||||||
LOG_WARN("fail to construct load sql", KR(ret), K(tenant_id), K(last_pair), K(limit_cnt),
|
LOG_WARN("fail to remove item from array", KR(ret), K(tmp_items_cnt), K(items));
|
||||||
K(compaction_scn));
|
|
||||||
} else if (OB_FAIL(batch_get(tenant_id, sql, sql_proxy, items))) {
|
|
||||||
LOG_WARN("fail to batch get tablet replica checksum items", KR(ret), K(tenant_id), K(sql));
|
|
||||||
} else {
|
|
||||||
const int64_t curr_items_cnt = items.count();
|
|
||||||
if (curr_items_cnt - ori_items_cnt == limit_cnt) {
|
|
||||||
// in case the checksum of three replica belong to one tablet, were split into two batch-get,
|
|
||||||
// we will remove the last several item which belong to one tablet
|
|
||||||
// if current round item's count less than limit_cnt, it means already to the end, no need to handle.
|
|
||||||
int64_t tmp_items_cnt = curr_items_cnt;
|
|
||||||
ObTabletReplicaChecksumItem tmp_item;
|
|
||||||
if (OB_FAIL(tmp_item.assign_key(items.at(tmp_items_cnt - 1)))) {
|
|
||||||
LOG_WARN("fail to assign key", KR(ret), "tmp_item", items.at(tmp_items_cnt - 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
while (OB_SUCC(ret) && (tmp_items_cnt > 0)) {
|
|
||||||
if (tmp_item.is_same_tablet(items.at(tmp_items_cnt - 1))) {
|
|
||||||
if (OB_FAIL(items.remove(tmp_items_cnt - 1))) {
|
|
||||||
LOG_WARN("fail to remove item from array", KR(ret), K(tmp_items_cnt), K(items));
|
|
||||||
} else {
|
|
||||||
--tmp_items_cnt;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
break;
|
--tmp_items_cnt;
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
|
||||||
if (tmp_items_cnt == 0) {
|
|
||||||
ret = OB_ERR_UNEXPECTED;
|
|
||||||
LOG_WARN("unexpected err about item count", KR(ret), K(tmp_item), K(curr_items_cnt), K(ori_items_cnt));
|
|
||||||
} else {
|
|
||||||
remain_cnt -= limit_cnt;
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
if (tmp_items_cnt == 0) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("unexpected err about item count", KR(ret), K(tmp_item), K(items_cnt));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
remain_cnt = 0; // already get all checksum item, finish batch_get
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -104,10 +104,12 @@ public:
|
|||||||
class ObTabletReplicaChecksumOperator
|
class ObTabletReplicaChecksumOperator
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
// This function is specifically designed for ObTabletReplicaChecksumIterator.
|
||||||
|
// This function would remove the last several checksum items in some cases.
|
||||||
|
// Please do not call this function in any other place, except ObTabletReplicaChecksumIterator.
|
||||||
static int batch_get(
|
static int batch_get(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const ObTabletLSPair &start_pair,
|
const ObTabletLSPair &start_pair,
|
||||||
const int64_t batch_cnt,
|
|
||||||
const SCN &compaction_scn,
|
const SCN &compaction_scn,
|
||||||
common::ObISQLClient &sql_proxy,
|
common::ObISQLClient &sql_proxy,
|
||||||
common::ObIArray<ObTabletReplicaChecksumItem> &items);
|
common::ObIArray<ObTabletReplicaChecksumItem> &items);
|
||||||
|
|||||||
Reference in New Issue
Block a user