[4.1] add defense check if switch piece v$ob_archive_dest_status
This commit is contained in:
@ -131,7 +131,7 @@ int ObVirtualArchiveDestStatus::inner_get_next_row(common::ObNewRow *&row)
|
||||
SERVER_LOG(INFO, "no archive dest exist, just skip", K(ret), K(curr_tenant));
|
||||
} else {
|
||||
for (int64_t dest_idx = 0; OB_SUCC(ret) && dest_idx < dest_array.count(); dest_idx++) {
|
||||
const uint64_t curr_dest = dest_array.at(dest_idx).second;
|
||||
const int64_t curr_dest = dest_array.at(dest_idx).second;
|
||||
ObArchiveDestStatusInfo dest_status_info;
|
||||
ObArray<Column> columns;
|
||||
|
||||
@ -151,20 +151,19 @@ int ObVirtualArchiveDestStatus::inner_get_next_row(common::ObNewRow *&row)
|
||||
SERVER_LOG(WARN, "get ls max scn failed", K(curr_tenant), K(ret));
|
||||
} else if (ls_checkpoint_map_.count() == 0 || ls_end_map_.count() == 0 || ls_checkpoint_map_.count() != ls_end_map_.count()) {
|
||||
SERVER_LOG(WARN, "map may be empty", K(ls_end_map_.count()), K(ls_checkpoint_map_.count()));
|
||||
if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) {
|
||||
SERVER_LOG(WARN, "fail to assign synchronized", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(compare_scn_map_())) {
|
||||
SERVER_LOG(WARN, "compare scn map failed", K(ret));
|
||||
if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) {
|
||||
SERVER_LOG(WARN, "fail to assign synchronized", K(ret));
|
||||
}
|
||||
} else if (is_synced_) {
|
||||
if (OB_FAIL(dest_status_info.synchronized_.assign("YES"))) {
|
||||
SERVER_LOG(WARN, "fail to assign synchronized", K(ret));
|
||||
if (OB_FAIL(check_if_switch_piece_(curr_tenant, curr_dest))) {
|
||||
SERVER_LOG(WARN, "fail to check if switch piece", K(ret), K(curr_tenant), K(curr_dest));
|
||||
} else if (is_synced_) {
|
||||
dest_status_info.synchronized_.reset();
|
||||
if (OB_FAIL(dest_status_info.synchronized_.assign("YES"))) {
|
||||
SERVER_LOG(WARN, "fail to assign synchronized YES", K(ret));
|
||||
} else {
|
||||
SERVER_LOG(INFO, "success to assign dest status synchronized YES", K(dest_status_info.synchronized_));
|
||||
}
|
||||
}
|
||||
} else if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) {
|
||||
SERVER_LOG(WARN, "fail to assign synchronized", K(ret));
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && OB_FAIL(get_full_row_(table_schema_, dest_status_info, columns))) {
|
||||
@ -262,7 +261,7 @@ int ObVirtualArchiveDestStatus::get_all_tenant_ls_(const uint64_t tenant_id)
|
||||
ObMySQLResult *result = NULL;
|
||||
ObSqlString sql;
|
||||
|
||||
const static char *SELECT_ALL_LS = "SELECT ls_id FROM %s WHERE tenant_id = %d and status not in "
|
||||
const static char *SELECT_ALL_LS = "SELECT ls_id FROM %s WHERE tenant_id = %ld and status not in "
|
||||
"('CREATING', 'CREATED', 'TENANT_DROPPING', 'CREATE_ABORT', 'PRE_TENANT_DROPPING')";
|
||||
if (OB_FAIL(sql.append_fmt(SELECT_ALL_LS, OB_ALL_VIRTUAL_LS_STATUS_TNAME, tenant_id))){
|
||||
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
||||
@ -306,7 +305,7 @@ int ObVirtualArchiveDestStatus::get_ls_max_scn_(const uint64_t tenant_id)
|
||||
ObMySQLResult *result = NULL;
|
||||
ObSqlString sql;
|
||||
|
||||
const static char *SELECT_LS_BY_TENANT = "SELECT ls_id, max_scn FROM %s WHERE tenant_id=%d and role='LEADER'";
|
||||
const static char *SELECT_LS_BY_TENANT = "SELECT ls_id, max_scn FROM %s WHERE tenant_id=%ld and role='LEADER'";
|
||||
if (OB_FAIL(sql.append_fmt(SELECT_LS_BY_TENANT, OB_ALL_VIRTUAL_LOG_STAT_TNAME, tenant_id))) {
|
||||
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
||||
} else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) {
|
||||
@ -371,7 +370,7 @@ int ObVirtualArchiveDestStatus::get_ls_checkpoint_scn_(const uint64_t tenant_id,
|
||||
ObSqlString sql;
|
||||
|
||||
const static char *SELECT_LS_CHECKPOINT = "select ls_id, max(checkpoint_scn) as checkpoint_scn from %s "
|
||||
"where ls_id in (select ls_id from %s where tenant_id=%d) and dest_id=%d and tenant_id=%d group by ls_id";
|
||||
"where ls_id in (select ls_id from %s where tenant_id=%ld) and dest_id=%ld and tenant_id=%ld group by ls_id";
|
||||
if (OB_FAIL(sql.append_fmt(SELECT_LS_CHECKPOINT, OB_ALL_VIRTUAL_LS_LOG_ARCHIVE_PROGRESS_TNAME,
|
||||
OB_ALL_VIRTUAL_LS_STATUS_TNAME, tenant_id, dest_id, tenant_id))) {
|
||||
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
||||
@ -463,7 +462,7 @@ int ObVirtualArchiveDestStatus::get_status_info_(const uint64_t tenant_id,
|
||||
ObSqlString sql;
|
||||
|
||||
const static char *SELECT_LOG_ARCHIVE_PROGRESS = "SELECT status,path,checkpoint_scn,comment from %s "
|
||||
"where tenant_id=%d and dest_id=%d";
|
||||
"where tenant_id=%ld and dest_id=%ld";
|
||||
if (OB_FAIL(sql.append_fmt(SELECT_LOG_ARCHIVE_PROGRESS, OB_ALL_VIRTUAL_LOG_ARCHIVE_PROGRESS_TNAME,
|
||||
tenant_id, dest_id))) {
|
||||
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
||||
@ -494,6 +493,11 @@ int ObVirtualArchiveDestStatus::get_status_info_(const uint64_t tenant_id,
|
||||
}
|
||||
dest_status_info.tenant_id_ = tenant_id;
|
||||
dest_status_info.dest_id_ = dest_id;
|
||||
if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) {
|
||||
SERVER_LOG(WARN, "fail to assign default synchronized NO", K(ret));
|
||||
} else {
|
||||
SERVER_LOG(INFO, "success to assign default synchronized NO");
|
||||
}
|
||||
}
|
||||
if (OB_ITER_END != ret) {
|
||||
SERVER_LOG(WARN, "failed to get dest status info", K(ret));
|
||||
@ -543,5 +547,89 @@ int ObVirtualArchiveDestStatus::compare_scn_map_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObVirtualArchiveDestStatus::check_if_switch_piece_(const uint64_t tenant_id, const int64_t dest_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
|
||||
int64_t used_piece_id = OB_BACKUP_INVALID_PIECE_ID;
|
||||
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
SERVER_LOG(WARN, "not inited", K(ret));
|
||||
} else if (OB_FAIL(get_log_archive_used_piece_id_(tenant_id, dest_id, used_piece_id))){
|
||||
SERVER_LOG(WARN, "get log archive used piece id failed", K(ret));
|
||||
} else {
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObMySQLResult *result = NULL;
|
||||
ObSqlString sql;
|
||||
int64_t tmp_ls_id = share::ObLSID::INVALID_LS_ID;
|
||||
int64_t tmp_piece_id = OB_BACKUP_INVALID_PIECE_ID;
|
||||
const static char *SELECT_PIECE_ID_SQL = "select ls_id, piece_id FROM %s "
|
||||
"WHERE piece_id > %ld and tenant_id=%ld and dest_id=%ld";
|
||||
if (OB_FAIL(sql.append_fmt(SELECT_PIECE_ID_SQL, OB_ALL_VIRTUAL_LS_LOG_ARCHIVE_PROGRESS_TNAME,
|
||||
used_piece_id, tenant_id, dest_id))) {
|
||||
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
||||
} else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) {
|
||||
SERVER_LOG(WARN, "failed to execute sql", K(sql), K(ret));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "get piece result failed", K(sql.ptr()), K(ret));
|
||||
} else if (OB_FAIL(result->next()) && (OB_ITER_END == ret)) {
|
||||
ret = OB_SUCCESS;
|
||||
is_synced_ = true;
|
||||
SERVER_LOG(INFO, "no ls piece_id is bigger than archive used_piece_id", K(sql.ptr()), K(ret));
|
||||
} else {
|
||||
is_synced_ = false;
|
||||
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", tmp_ls_id, int64_t);
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "piece_id", tmp_piece_id, int64_t);
|
||||
SERVER_LOG(INFO, "ls piece_id is bigger than archive used_piece_id",
|
||||
K(tenant_id), K(dest_id), K(tmp_ls_id), K(tmp_piece_id), K(used_piece_id), K(sql.ptr()));
|
||||
}
|
||||
if (OB_ITER_END != ret) {
|
||||
SERVER_LOG(WARN, "failed to used piece id", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObVirtualArchiveDestStatus::get_log_archive_used_piece_id_(const uint64_t tenant_id, const int64_t dest_id, int64_t &piece_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_);
|
||||
|
||||
if (OB_UNLIKELY(!is_inited_)) {
|
||||
SERVER_LOG(WARN, "not inited", K(ret));
|
||||
} else {
|
||||
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
|
||||
ObMySQLResult *result = NULL;
|
||||
ObSqlString sql;
|
||||
|
||||
const static char *SELECT_RS_PIECE_ID_SQL = "SELECT used_piece_id FROM %s WHERE tenant_id=%ld and dest_id=%ld";
|
||||
if (OB_FAIL(sql.append_fmt(SELECT_RS_PIECE_ID_SQL, OB_ALL_VIRTUAL_LOG_ARCHIVE_PROGRESS_TNAME, tenant_id, dest_id))) {
|
||||
SERVER_LOG(WARN, "failed to append table name", K(ret));
|
||||
} else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) {
|
||||
SERVER_LOG(WARN, "failed to execute sql", K(sql), K(ret));
|
||||
} else if (OB_ISNULL(result = res.get_result())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
SERVER_LOG(WARN, "failed to get result", "sql", sql.ptr(), K(ret));
|
||||
} else {
|
||||
while (OB_SUCC(ret) && OB_SUCC(result->next())) {
|
||||
EXTRACT_INT_FIELD_MYSQL(*result, "used_piece_id", piece_id, int64_t);
|
||||
SERVER_LOG(INFO, "get used_piece_id success", K(piece_id));
|
||||
}
|
||||
if (OB_ITER_END != ret) {
|
||||
SERVER_LOG(WARN, "failed to get used piece id", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}// end namespace observer
|
||||
}// end namespace oceanbase
|
||||
|
||||
@ -79,7 +79,8 @@ private:
|
||||
ObIArray<Column> &columns);
|
||||
int get_status_info_(const uint64_t tenant_id, const int64_t dest_id, ObArchiveDestStatusInfo &dest_status_info);
|
||||
int compare_scn_map_();
|
||||
|
||||
int check_if_switch_piece_(const uint64_t tenant_id, const int64_t dest_id);
|
||||
int get_log_archive_used_piece_id_(const uint64_t tenant_id, const int64_t dest_id, int64_t &piece_id);
|
||||
private:
|
||||
bool is_inited_;
|
||||
bool ls_end_map_inited_;
|
||||
|
||||
Reference in New Issue
Block a user