diff --git a/src/observer/virtual_table/ob_all_virtual_archive_dest_status.cpp b/src/observer/virtual_table/ob_all_virtual_archive_dest_status.cpp index f11beea866..4a33a717f8 100644 --- a/src/observer/virtual_table/ob_all_virtual_archive_dest_status.cpp +++ b/src/observer/virtual_table/ob_all_virtual_archive_dest_status.cpp @@ -131,7 +131,7 @@ int ObVirtualArchiveDestStatus::inner_get_next_row(common::ObNewRow *&row) SERVER_LOG(INFO, "no archive dest exist, just skip", K(ret), K(curr_tenant)); } else { for (int64_t dest_idx = 0; OB_SUCC(ret) && dest_idx < dest_array.count(); dest_idx++) { - const uint64_t curr_dest = dest_array.at(dest_idx).second; + const int64_t curr_dest = dest_array.at(dest_idx).second; ObArchiveDestStatusInfo dest_status_info; ObArray columns; @@ -151,20 +151,19 @@ int ObVirtualArchiveDestStatus::inner_get_next_row(common::ObNewRow *&row) SERVER_LOG(WARN, "get ls max scn failed", K(curr_tenant), K(ret)); } else if (ls_checkpoint_map_.count() == 0 || ls_end_map_.count() == 0 || ls_checkpoint_map_.count() != ls_end_map_.count()) { SERVER_LOG(WARN, "map may be empty", K(ls_end_map_.count()), K(ls_checkpoint_map_.count())); - if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) { - SERVER_LOG(WARN, "fail to assign synchronized", K(ret)); - } } else if (OB_FAIL(compare_scn_map_())) { SERVER_LOG(WARN, "compare scn map failed", K(ret)); - if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) { - SERVER_LOG(WARN, "fail to assign synchronized", K(ret)); - } } else if (is_synced_) { - if (OB_FAIL(dest_status_info.synchronized_.assign("YES"))) { - SERVER_LOG(WARN, "fail to assign synchronized", K(ret)); + if (OB_FAIL(check_if_switch_piece_(curr_tenant, curr_dest))) { + SERVER_LOG(WARN, "fail to check if switch piece", K(ret), K(curr_tenant), K(curr_dest)); + } else if (is_synced_) { + dest_status_info.synchronized_.reset(); + if (OB_FAIL(dest_status_info.synchronized_.assign("YES"))) { + SERVER_LOG(WARN, "fail to assign synchronized YES", K(ret)); + } else { + SERVER_LOG(INFO, "success to assign dest status synchronized YES", K(dest_status_info.synchronized_)); + } } - } else if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) { - SERVER_LOG(WARN, "fail to assign synchronized", K(ret)); } if (OB_SUCC(ret) && OB_FAIL(get_full_row_(table_schema_, dest_status_info, columns))) { @@ -262,7 +261,7 @@ int ObVirtualArchiveDestStatus::get_all_tenant_ls_(const uint64_t tenant_id) ObMySQLResult *result = NULL; ObSqlString sql; - const static char *SELECT_ALL_LS = "SELECT ls_id FROM %s WHERE tenant_id = %d and status not in " + const static char *SELECT_ALL_LS = "SELECT ls_id FROM %s WHERE tenant_id = %ld and status not in " "('CREATING', 'CREATED', 'TENANT_DROPPING', 'CREATE_ABORT', 'PRE_TENANT_DROPPING')"; if (OB_FAIL(sql.append_fmt(SELECT_ALL_LS, OB_ALL_VIRTUAL_LS_STATUS_TNAME, tenant_id))){ SERVER_LOG(WARN, "failed to append table name", K(ret)); @@ -306,7 +305,7 @@ int ObVirtualArchiveDestStatus::get_ls_max_scn_(const uint64_t tenant_id) ObMySQLResult *result = NULL; ObSqlString sql; - const static char *SELECT_LS_BY_TENANT = "SELECT ls_id, max_scn FROM %s WHERE tenant_id=%d and role='LEADER'"; + const static char *SELECT_LS_BY_TENANT = "SELECT ls_id, max_scn FROM %s WHERE tenant_id=%ld and role='LEADER'"; if (OB_FAIL(sql.append_fmt(SELECT_LS_BY_TENANT, OB_ALL_VIRTUAL_LOG_STAT_TNAME, tenant_id))) { SERVER_LOG(WARN, "failed to append table name", K(ret)); } else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) { @@ -371,7 +370,7 @@ int ObVirtualArchiveDestStatus::get_ls_checkpoint_scn_(const uint64_t tenant_id, ObSqlString sql; const static char *SELECT_LS_CHECKPOINT = "select ls_id, max(checkpoint_scn) as checkpoint_scn from %s " - "where ls_id in (select ls_id from %s where tenant_id=%d) and dest_id=%d and tenant_id=%d group by ls_id"; + "where ls_id in (select ls_id from %s where tenant_id=%ld) and dest_id=%ld and tenant_id=%ld group by ls_id"; if (OB_FAIL(sql.append_fmt(SELECT_LS_CHECKPOINT, OB_ALL_VIRTUAL_LS_LOG_ARCHIVE_PROGRESS_TNAME, OB_ALL_VIRTUAL_LS_STATUS_TNAME, tenant_id, dest_id, tenant_id))) { SERVER_LOG(WARN, "failed to append table name", K(ret)); @@ -463,7 +462,7 @@ int ObVirtualArchiveDestStatus::get_status_info_(const uint64_t tenant_id, ObSqlString sql; const static char *SELECT_LOG_ARCHIVE_PROGRESS = "SELECT status,path,checkpoint_scn,comment from %s " - "where tenant_id=%d and dest_id=%d"; + "where tenant_id=%ld and dest_id=%ld"; if (OB_FAIL(sql.append_fmt(SELECT_LOG_ARCHIVE_PROGRESS, OB_ALL_VIRTUAL_LOG_ARCHIVE_PROGRESS_TNAME, tenant_id, dest_id))) { SERVER_LOG(WARN, "failed to append table name", K(ret)); @@ -494,6 +493,11 @@ int ObVirtualArchiveDestStatus::get_status_info_(const uint64_t tenant_id, } dest_status_info.tenant_id_ = tenant_id; dest_status_info.dest_id_ = dest_id; + if (OB_FAIL(dest_status_info.synchronized_.assign("NO"))) { + SERVER_LOG(WARN, "fail to assign default synchronized NO", K(ret)); + } else { + SERVER_LOG(INFO, "success to assign default synchronized NO"); + } } if (OB_ITER_END != ret) { SERVER_LOG(WARN, "failed to get dest status info", K(ret)); @@ -543,5 +547,89 @@ int ObVirtualArchiveDestStatus::compare_scn_map_() return ret; } +int ObVirtualArchiveDestStatus::check_if_switch_piece_(const uint64_t tenant_id, const int64_t dest_id) +{ + int ret = OB_SUCCESS; + ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_); + int64_t used_piece_id = OB_BACKUP_INVALID_PIECE_ID; + + if (OB_UNLIKELY(!is_inited_)) { + SERVER_LOG(WARN, "not inited", K(ret)); + } else if (OB_FAIL(get_log_archive_used_piece_id_(tenant_id, dest_id, used_piece_id))){ + SERVER_LOG(WARN, "get log archive used piece id failed", K(ret)); + } else { + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObMySQLResult *result = NULL; + ObSqlString sql; + int64_t tmp_ls_id = share::ObLSID::INVALID_LS_ID; + int64_t tmp_piece_id = OB_BACKUP_INVALID_PIECE_ID; + const static char *SELECT_PIECE_ID_SQL = "select ls_id, piece_id FROM %s " + "WHERE piece_id > %ld and tenant_id=%ld and dest_id=%ld"; + if (OB_FAIL(sql.append_fmt(SELECT_PIECE_ID_SQL, OB_ALL_VIRTUAL_LS_LOG_ARCHIVE_PROGRESS_TNAME, + used_piece_id, tenant_id, dest_id))) { + SERVER_LOG(WARN, "failed to append table name", K(ret)); + } else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) { + SERVER_LOG(WARN, "failed to execute sql", K(sql), K(ret)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "get piece result failed", K(sql.ptr()), K(ret)); + } else if (OB_FAIL(result->next()) && (OB_ITER_END == ret)) { + ret = OB_SUCCESS; + is_synced_ = true; + SERVER_LOG(INFO, "no ls piece_id is bigger than archive used_piece_id", K(sql.ptr()), K(ret)); + } else { + is_synced_ = false; + while (OB_SUCC(ret) && OB_SUCC(result->next())) { + EXTRACT_INT_FIELD_MYSQL(*result, "ls_id", tmp_ls_id, int64_t); + EXTRACT_INT_FIELD_MYSQL(*result, "piece_id", tmp_piece_id, int64_t); + SERVER_LOG(INFO, "ls piece_id is bigger than archive used_piece_id", + K(tenant_id), K(dest_id), K(tmp_ls_id), K(tmp_piece_id), K(used_piece_id), K(sql.ptr())); + } + if (OB_ITER_END != ret) { + SERVER_LOG(WARN, "failed to used piece id", K(ret)); + } else { + ret = OB_SUCCESS; + } + } + } + } + return ret; +} + +int ObVirtualArchiveDestStatus::get_log_archive_used_piece_id_(const uint64_t tenant_id, const int64_t dest_id, int64_t &piece_id) +{ + int ret = OB_SUCCESS; + ObSQLClientRetryWeak sql_client_retry_weak(sql_proxy_); + + if (OB_UNLIKELY(!is_inited_)) { + SERVER_LOG(WARN, "not inited", K(ret)); + } else { + SMART_VAR(ObMySQLProxy::MySQLResult, res) { + ObMySQLResult *result = NULL; + ObSqlString sql; + + const static char *SELECT_RS_PIECE_ID_SQL = "SELECT used_piece_id FROM %s WHERE tenant_id=%ld and dest_id=%ld"; + if (OB_FAIL(sql.append_fmt(SELECT_RS_PIECE_ID_SQL, OB_ALL_VIRTUAL_LOG_ARCHIVE_PROGRESS_TNAME, tenant_id, dest_id))) { + SERVER_LOG(WARN, "failed to append table name", K(ret)); + } else if (OB_FAIL(sql_client_retry_weak.read(res, sql.ptr()))) { + SERVER_LOG(WARN, "failed to execute sql", K(sql), K(ret)); + } else if (OB_ISNULL(result = res.get_result())) { + ret = OB_ERR_UNEXPECTED; + SERVER_LOG(WARN, "failed to get result", "sql", sql.ptr(), K(ret)); + } else { + while (OB_SUCC(ret) && OB_SUCC(result->next())) { + EXTRACT_INT_FIELD_MYSQL(*result, "used_piece_id", piece_id, int64_t); + SERVER_LOG(INFO, "get used_piece_id success", K(piece_id)); + } + if (OB_ITER_END != ret) { + SERVER_LOG(WARN, "failed to get used piece id", K(ret)); + } else { + ret = OB_SUCCESS; + } + } + } + } + return ret; +} }// end namespace observer }// end namespace oceanbase diff --git a/src/observer/virtual_table/ob_all_virtual_archive_dest_status.h b/src/observer/virtual_table/ob_all_virtual_archive_dest_status.h index fb1abebf83..955b30585b 100644 --- a/src/observer/virtual_table/ob_all_virtual_archive_dest_status.h +++ b/src/observer/virtual_table/ob_all_virtual_archive_dest_status.h @@ -79,7 +79,8 @@ private: ObIArray &columns); int get_status_info_(const uint64_t tenant_id, const int64_t dest_id, ObArchiveDestStatusInfo &dest_status_info); int compare_scn_map_(); - + int check_if_switch_piece_(const uint64_t tenant_id, const int64_t dest_id); + int get_log_archive_used_piece_id_(const uint64_t tenant_id, const int64_t dest_id, int64_t &piece_id); private: bool is_inited_; bool ls_end_map_inited_;