fix update __all_tablet_meta_table about timeout
This commit is contained in:
@ -863,7 +863,7 @@ int ObCrossClusterTabletChecksumValidator::handle_table_verification_finished(
|
|||||||
const int64_t update_start_time_us = ObTimeUtil::current_time();
|
const int64_t update_start_time_us = ObTimeUtil::current_time();
|
||||||
if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
||||||
tenant_id_, frozen_scn.get_val_for_tx(),
|
tenant_id_, frozen_scn.get_val_for_tx(),
|
||||||
pairs, ObTabletReplica::ScnStatus::SCN_STATUS_ERROR))) {
|
pairs, ObTabletReplica::ScnStatus::SCN_STATUS_ERROR, expected_epoch))) {
|
||||||
LOG_WARN("fail to batch update report_scn", KR(ret), K_(tenant_id), K(pairs));
|
LOG_WARN("fail to batch update report_scn", KR(ret), K_(tenant_id), K(pairs));
|
||||||
}
|
}
|
||||||
const int64_t update_cost_time_us = ObTimeUtil::current_time() - update_start_time_us;
|
const int64_t update_cost_time_us = ObTimeUtil::current_time() - update_start_time_us;
|
||||||
|
@ -184,7 +184,7 @@ int ObMajorMergeProgressChecker::handle_table_with_first_tablet_in_sys_ls(
|
|||||||
LOG_WARN("fail to write tablet checksum at table level", KR(ret), K_(tenant_id), K(pairs));
|
LOG_WARN("fail to write tablet checksum at table level", KR(ret), K_(tenant_id), K(pairs));
|
||||||
} else if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
} else if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
||||||
tenant_id_, global_broadcast_scn.get_val_for_tx(),
|
tenant_id_, global_broadcast_scn.get_val_for_tx(),
|
||||||
pairs, ObTabletReplica::ScnStatus::SCN_STATUS_ERROR))) {
|
pairs, ObTabletReplica::ScnStatus::SCN_STATUS_ERROR, expected_epoch))) {
|
||||||
LOG_WARN("fail to batch update report_scn", KR(ret), K_(tenant_id), K(pairs));
|
LOG_WARN("fail to batch update report_scn", KR(ret), K_(tenant_id), K(pairs));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -701,7 +701,7 @@ int ObMajorMergeScheduler::try_update_global_merged_scn(const int64_t expected_e
|
|||||||
LOG_WARN("fail to handle table with first tablet in sys ls", KR(ret), K_(tenant_id),
|
LOG_WARN("fail to handle table with first tablet in sys ls", KR(ret), K_(tenant_id),
|
||||||
"global_broadcast_scn", global_info.global_broadcast_scn(), K(expected_epoch));
|
"global_broadcast_scn", global_info.global_broadcast_scn(), K(expected_epoch));
|
||||||
} else if (FALSE_IT(global_broadcast_scn_val = global_info.global_broadcast_scn_.get_scn_val())) {
|
} else if (FALSE_IT(global_broadcast_scn_val = global_info.global_broadcast_scn_.get_scn_val())) {
|
||||||
} else if (OB_FAIL(update_all_tablets_report_scn(global_broadcast_scn_val))) {
|
} else if (OB_FAIL(update_all_tablets_report_scn(global_broadcast_scn_val, expected_epoch))) {
|
||||||
LOG_WARN("fail to update all tablets report_scn", KR(ret), K(expected_epoch),
|
LOG_WARN("fail to update all tablets report_scn", KR(ret), K(expected_epoch),
|
||||||
K(global_broadcast_scn_val));
|
K(global_broadcast_scn_val));
|
||||||
} else if (OB_FAIL(zone_merge_mgr_->try_update_global_last_merged_scn(expected_epoch))) {
|
} else if (OB_FAIL(zone_merge_mgr_->try_update_global_last_merged_scn(expected_epoch))) {
|
||||||
@ -897,14 +897,17 @@ int ObMajorMergeScheduler::do_update_and_reload(const int64_t epoch)
|
|||||||
}
|
}
|
||||||
|
|
||||||
int ObMajorMergeScheduler::update_all_tablets_report_scn(
|
int ObMajorMergeScheduler::update_all_tablets_report_scn(
|
||||||
const uint64_t global_braodcast_scn_val)
|
const uint64_t global_braodcast_scn_val,
|
||||||
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
FREEZE_TIME_GUARD;
|
FREEZE_TIME_GUARD;
|
||||||
if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
||||||
tenant_id_,
|
tenant_id_,
|
||||||
global_braodcast_scn_val,
|
global_braodcast_scn_val,
|
||||||
ObTabletReplica::ScnStatus::SCN_STATUS_ERROR))) {
|
ObTabletReplica::ScnStatus::SCN_STATUS_ERROR,
|
||||||
|
stop_,
|
||||||
|
expected_epoch))) {
|
||||||
LOG_WARN("fail to batch update report_scn", KR(ret), K_(tenant_id), K(global_braodcast_scn_val));
|
LOG_WARN("fail to batch update report_scn", KR(ret), K_(tenant_id), K(global_braodcast_scn_val));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
@ -114,7 +114,8 @@ private:
|
|||||||
bool is_primary_service() const { return is_primary_service_; }
|
bool is_primary_service() const { return is_primary_service_; }
|
||||||
|
|
||||||
// including tablets about can_not_read index and permanent offline server
|
// including tablets about can_not_read index and permanent offline server
|
||||||
int update_all_tablets_report_scn(const uint64_t global_broadcast_scn_val);
|
int update_all_tablets_report_scn(const uint64_t global_broadcast_scn_val,
|
||||||
|
const int64_t expected_epoch);
|
||||||
|
|
||||||
void check_merge_interval_time(const bool is_merging);
|
void check_merge_interval_time(const bool is_merging);
|
||||||
|
|
||||||
|
@ -401,10 +401,14 @@ int ObTabletMetaTableCompactionOperator::construct_compaction_related_info(
|
|||||||
|
|
||||||
int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const uint64_t global_braodcast_scn_val,
|
const uint64_t global_broadcast_scn_val,
|
||||||
const ObTabletReplica::ScnStatus &except_status)
|
const ObTabletReplica::ScnStatus &except_status,
|
||||||
|
const volatile bool &stop,
|
||||||
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t start_time_us = ObTimeUtil::current_time();
|
||||||
|
const int64_t BATCH_UPDATE_CNT = 1000;
|
||||||
uint64_t compat_version = 0;
|
uint64_t compat_version = 0;
|
||||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
@ -414,37 +418,44 @@ int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
|||||||
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
||||||
// do nothing until schema upgrade
|
// do nothing until schema upgrade
|
||||||
} else {
|
} else {
|
||||||
ObMySQLTransaction trans;
|
LOG_INFO("start to batch update report scn", KR(ret), K(tenant_id), K(global_broadcast_scn_val), K(expected_epoch));
|
||||||
int64_t affected_rows = 0;
|
|
||||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||||
int64_t estimated_timeout_us = 0;
|
bool update_done = false;
|
||||||
ObTimeoutCtx timeout_ctx;
|
SMART_VAR(ObArray<uint64_t>, tablet_ids) {
|
||||||
// set trx_timeout and query_timeout based on tablet_replica_cnt
|
while (OB_SUCC(ret) && !update_done && !stop) {
|
||||||
if (OB_FAIL(ObTabletMetaTableCompactionOperator::get_estimated_timeout_us(tenant_id,
|
bool is_match = true;
|
||||||
estimated_timeout_us))) {
|
ObMySQLTransaction trans;
|
||||||
LOG_WARN("fail to get estimated_timeout_us", KR(ret), K(tenant_id));
|
ObSqlString sql;
|
||||||
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(estimated_timeout_us))) {
|
int64_t affected_rows = 0;
|
||||||
LOG_WARN("fail to set trx timeout", KR(ret), K(estimated_timeout_us));
|
if (OB_FAIL(ObTabletMetaTableCompactionOperator::get_next_batch_tablet_ids(tenant_id,
|
||||||
} else if (OB_FAIL(timeout_ctx.set_timeout(estimated_timeout_us))) {
|
BATCH_UPDATE_CNT, tablet_ids))) {
|
||||||
LOG_WARN("fail to set abs timeout", KR(ret), K(estimated_timeout_us));
|
LOG_WARN("fail to get next batch of tablet_ids", KR(ret), K(tenant_id), K(BATCH_UPDATE_CNT));
|
||||||
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
|
} else if (0 == tablet_ids.count()) {
|
||||||
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
|
update_done = true;
|
||||||
} else {
|
LOG_INFO("finish all rounds of batch update report scn", KR(ret), K(tenant_id),
|
||||||
ObSqlString sql;
|
"cost_time_us", ObTimeUtil::current_time() - start_time_us);
|
||||||
if (OB_FAIL(sql.assign_fmt("UPDATE %s SET report_scn = '%lu' WHERE tenant_id = '%ld' "
|
} else if (OB_FAIL(construct_batch_update_report_scn_sql_str_(tenant_id,
|
||||||
"AND compaction_scn >= '%lu' AND status != '%ld'",
|
global_broadcast_scn_val, except_status, tablet_ids, sql))) {
|
||||||
OB_ALL_TABLET_META_TABLE_TNAME,
|
LOG_WARN("fail to construct batch update sql str", KR(ret), K(tenant_id),
|
||||||
global_braodcast_scn_val,
|
K(global_broadcast_scn_val), K(except_status));
|
||||||
tenant_id,
|
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
|
||||||
global_braodcast_scn_val,
|
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
|
||||||
(int64_t )except_status))) {
|
} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(trans, tenant_id,
|
||||||
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(global_braodcast_scn_val), K(except_status));
|
ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, expected_epoch, is_match))) {
|
||||||
} else if (OB_FAIL(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) {
|
LOG_WARN("fail to check service_epoch with trans", KR(ret), K(tenant_id), K(expected_epoch));
|
||||||
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
} else if (is_match) {
|
||||||
|
if (OB_FAIL(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) {
|
||||||
|
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
||||||
|
}
|
||||||
|
} else { // !is_match
|
||||||
|
ret = OB_FREEZE_SERVICE_EPOCH_MISMATCH;
|
||||||
|
LOG_WARN("freeze_service_epoch mismatch, do not update report_scn on this server", KR(ret), K(tenant_id));
|
||||||
|
}
|
||||||
|
handle_trans_stat(trans, ret);
|
||||||
|
LOG_INFO("finish one round of batch update report scn", KR(ret), K(tenant_id),
|
||||||
|
K(affected_rows), K(BATCH_UPDATE_CNT));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
handle_trans_stat(trans, ret);
|
|
||||||
LOG_INFO("finish to batch update report scn", KR(ret), K(tenant_id), K(affected_rows));
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -454,6 +465,8 @@ int ObTabletMetaTableCompactionOperator::batch_update_status(
|
|||||||
const int64_t expected_epoch)
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t start_time_us = ObTimeUtil::current_time();
|
||||||
|
const int64_t BATCH_UPDATE_CNT = 1000;
|
||||||
uint64_t compat_version = 0;
|
uint64_t compat_version = 0;
|
||||||
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
@ -463,40 +476,134 @@ int ObTabletMetaTableCompactionOperator::batch_update_status(
|
|||||||
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
||||||
// do nothing until schema upgrade
|
// do nothing until schema upgrade
|
||||||
} else {
|
} else {
|
||||||
ObMySQLTransaction trans;
|
LOG_INFO("start to batch update status", KR(ret), K(tenant_id), K(expected_epoch));
|
||||||
int64_t affected_rows = 0;
|
|
||||||
bool is_match = true;
|
|
||||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||||
int64_t estimated_timeout_us = 0;
|
bool update_done = false;
|
||||||
ObTimeoutCtx timeout_ctx;
|
SMART_VAR(ObArray<uint64_t>, tablet_ids) {
|
||||||
// set trx_timeout and query_timeout based on tablet_replica_cnt
|
while (OB_SUCC(ret) && !update_done) {
|
||||||
if (OB_FAIL(ObTabletMetaTableCompactionOperator::get_estimated_timeout_us(tenant_id,
|
bool is_match = true;
|
||||||
estimated_timeout_us))) {
|
ObMySQLTransaction trans;
|
||||||
LOG_WARN("fail to get estimated_timeout_us", KR(ret), K(tenant_id));
|
ObSqlString sql;
|
||||||
} else if (OB_FAIL(timeout_ctx.set_trx_timeout_us(estimated_timeout_us))) {
|
int64_t affected_rows = 0;
|
||||||
LOG_WARN("fail to set trx timeout", KR(ret), K(estimated_timeout_us));
|
if (OB_FAIL(ObTabletMetaTableCompactionOperator::get_next_batch_tablet_ids(tenant_id,
|
||||||
} else if (OB_FAIL(timeout_ctx.set_timeout(estimated_timeout_us))) {
|
BATCH_UPDATE_CNT, tablet_ids))) {
|
||||||
LOG_WARN("fail to set abs timeout", KR(ret), K(estimated_timeout_us));
|
LOG_WARN("fail to get next batch of tablet_ids", KR(ret), K(tenant_id), K(BATCH_UPDATE_CNT));
|
||||||
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
|
} else if (0 == tablet_ids.count()) {
|
||||||
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
|
update_done = true;
|
||||||
} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(trans, tenant_id,
|
LOG_INFO("finish all rounds of batch update status", KR(ret), K(tenant_id),
|
||||||
ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, expected_epoch, is_match))) {
|
"cost_time_us", ObTimeUtil::current_time() - start_time_us);
|
||||||
LOG_WARN("fail to check service_epoch with trans", KR(ret), K(tenant_id), K(expected_epoch));
|
} else if (OB_FAIL(construct_batch_update_status_sql_str_(tenant_id, tablet_ids, sql))) {
|
||||||
} else if (is_match) {
|
LOG_WARN("fail to construct batch update sql str", KR(ret), K(tenant_id));
|
||||||
ObSqlString sql;
|
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
|
||||||
if (OB_FAIL(sql.assign_fmt("UPDATE %s SET status = '%ld' WHERE tenant_id = '%ld' "
|
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
|
||||||
"AND status = '%ld'",
|
} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(trans, tenant_id,
|
||||||
OB_ALL_TABLET_META_TABLE_TNAME,
|
ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, expected_epoch, is_match))) {
|
||||||
(int64_t)ObTabletReplica::ScnStatus::SCN_STATUS_IDLE,
|
LOG_WARN("fail to check service_epoch with trans", KR(ret), K(tenant_id), K(expected_epoch));
|
||||||
tenant_id,
|
} else if (is_match) {
|
||||||
(int64_t)ObTabletReplica::ScnStatus::SCN_STATUS_ERROR))) {
|
if (OB_FAIL(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) {
|
||||||
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id));
|
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
||||||
} else if (OB_FAIL(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) {
|
}
|
||||||
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
} else { // !is_match
|
||||||
|
ret = OB_FREEZE_SERVICE_EPOCH_MISMATCH;
|
||||||
|
LOG_WARN("freeze_service_epoch mismatch, do not update status on this server", KR(ret), K(tenant_id));
|
||||||
|
}
|
||||||
|
handle_trans_stat(trans, ret);
|
||||||
|
LOG_INFO("finish one round of batch update status", KR(ret), K(tenant_id), K(affected_rows), K(BATCH_UPDATE_CNT));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
handle_trans_stat(trans, ret);
|
}
|
||||||
LOG_INFO("finish to batch update status", KR(ret), K(tenant_id), K(affected_rows));
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTabletMetaTableCompactionOperator::batch_get_tablet_ids(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t start_tablet_id,
|
||||||
|
const int64_t limit_cnt,
|
||||||
|
ObIArray<uint64_t> &tablet_ids)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KR(ret), K(tenant_id));
|
||||||
|
} else {
|
||||||
|
tablet_ids.reuse();
|
||||||
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||||
|
ObSqlString sql;
|
||||||
|
SMART_VAR(ObISQLClient::ReadResult, res) {
|
||||||
|
ObMySQLResult *result = nullptr;
|
||||||
|
if (OB_FAIL(sql.append_fmt("SELECT DISTINCT tablet_id from %s WHERE tenant_id = '%ld' "
|
||||||
|
"AND tablet_id > '%ld' ORDER BY tenant_id, tablet_id ASC LIMIT %ld",
|
||||||
|
OB_ALL_TABLET_META_TABLE_TNAME, tenant_id, start_tablet_id, limit_cnt))) {
|
||||||
|
LOG_WARN("failed to append fmt", K(ret), K(tenant_id), K(start_tablet_id), K(limit_cnt));
|
||||||
|
} else if (OB_FAIL(GCTX.sql_proxy_->read(res, meta_tenant_id, sql.ptr()))) {
|
||||||
|
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
||||||
|
} else if (OB_ISNULL(result = res.get_result())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("fail to get mysql result", KR(ret), K(tenant_id), K(sql));
|
||||||
|
} else {
|
||||||
|
while (OB_SUCC(ret)) {
|
||||||
|
if (OB_FAIL(result->next())) {
|
||||||
|
if (OB_ITER_END != ret) {
|
||||||
|
LOG_WARN("fail to get next result", KR(ret));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
int64_t tmp_tablet_id = 0;
|
||||||
|
EXTRACT_INT_FIELD_MYSQL(*result, "tablet_id", tmp_tablet_id, int64_t);
|
||||||
|
if (FAILEDx(tablet_ids.push_back(tmp_tablet_id))) {
|
||||||
|
LOG_WARN("fail to push_back tablet_id", KR(ret), K(tmp_tablet_id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (OB_ITER_END == ret) {
|
||||||
|
ret = OB_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG_INFO("finish to batch get tablet_ids", KR(ret), K(tenant_id), K(sql));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTabletMetaTableCompactionOperator::construct_batch_update_report_scn_sql_str_(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t global_braodcast_scn_val,
|
||||||
|
const ObTabletReplica::ScnStatus &except_status,
|
||||||
|
const ObIArray<uint64_t> &tablet_ids,
|
||||||
|
ObSqlString &sql)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t tablet_ids_cnt = tablet_ids.count();
|
||||||
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || tablet_ids.empty())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tablet_ids));
|
||||||
|
} else if (OB_FAIL(sql.assign_fmt("UPDATE %s SET report_scn = '%lu' WHERE tenant_id = '%ld' AND"
|
||||||
|
" tablet_id >= '%lu' AND tablet_id <= '%lu' AND compaction_scn >= '%lu' AND report_scn"
|
||||||
|
" < '%lu' AND status != '%ld'", OB_ALL_TABLET_META_TABLE_TNAME, global_braodcast_scn_val,
|
||||||
|
tenant_id, tablet_ids.at(0), tablet_ids.at(tablet_ids_cnt - 1), global_braodcast_scn_val,
|
||||||
|
global_braodcast_scn_val, (int64_t)except_status))) {
|
||||||
|
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(global_braodcast_scn_val), K(except_status),
|
||||||
|
"start_tablet_id", tablet_ids.at(0), "end_tablet_id", tablet_ids.at(tablet_ids_cnt - 1));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObTabletMetaTableCompactionOperator::construct_batch_update_status_sql_str_(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const ObIArray<uint64_t> &tablet_ids,
|
||||||
|
ObSqlString &sql)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
const int64_t tablet_ids_cnt = tablet_ids.count();
|
||||||
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || tablet_ids.empty())) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(tablet_ids));
|
||||||
|
} else if (OB_FAIL(sql.assign_fmt("UPDATE %s SET status = '%ld' WHERE tenant_id = '%ld' AND"
|
||||||
|
" tablet_id >= '%lu' AND tablet_id <= '%lu' AND status = '%ld'",
|
||||||
|
OB_ALL_TABLET_META_TABLE_TNAME, (int64_t)ObTabletReplica::ScnStatus::SCN_STATUS_IDLE,
|
||||||
|
tenant_id, tablet_ids.at(0), tablet_ids.at(tablet_ids_cnt - 1),
|
||||||
|
(int64_t)ObTabletReplica::ScnStatus::SCN_STATUS_ERROR))) {
|
||||||
|
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), "start_tablet_id", tablet_ids.at(0),
|
||||||
|
"end_tablet_id", tablet_ids.at(tablet_ids_cnt - 1));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -516,7 +623,7 @@ int ObTabletMetaTableCompactionOperator::get_estimated_timeout_us(
|
|||||||
} else {
|
} else {
|
||||||
estimated_timeout_us = tablet_replica_cnt * 1000L; // 1ms for each tablet replica
|
estimated_timeout_us = tablet_replica_cnt * 1000L; // 1ms for each tablet replica
|
||||||
estimated_timeout_us = MAX(estimated_timeout_us, THIS_WORKER.get_timeout_remain());
|
estimated_timeout_us = MAX(estimated_timeout_us, THIS_WORKER.get_timeout_remain());
|
||||||
estimated_timeout_us = MIN(estimated_timeout_us, 3600 * 1000 * 1000L);
|
estimated_timeout_us = MIN(estimated_timeout_us, 3 * 3600 * 1000 * 1000L);
|
||||||
estimated_timeout_us = MAX(estimated_timeout_us, GCONF.rpc_timeout);
|
estimated_timeout_us = MAX(estimated_timeout_us, GCONF.rpc_timeout);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -556,9 +663,10 @@ int ObTabletMetaTableCompactionOperator::get_tablet_replica_cnt(
|
|||||||
|
|
||||||
int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const uint64_t global_braodcast_scn_val,
|
const uint64_t global_broadcast_scn_val,
|
||||||
const ObIArray<ObTabletLSPair> &tablet_pairs,
|
const ObIArray<ObTabletLSPair> &tablet_pairs,
|
||||||
const ObTabletReplica::ScnStatus &except_status)
|
const ObTabletReplica::ScnStatus &except_status,
|
||||||
|
const int64_t expected_epoch)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t affected_rows = 0;
|
int64_t affected_rows = 0;
|
||||||
@ -574,19 +682,17 @@ int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
|||||||
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
} else if (compat_version < DATA_VERSION_4_1_0_0) {
|
||||||
} else {
|
} else {
|
||||||
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
const uint64_t meta_tenant_id = gen_meta_tenant_id(tenant_id);
|
||||||
ObMySQLTransaction trans;
|
|
||||||
if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
|
|
||||||
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
|
|
||||||
}
|
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && (i < all_pair_cnt); i += MAX_BATCH_COUNT) {
|
for (int64_t i = 0; OB_SUCC(ret) && (i < all_pair_cnt); i += MAX_BATCH_COUNT) {
|
||||||
const int64_t cur_end_idx = MIN(i + MAX_BATCH_COUNT, all_pair_cnt);
|
const int64_t cur_end_idx = MIN(i + MAX_BATCH_COUNT, all_pair_cnt);
|
||||||
|
ObMySQLTransaction trans;
|
||||||
ObSqlString sql;
|
ObSqlString sql;
|
||||||
|
bool is_match = true;
|
||||||
if (OB_FAIL(sql.append_fmt(
|
if (OB_FAIL(sql.append_fmt(
|
||||||
"UPDATE %s SET report_scn = '%lu' WHERE tenant_id = %ld AND (tablet_id,ls_id) IN (",
|
"UPDATE %s SET report_scn = '%lu' WHERE tenant_id = %ld AND (tablet_id,ls_id) IN (",
|
||||||
OB_ALL_TABLET_META_TABLE_TNAME,
|
OB_ALL_TABLET_META_TABLE_TNAME,
|
||||||
global_braodcast_scn_val,
|
global_broadcast_scn_val,
|
||||||
tenant_id))) {
|
tenant_id))) {
|
||||||
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(global_braodcast_scn_val));
|
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(global_broadcast_scn_val));
|
||||||
} else {
|
} else {
|
||||||
// handle each batch tablet_ls_pairs
|
// handle each batch tablet_ls_pairs
|
||||||
for (int64_t idx = i; OB_SUCC(ret) && (idx < cur_end_idx); ++idx) {
|
for (int64_t idx = i; OB_SUCC(ret) && (idx < cur_end_idx); ++idx) {
|
||||||
@ -604,19 +710,28 @@ int ObTabletMetaTableCompactionOperator::batch_update_report_scn(
|
|||||||
LOG_WARN("fail to assign sql", KR(ret), K(tablet_id));
|
LOG_WARN("fail to assign sql", KR(ret), K(tablet_id));
|
||||||
}
|
}
|
||||||
} // end for
|
} // end for
|
||||||
if (FAILEDx(sql.append_fmt(") AND compaction_scn >= '%lu' AND status != %ld",
|
if (FAILEDx(sql.append_fmt(") AND compaction_scn >= '%lu' AND report_scn < '%lu' AND status != %ld",
|
||||||
global_braodcast_scn_val,
|
global_broadcast_scn_val, global_broadcast_scn_val, (int64_t)(except_status)))) {
|
||||||
(int64_t)(except_status)))) {
|
|
||||||
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(except_status),
|
LOG_WARN("fail to assign sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(except_status),
|
||||||
K(global_braodcast_scn_val));
|
K(global_broadcast_scn_val));
|
||||||
} else if (OB_FAIL(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) {
|
} else if (OB_FAIL(trans.start(GCTX.sql_proxy_, meta_tenant_id))) {
|
||||||
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
LOG_WARN("fail to start transaction", KR(ret), K(tenant_id), K(meta_tenant_id));
|
||||||
} else {
|
} else if (OB_FAIL(ObServiceEpochProxy::check_service_epoch_with_trans(trans, tenant_id,
|
||||||
LOG_TRACE("success to update report_scn", KR(ret), K(tenant_id), K(meta_tenant_id), K(tablet_pairs), K(sql));
|
ObServiceEpochProxy::FREEZE_SERVICE_EPOCH, expected_epoch, is_match))) {
|
||||||
|
LOG_WARN("fail to check service_epoch with trans", KR(ret), K(tenant_id), K(expected_epoch));
|
||||||
|
} else if (is_match) {
|
||||||
|
if (OB_FAIL(trans.write(meta_tenant_id, sql.ptr(), affected_rows))) {
|
||||||
|
LOG_WARN("fail to execute sql", KR(ret), K(tenant_id), K(meta_tenant_id), K(sql));
|
||||||
|
} else {
|
||||||
|
LOG_TRACE("success to update report_scn", KR(ret), K(tenant_id), K(meta_tenant_id), K(tablet_pairs), K(sql));
|
||||||
|
}
|
||||||
|
} else { // !is_match
|
||||||
|
ret = OB_FREEZE_SERVICE_EPOCH_MISMATCH;
|
||||||
|
LOG_WARN("freeze_service_epoch mismatch, do not update report_scn on this server", KR(ret), K(tenant_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
handle_trans_stat(trans, ret);
|
||||||
}
|
}
|
||||||
handle_trans_stat(trans, ret);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -685,5 +800,28 @@ int ObTabletMetaTableCompactionOperator::get_unique_status(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObTabletMetaTableCompactionOperator::get_next_batch_tablet_ids(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const int64_t batch_update_cnt,
|
||||||
|
ObIArray<uint64_t> &tablet_ids)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || batch_update_cnt < 1)) {
|
||||||
|
ret = OB_INVALID_ARGUMENT;
|
||||||
|
LOG_WARN("invalid argument", KR(ret), K(tenant_id), K(batch_update_cnt));
|
||||||
|
} else {
|
||||||
|
uint64_t start_tablet_id = ObTabletID::INVALID_TABLET_ID;
|
||||||
|
if (tablet_ids.count() > 0) {
|
||||||
|
start_tablet_id = tablet_ids.at(tablet_ids.count() - 1);
|
||||||
|
}
|
||||||
|
tablet_ids.reuse();
|
||||||
|
if (OB_FAIL(ObTabletMetaTableCompactionOperator::batch_get_tablet_ids(tenant_id,
|
||||||
|
start_tablet_id, batch_update_cnt, tablet_ids))) {
|
||||||
|
LOG_WARN("fail to batch get tablet_ids", KR(ret), K(tenant_id), K(start_tablet_id), K(batch_update_cnt));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
} // end namespace share
|
} // end namespace share
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
@ -98,16 +98,21 @@ public:
|
|||||||
// update report_scn of all tablets which belong to @tablet_pairs
|
// update report_scn of all tablets which belong to @tablet_pairs
|
||||||
static int batch_update_report_scn(
|
static int batch_update_report_scn(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const uint64_t global_braodcast_scn_val,
|
const uint64_t global_broadcast_scn_val,
|
||||||
const common::ObIArray<ObTabletLSPair> &tablet_pairs,
|
const common::ObIArray<ObTabletLSPair> &tablet_pairs,
|
||||||
const ObTabletReplica::ScnStatus &except_status);
|
const ObTabletReplica::ScnStatus &except_status,
|
||||||
// after major_freeze, update all tablets' report_scn to global_braodcast_scn_val
|
const int64_t expected_epoch);
|
||||||
|
// after major_freeze, update all tablets' report_scn to global_broadcast_scn_val
|
||||||
static int batch_update_report_scn(
|
static int batch_update_report_scn(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
const uint64_t global_braodcast_scn_val,
|
const uint64_t global_broadcast_scn_val,
|
||||||
const ObTabletReplica::ScnStatus &except_status);
|
const ObTabletReplica::ScnStatus &except_status,
|
||||||
|
const volatile bool &stop,
|
||||||
|
const int64_t expected_epoch);
|
||||||
// designed for 'clear merge error'. it updates all tablets' status to SCN_STATUS_IDLE
|
// designed for 'clear merge error'. it updates all tablets' status to SCN_STATUS_IDLE
|
||||||
static int batch_update_status(const uint64_t tenant_id, const int64_t expected_epoch);
|
static int batch_update_status(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const int64_t expected_epoch);
|
||||||
static int get_unique_status(
|
static int get_unique_status(
|
||||||
const uint64_t tenant_id,
|
const uint64_t tenant_id,
|
||||||
common::ObIArray<ObTabletLSPair> &pairs,
|
common::ObIArray<ObTabletLSPair> &pairs,
|
||||||
@ -159,6 +164,26 @@ private:
|
|||||||
common::ObIArray<ObTabletID> &unequal_tablet_id_array);
|
common::ObIArray<ObTabletID> &unequal_tablet_id_array);
|
||||||
static int get_estimated_timeout_us(const uint64_t tenant_id, int64_t &estimated_timeout_us);
|
static int get_estimated_timeout_us(const uint64_t tenant_id, int64_t &estimated_timeout_us);
|
||||||
static int get_tablet_replica_cnt(const uint64_t tenant_id, int64_t &tablet_replica_cnt);
|
static int get_tablet_replica_cnt(const uint64_t tenant_id, int64_t &tablet_replica_cnt);
|
||||||
|
// get tablet_ids larger than @start_tablet_id, and get up to @limit_cnt records
|
||||||
|
static int batch_get_tablet_ids(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t start_tablet_id,
|
||||||
|
const int64_t limit_cnt,
|
||||||
|
common::ObIArray<uint64_t> &tablet_ids);
|
||||||
|
static int construct_batch_update_report_scn_sql_str_(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const uint64_t global_braodcast_scn_val,
|
||||||
|
const ObTabletReplica::ScnStatus &except_status,
|
||||||
|
const common::ObIArray<uint64_t> &tablet_ids,
|
||||||
|
ObSqlString &sql);
|
||||||
|
static int construct_batch_update_status_sql_str_(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const common::ObIArray<uint64_t> &tablet_ids,
|
||||||
|
ObSqlString &sql);
|
||||||
|
static int get_next_batch_tablet_ids(
|
||||||
|
const uint64_t tenant_id,
|
||||||
|
const int64_t batch_update_cnt,
|
||||||
|
common::ObIArray<uint64_t> &tablet_ids);
|
||||||
private:
|
private:
|
||||||
const static int64_t MAX_BATCH_COUNT = 150;
|
const static int64_t MAX_BATCH_COUNT = 150;
|
||||||
};
|
};
|
||||||
|
Reference in New Issue
Block a user