fix major freeze about restore and freeze_info gc
This commit is contained in:
parent
a51bfc97fd
commit
828836002c
@ -1109,16 +1109,29 @@ int ObCrossClusterTabletChecksumValidator::check_if_all_tablet_checksum_exist(
|
||||
const SCN &frozen_scn)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_exist = false;
|
||||
if (is_all_tablet_checksum_exist_) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(*sql_proxy_,
|
||||
tenant_id_, frozen_scn, is_exist))) {
|
||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||
} else {
|
||||
// update is_all_tablet_checksum_exist_ according to the result of
|
||||
// ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist
|
||||
is_all_tablet_checksum_exist_ = is_exist;
|
||||
// check only once every 10 seconds
|
||||
if (TC_REACH_TIME_INTERVAL(10 * 1000 * 1000)) { // 10s
|
||||
bool is_sync = false;
|
||||
ObFreezeInfoProxy freeze_info_proxy(tenant_id_);
|
||||
ObArray<uint64_t> frozen_scn_vals;
|
||||
if (is_all_tablet_checksum_exist_) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(freeze_info_proxy.get_frozen_scn_larger_or_equal_than(
|
||||
*sql_proxy_, frozen_scn, frozen_scn_vals))) {
|
||||
LOG_WARN("fail to get frozen scn", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||
} else if (OB_UNLIKELY(frozen_scn_vals.count() <= 0)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("no larger frozen scn exists", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||
} else if (OB_FAIL(ObTabletChecksumOperator::is_all_tablet_checksum_sync(*sql_proxy_,
|
||||
tenant_id_, frozen_scn_vals, is_sync))) {
|
||||
LOG_WARN("fail to check is first tablet in first ls exist", KR(ret), K_(tenant_id), K(frozen_scn));
|
||||
} else {
|
||||
// update is_all_tablet_checksum_exist_ according to the result of
|
||||
// ObTabletChecksumOperator::is_all_tablet_checksum_sync
|
||||
is_all_tablet_checksum_exist_ = is_sync;
|
||||
LOG_INFO("succ to check if all tablet checksum exist", K_(tenant_id), K(frozen_scn),
|
||||
K_(is_all_tablet_checksum_exist));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -350,7 +350,10 @@ int ObFreezeInfoDetector::try_adjust_global_merge_info(const int64_t expected_ep
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool is_initial = false;
|
||||
if (!is_primary_service() && !is_global_merge_info_adjusted_) {
|
||||
// both primary and standby tenants should adjust global_merge_info to skip unnecessary major freeze
|
||||
// primary tenants:
|
||||
// standby tenants:
|
||||
if (!is_global_merge_info_adjusted_) {
|
||||
bool is_restore = false;
|
||||
if (OB_FAIL(check_tenant_is_restore(tenant_id_, is_restore))) {
|
||||
LOG_WARN("fail to check tenant is restore", KR(ret), K_(tenant_id), K_(is_primary_service));
|
||||
@ -367,9 +370,12 @@ int ObFreezeInfoDetector::try_adjust_global_merge_info(const int64_t expected_ep
|
||||
LOG_WARN("fail to try adjust global merge info, freeze info manager is null", KR(ret),
|
||||
K_(tenant_id), K_(is_primary_service));
|
||||
} else if (OB_FAIL(freeze_info_mgr_->adjust_global_merge_info(expected_epoch))) {
|
||||
LOG_WARN("fail to adjust global merge info", KR(ret), K_(tenant_id), K_(is_primary_service));
|
||||
LOG_WARN("fail to adjust global merge info", KR(ret), K_(tenant_id), K_(is_primary_service),
|
||||
K(expected_epoch));
|
||||
} else {
|
||||
is_global_merge_info_adjusted_ = true;
|
||||
LOG_INFO("succ to adjust global merge info", K_(tenant_id), K_(is_primary_service),
|
||||
K(expected_epoch));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -184,7 +184,29 @@ int ObMajorMergeProgressChecker::handle_table_with_first_tablet_in_sys_ls(
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(major_merge_special_table_id));
|
||||
} else if (OB_FAIL(cross_cluster_validator_.write_tablet_checksum_at_table_level(stop, pairs,
|
||||
global_broadcast_scn, cur_compaction_info, major_merge_special_table_id, expected_epoch))) {
|
||||
LOG_WARN("fail to write tablet checksum at table level", KR(ret), K_(tenant_id), K(pairs));
|
||||
if (OB_ITEM_NOT_MATCH == ret) {
|
||||
bool is_exist = false;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
if (OB_TMP_FAIL(ObTabletReplicaChecksumOperator::is_higher_ver_tablet_rep_ckm_exist(
|
||||
tenant_id_, global_broadcast_scn, major_merge_special_table_id, *sql_proxy_, is_exist))) {
|
||||
LOG_WARN("fail to check is higher version tablet replica checksum exist", KR(tmp_ret),
|
||||
K_(tenant_id), K(global_broadcast_scn), K(major_merge_special_table_id));
|
||||
} else if (is_exist) {
|
||||
// 1. one restore standby tenant switchover to primary tenant, launch one lower version
|
||||
// of major compaction, tablet replica checksum is overwritten by higher version.
|
||||
// 2. one lower version of major compaction is not finished, another higher version of
|
||||
// medium compaction is launched, leading to tablet replica checksum is overwritten by
|
||||
// higher version.
|
||||
LOG_ERROR("already exist higher version tablet checksum of first table", KR(ret),
|
||||
K(global_broadcast_scn), K(major_merge_special_table_id), K(expected_epoch));
|
||||
ret = OB_SUCCESS; // ignore ret, so as to let this round of major freeze finish
|
||||
} else {
|
||||
LOG_ERROR("no higher version tablet checksum of first table exist", KR(ret),
|
||||
K(global_broadcast_scn), K(major_merge_special_table_id), K(expected_epoch));
|
||||
}
|
||||
} else {
|
||||
LOG_WARN("fail to write tablet checksum at table level", KR(ret), K_(tenant_id), K(pairs));
|
||||
}
|
||||
} else if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
||||
tenant_id_, global_broadcast_scn.get_val_for_tx(),
|
||||
pairs, ObTabletReplica::ScnStatus::SCN_STATUS_ERROR, expected_epoch))) {
|
||||
|
@ -496,15 +496,12 @@ int ObMajorMergeScheduler::update_merge_status(const int64_t expected_epoch)
|
||||
|
||||
ObAllZoneMergeProgress all_progress;
|
||||
SCN global_broadcast_scn;
|
||||
ObSimpleFrozenStatus frozen_status;
|
||||
DEBUG_SYNC(RS_VALIDATE_CHECKSUM);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not inited", KR(ret));
|
||||
} else if (OB_FAIL(zone_merge_mgr_->get_global_broadcast_scn(global_broadcast_scn))) {
|
||||
LOG_WARN("fail to get_global_broadcast_scn", KR(ret), K_(tenant_id));
|
||||
} else if (OB_FAIL(freeze_info_mgr_->get_freeze_info(global_broadcast_scn, frozen_status))) {
|
||||
LOG_WARN("fail to get freeze info", KR(ret), K_(tenant_id), K(global_broadcast_scn));
|
||||
} else if (OB_FAIL(progress_checker_.check_merge_progress(stop_, global_broadcast_scn,
|
||||
all_progress, expected_epoch))) {
|
||||
LOG_WARN("fail to check merge status", KR(ret), K_(tenant_id), K(global_broadcast_scn), K(expected_epoch));
|
||||
|
@ -169,6 +169,52 @@ int ObFreezeInfoProxy::get_freeze_info_larger_or_equal_than(
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Loads every frozen_scn stored in OB_ALL_FREEZE_INFO_TNAME that is larger than or
// equal to @frozen_scn and appends them, in ascending order, to @frozen_scn_vals.
//
// @param [in]  sql_proxy        client used to run the query for tenant_id_
// @param [in]  frozen_scn       inclusive lower bound of the wanted frozen_scn
// @param [out] frozen_scn_vals  receives one raw value per matching row; elements
//                               already in the array are kept, new ones are pushed back
// @return OB_SUCCESS once the whole result set is consumed (zero rows is a success
//         as well), otherwise the first error met while building or running the
//         query or while reading a row.
int ObFreezeInfoProxy::get_frozen_scn_larger_or_equal_than(
    ObISQLClient &sql_proxy,
    const SCN &frozen_scn,
    ObIArray<uint64_t> &frozen_scn_vals)
{
  int ret = OB_SUCCESS;
  ObSqlString sql;
  SMART_VAR(ObMySQLProxy::MySQLResult, res) {
    ObMySQLResult *result = nullptr;
    // lower bound in the representation stored in the inner-table column
    const uint64_t lower_bound_val = frozen_scn.get_val_for_inner_table_field();
    if (OB_FAIL(sql.assign_fmt("SELECT frozen_scn FROM %s WHERE frozen_scn >= %lu ORDER BY frozen_scn",
                               OB_ALL_FREEZE_INFO_TNAME, lower_bound_val))) {
      LOG_WARN("fail to append sql", KR(ret), K_(tenant_id), K(frozen_scn));
    } else if (OB_FAIL(sql_proxy.read(res, tenant_id_, sql.ptr()))) {
      LOG_WARN("fail to execute sql", KR(ret), K(sql), K_(tenant_id));
    } else if (OB_ISNULL(result = res.get_result())) {
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("fail to get sql result", KR(ret), K(sql), K_(tenant_id));
    } else {
      // consume rows until the iterator reports OB_ITER_END or an error shows up
      while (OB_SUCC(ret)) {
        if (OB_SUCC(result->next())) {
          uint64_t frozen_scn_val = OB_INVALID_SCN_VAL;
          EXTRACT_UINT_FIELD_MYSQL(*result, "frozen_scn", frozen_scn_val, uint64_t);
          if (OB_FAIL(ret)) {
            // field extraction failed; ret is propagated and terminates the loop
          } else if (OB_UNLIKELY(OB_INVALID_SCN_VAL == frozen_scn_val)) {
            ret = OB_ERR_UNEXPECTED;
            LOG_WARN("invalid frozen scn val", KR(ret), K(frozen_scn_val), K_(tenant_id), K(sql));
          } else if (OB_FAIL(frozen_scn_vals.push_back(frozen_scn_val))) {
            LOG_WARN("fail to push back", KR(ret), K(frozen_scn_val), K_(tenant_id));
          }
        } else if (OB_ITER_END != ret) {
          LOG_WARN("fail to get next row", KR(ret), K_(tenant_id));
        }
      }
      // OB_ITER_END merely marks the end of the result set, it is not a failure
      if (OB_ITER_END == ret) {
        ret = OB_SUCCESS;
      }
    }
  }
  LOG_INFO("finish load frozen scn", KR(ret), K_(tenant_id), K(sql));
  return ret;
}
|
||||
|
||||
int ObFreezeInfoProxy::get_max_frozen_scn_smaller_or_equal_than(
|
||||
ObISQLClient &sql_proxy,
|
||||
const SCN &compaction_scn,
|
||||
|
@ -133,6 +133,11 @@ public:
|
||||
const SCN &frozen_scn,
|
||||
common::ObIArray<ObSimpleFrozenStatus> &frozen_statuses);
|
||||
|
||||
// get all frozen_scn values which are larger than or equal to the given @frozen_scn,
// appended to @frozen_scn_vals in ascending order
int get_frozen_scn_larger_or_equal_than(
|
||||
common::ObISQLClient &sql_proxy,
|
||||
const SCN &frozen_scn,
|
||||
common::ObIArray<uint64_t> &frozen_scn_vals);
|
||||
|
||||
// get the maximum frozen_scn which is smaller than or equal to the given @compaction_scn
|
||||
int get_max_frozen_scn_smaller_or_equal_than(
|
||||
common::ObISQLClient &sql_proxy,
|
||||
|
@ -686,6 +686,76 @@ int ObTabletChecksumOperator::is_first_tablet_in_sys_ls_exist(
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Checks whether OB_ALL_TABLET_CHECKSUM_TNAME already holds a checksum row of the
// first tablet (ObTabletID::MIN_VALID_TABLET_ID) in the sys LS (ObLSID::SYS_LS_ID)
// for at least one of the given compaction scn values.
//
// NOTE: despite the name, only that single tablet is probed, and a match for ANY
// of the values in @frozen_scn_vals is enough to report true.
//
// @param [in]  sql_client       client used to run the queries for @tenant_id
// @param [in]  tenant_id        must be a valid tenant id
// @param [in]  frozen_scn_vals  candidate compaction_scn values, must not be empty
// @param [out] is_sync          true iff a matching row was found; it is reset to
//                               false on entry, so it is also false whenever an
//                               error code is returned
// @return OB_INVALID_ARGUMENT on bad input, OB_ERR_UNEXPECTED on a missing result
//         or a negative count, any error from building/running the query,
//         otherwise OB_SUCCESS.
int ObTabletChecksumOperator::is_all_tablet_checksum_sync(
    ObISQLClient &sql_client,
    const uint64_t tenant_id,
    ObIArray<uint64_t> &frozen_scn_vals,
    bool &is_sync)
{
  int ret = OB_SUCCESS;
  // give the out-param a defined value on every path, including invalid arguments
  is_sync = false;
  const int64_t frozen_scn_vals_cnt = frozen_scn_vals.count();
  if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || (frozen_scn_vals_cnt <= 0))) {
    ret = OB_INVALID_ARGUMENT;
    LOG_WARN("invalid arguments", KR(ret), K(tenant_id), K(frozen_scn_vals_cnt));
  } else {
    // value matched against the tenant_id column of the queried table
    const uint64_t extract_tenant_id = 0;
    // split into several batches, so as to avoid the sql too long
    const int64_t batch_cnt = 100;
    int64_t start_idx = 0;
    int64_t end_idx = min(batch_cnt, frozen_scn_vals_cnt);
    // each round handles [start_idx, end_idx); stop at the first batch that matches
    while (OB_SUCC(ret) && !is_sync && (start_idx < end_idx)) {
      ObSqlString sql;
      SMART_VAR(ObMySQLProxy::MySQLResult, res) {
        ObMySQLResult *result = nullptr;
        if (OB_FAIL(sql.append_fmt("SELECT COUNT(*) AS cnt FROM %s WHERE tenant_id = '%lu' AND "
            "compaction_scn IN (", OB_ALL_TABLET_CHECKSUM_TNAME, extract_tenant_id))) {
          LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
        } else {
          // comma-separated list of this batch's values, no trailing comma
          for (int64_t i = start_idx; (i < end_idx) && OB_SUCC(ret); ++i) {
            if (OB_FAIL(sql.append_fmt("%lu%s", frozen_scn_vals.at(i),
                                       (i == (end_idx - 1)) ? "" : ","))) {
              LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
            }
          }
        }
        if (OB_FAIL(ret)) {
        } else if (OB_FAIL(sql.append_fmt(") AND tablet_id = %lu AND ls_id = %ld",
            ObTabletID::MIN_VALID_TABLET_ID, ObLSID::SYS_LS_ID))) {
          LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
        } else if (OB_FAIL(sql_client.read(res, tenant_id, sql.ptr()))) {
          LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(sql));
        } else if (OB_ISNULL(result = res.get_result())) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("fail to get sql result", KR(ret), K(tenant_id), K(sql));
        } else if (OB_FAIL(result->next())) {
          LOG_WARN("get next result failed", KR(ret), K(tenant_id), K(sql));
        } else {
          int64_t cnt = 0;
          EXTRACT_INT_FIELD_MYSQL(*result, "cnt", cnt, int64_t);
          if (OB_SUCC(ret)) {
            if (cnt >= 1) {
              is_sync = true;
            } else if (0 == cnt) {
              is_sync = false;
            } else {
              // COUNT(*) can never be negative
              ret = OB_ERR_UNEXPECTED;
              LOG_WARN("unexpected count", KR(ret), K(tenant_id), K(sql), K(cnt));
            }
          }
        }
      }
      if (OB_SUCC(ret)) {
        // advance to the next batch
        start_idx = end_idx;
        end_idx = min(start_idx + batch_cnt, frozen_scn_vals_cnt);
      }
    }
  }
  LOG_INFO("finish to check is all tablet checksum sync", KR(ret), K(is_sync),
           K(tenant_id), K(frozen_scn_vals));
  return ret;
}
|
||||
|
||||
int ObTabletChecksumOperator::get_tablet_cnt(
|
||||
ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
|
@ -124,6 +124,11 @@ public:
|
||||
const uint64_t tenant_id,
|
||||
const SCN &compaction_scn,
|
||||
bool &is_exist);
|
||||
// check whether the tablet checksum of the first tablet in sys ls exists for at
// least one of the given compaction scn values in @frozen_scn_vals
static int is_all_tablet_checksum_sync(
|
||||
common::ObISQLClient &sql_client,
|
||||
const uint64_t tenant_id,
|
||||
common::ObIArray<uint64_t> &frozen_scn_vals,
|
||||
bool &is_sync);
|
||||
|
||||
private:
|
||||
static int construct_load_sql_str_(
|
||||
|
@ -1858,5 +1858,53 @@ int ObTabletReplicaChecksumOperator::convert_array_to_map(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTabletReplicaChecksumOperator::is_higher_ver_tablet_rep_ckm_exist(
|
||||
const uint64_t tenant_id,
|
||||
const SCN &compaction_scn,
|
||||
const uint64_t tablet_id,
|
||||
common::ObISQLClient &sql_proxy,
|
||||
bool &is_exist)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_exist = false;
|
||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || !compaction_scn.is_valid() || (tablet_id <= 0))) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(compaction_scn), K(tablet_id));
|
||||
} else {
|
||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||
ObSqlString sql;
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObMySQLResult *result = nullptr;
|
||||
uint64_t compaction_scn_val = compaction_scn.get_val_for_inner_table_field();
|
||||
if (OB_FAIL(sql.assign_fmt("SELECT COUNT(*) AS cnt FROM %s WHERE tenant_id = '%lu' AND "
|
||||
"tablet_id = %lu AND compaction_scn > %lu", OB_ALL_TABLET_REPLICA_CHECKSUM_TNAME,
|
||||
tenant_id, tablet_id, compaction_scn_val))) {
|
||||
LOG_WARN("fail to append sql", KR(ret), K(tenant_id));
|
||||
} else if (OB_FAIL(sql_proxy.read(res, meta_tenant_id, sql.ptr()))) {
|
||||
LOG_WARN("fail to execute sql", KR(ret), K(meta_tenant_id), K(tenant_id), K(sql));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("fail to get sql result", KR(ret), K(meta_tenant_id), K(tenant_id), K(sql));
|
||||
} else if (OB_FAIL(result->next())) {
|
||||
LOG_WARN("get next result failed", KR(ret), K(meta_tenant_id), K(tenant_id), K(sql));
|
||||
} else {
|
||||
int64_t cnt = 0;
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "cnt", cnt, int64_t);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (cnt >= 1) {
|
||||
is_exist = true;
|
||||
} else if (0 == cnt) {
|
||||
is_exist = false;
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected count", KR(ret), K(meta_tenant_id), K(tenant_id), K(sql), K(cnt));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
} // share
|
||||
} // oceanbase
|
||||
|
@ -193,6 +193,13 @@ public:
|
||||
common::ObIAllocator &allocator,
|
||||
common::ObString &column_meta_hex_str);
|
||||
|
||||
// check whether there exists a tablet replica checksum record of @tablet_id whose
// compaction_scn is larger than the given @compaction_scn
static int is_higher_ver_tablet_rep_ckm_exist(
|
||||
const uint64_t tenant_id,
|
||||
const SCN &compaction_scn,
|
||||
const uint64_t tablet_id,
|
||||
common::ObISQLClient &sql_proxy,
|
||||
bool &is_exist);
|
||||
|
||||
private:
|
||||
static int batch_insert_or_update_with_trans_(
|
||||
const uint64_t tenant_id,
|
||||
|
Loading…
x
Reference in New Issue
Block a user