[master] fix ls gc in primary when switchover

This commit is contained in:
taoshuning
2023-08-04 08:42:26 +00:00
committed by ob-robot
parent 600aa34ecc
commit bd5ae50a72
4 changed files with 207 additions and 30 deletions

View File

@ -21,12 +21,16 @@
#include "lib/string/ob_string.h" // ObString
#include "lib/time/ob_time_utility.h" // ObTimeUtility
#include "lib/utility/ob_macro_utils.h"
#include "logservice/ob_garbage_collector.h" // ObGCLSLog
#include "logservice/ob_log_base_type.h" // ObLogBaseHeader
#include "logservice/ob_log_service.h" // ObLogService
#include "logservice/palf/log_define.h"
#include "logservice/palf/log_group_entry.h"
#include "logservice/palf/lsn.h"
#include "logservice/palf/palf_env.h" // PalfEnv
#include "logservice/palf/palf_iterator.h"
#include "logservice/palf/palf_options.h"
#include "logservice/palf_handle_guard.h"
#include "logservice/replayservice/ob_log_replay_service.h"
#include "ob_log_archive_piece_mgr.h" // ObLogArchivePieceContext
#include "ob_fetch_log_task.h" // ObFetchLogTask
@ -40,6 +44,8 @@
#include "share/ob_log_restore_proxy.h"
#include "share/ob_ls_id.h"
#include "share/restore/ob_log_restore_source.h"
#include "storage/ls/ob_ls_get_mod.h"
#include "storage/tx_storage/ob_ls_handle.h"
namespace oceanbase
{
@ -403,15 +409,8 @@ int ObLogRestoreHandler::raw_write(const int64_t proposal_id,
// Copy the current restore source (parent_) into source_guard.
//
// This is the locked entry point: it takes the read lock on lock_ and then
// delegates to deep_copy_source_(), which performs the copy without taking
// any lock itself. The copy must be done exactly once; doing the inline copy
// here and then calling the helper as well would allocate a second source and
// call set_source() on the guard twice.
//
// @param source_guard  receives the copied source; left without a source when
//                      parent_ is NULL or the allocation fails.
void ObLogRestoreHandler::deep_copy_source(ObRemoteSourceGuard &source_guard)
{
  RLockGuard guard(lock_);
  deep_copy_source_(source_guard);
}
int ObLogRestoreHandler::schedule(const int64_t id,
@ -655,18 +654,24 @@ int ObLogRestoreHandler::check_restore_to_newest(share::SCN &end_scn, share::SCN
ObRemoteLogParent *source = NULL;
ObLogArchivePieceContext *piece_context = NULL;
share::ObBackupDest *dest = NULL;
palf::LSN end_lsn;
end_scn = SCN::min_scn();
archive_scn = SCN::min_scn();
SCN restore_scn = SCN::min_scn();
CLOG_LOG(INFO, "start check restore to newest", K(id_));
{
RLockGuard lock_guard(lock_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "ObLogRestoreHandler not init", K(ret), KPC(this));
} else if (! is_strong_leader(role_)) {
ret = OB_NOT_MASTER;
CLOG_LOG(WARN, "not leader", K(ret), KPC(this));
} else if (FALSE_IT(deep_copy_source(guard))) {
} else if (OB_FAIL(palf_handle_.get_end_lsn(end_lsn))) {
CLOG_LOG(WARN, "get end lsn failed", K(id_));
} else if (OB_FAIL(palf_handle_.get_end_scn(end_scn))) {
CLOG_LOG(WARN, "get end scn failed", K(id_));
} else if (FALSE_IT(deep_copy_source_(guard))) {
} else if (OB_ISNULL(source = guard.get_source())) {
ret = OB_EAGAIN;
CLOG_LOG(WARN, "invalid source", K(ret), KPC(this), KPC(source));
@ -677,12 +682,6 @@ int ObLogRestoreHandler::check_restore_to_newest(share::SCN &end_scn, share::SCN
}
if (OB_SUCC(ret) && NULL != source) {
palf::LSN end_lsn;
if (OB_FAIL(palf_handle_.get_end_lsn(end_lsn))) {
CLOG_LOG(WARN, "get end lsn failed", K(id_));
} else if (OB_FAIL(palf_handle_.get_end_scn(end_scn))) {
CLOG_LOG(WARN, "get end scn failed", K(id_));
} else {
if (share::is_location_log_source_type(source->get_source_type())) {
ObRemoteLocationParent *location_source = dynamic_cast<ObRemoteLocationParent *>(source);
ObLogArchivePieceContext *piece_context = NULL;
@ -697,7 +696,6 @@ int ObLogRestoreHandler::check_restore_to_newest(share::SCN &end_scn, share::SCN
ret = OB_NOT_SUPPORTED;
}
}
}
return ret;
}
@ -823,6 +821,7 @@ int ObLogRestoreHandler::check_restore_to_newest_from_service_(
share::SCN &archive_scn)
{
int ret = OB_SUCCESS;
bool offline_log_exist = false;
share::ObTenantRole tenant_role;
share::schema::ObTenantStatus tenant_status;
palf::AccessMode access_mode;
@ -845,10 +844,35 @@ int ObLogRestoreHandler::check_restore_to_newest_from_service_(
ret = OB_SOURCE_TENANT_STATE_NOT_MATCH;
CLOG_LOG(WARN, "tenant role or status not match", K(id_), K(tenant_role), K(tenant_status), K(service_attr));
} else if (OB_FAIL(proxy_util.get_max_log_info(share::ObLSID(id_), access_mode, archive_scn))) {
// OB_ENTRY_NOT_EXIST: the ls is not found in gv$ob_log_stat; possible causes: a) the ls has no leader; b) access to the virtual table failed; c) the ls has been gc'ed
if (OB_ENTRY_NOT_EXIST == ret) {
// get ls from dba_ob_ls
// 1. OB_LS_NOT_EXIST, ls gc in log restore source tenant, check if offline_log already transported successfully
// 2. OB_SUCCESS, ls still exist, check in next turn
// 3. other error code
if (OB_FAIL(proxy_util.is_ls_existing(share::ObLSID(id_))) && OB_LS_NOT_EXIST != ret) {
CLOG_LOG(WARN, "get_ls failed", K(id_));
} else if (OB_SUCCESS == ret) {
ret = OB_EAGAIN;
CLOG_LOG(WARN, "get_ls succ, while get ls max_log info failed, just retry", K(id_));
} else if (OB_FAIL(check_if_ls_gc_(offline_log_exist))) {
CLOG_LOG(WARN, "check ls_gc failed", K(id_));
} else if (offline_log_exist) {
archive_scn = end_scn;
CLOG_LOG(INFO, "check ls_gc succ, set archive_scn equals to end_scn", K(id_), K(end_scn), K(service_attr));
} else {
ret = OB_ERR_OUT_OF_LOWER_BOUND;
CLOG_LOG(ERROR, "ls not exist in primary while offline_log not exist in standby, check log gap",
K(id_), K(end_scn), K(service_attr));
}
} else {
CLOG_LOG(WARN, "get max_log info failed", K(id_));
}
} else if (!palf::is_valid_access_mode(access_mode) || palf::AccessMode::RAW_WRITE != access_mode) {
ret = OB_SOURCE_LS_STATE_NOT_MATCH;
CLOG_LOG(WARN, "access_mode not match", K(id_), K(access_mode));
CLOG_LOG(WARN, "access_mode not match, check if ls gc in log restore source tenant", K(id_), K(access_mode));
// rewrite ret code, retry next time
ret = OB_EAGAIN;
} else if (end_scn < archive_scn) {
CLOG_LOG(INFO, "end_scn smaller than archive_scn", K(id_), K(archive_scn), K(end_scn));
} else {
@ -885,6 +909,85 @@ int ObLogRestoreHandler::check_restore_to_newest_from_archive_(
return ret;
}
int ObLogRestoreHandler::check_if_ls_gc_(bool &done)
{
int ret = OB_SUCCESS;
share::SCN offline_scn;
done = false;
if (OB_FAIL(get_offline_scn_(offline_scn))) {
CLOG_LOG(WARN, "get offline_scn failed", K(offline_scn), K(id_));
} else if (offline_scn.is_valid()) {
done = true;
CLOG_LOG(INFO, "offline_scn is valid, ls gc", K(id_), K(offline_scn));
} else if (OB_FAIL(check_offline_log_(done))) {
CLOG_LOG(WARN, "check offline_log failed", K(id_), K(offline_scn));
} else if (!done) {
// if check offline_log failed, double check if offline_scn valid
if (OB_FAIL(get_offline_scn_(offline_scn))) {
CLOG_LOG(WARN, "get offline_scn failed", K(offline_scn), K(id_));
} else if (offline_scn.is_valid()) {
done = true;
CLOG_LOG(INFO, "offline_scn is valid, ls gc", K(id_), K(offline_scn));
}
}
return ret;
}
// Scan the local log, starting at the max replayed scn, for an OFFLINE_LS
// gc log and report the result through done.
//
// This function runs out of lock, so palf is opened through a PalfHandleGuard.
//
// @param done  [out] true if an OFFLINE_LS gc log was found.
// @return OB_SUCCESS when the log was found or the iterator reached its end
//         (OB_ITER_END from the loop is mapped to OB_SUCCESS);
//         OB_EAGAIN when palf cannot be opened (the original code is rewritten);
//         OB_ERR_UNEXPECTED when the base header covers the whole entry;
//         otherwise the error of the failing call.
//
// NOTE(review): the iterator yields palf::LogGroupEntry and the code
// deserializes an ObLogBaseHeader directly from its data buffer -- confirm
// that a group entry's payload really begins with an ObLogBaseHeader and not
// with a nested log-entry header.
int ObLogRestoreHandler::check_offline_log_(bool &done)
{
  int ret = OB_SUCCESS;
  share::SCN replayed_scn;
  palf::PalfHandleGuard guard;
  palf::PalfGroupBufferIterator iter;
  done = false;
  if (OB_FAIL(MTL(ObLogService*)->get_log_replay_service()->get_max_replayed_scn(
      share::ObLSID(id_), replayed_scn))) {
    CLOG_LOG(WARN, "get replayed_lsn failed", K(id_));
  } else if (OB_FAIL(MTL(ObLogService*)->open_palf(share::ObLSID(id_), guard))) {
    CLOG_LOG(WARN, "open_palf failed", K(id_));
    // the caller is expected to retry, so report OB_EAGAIN instead
    ret = OB_EAGAIN;
  } else if (OB_FAIL(guard.seek(replayed_scn, iter))) {
    CLOG_LOG(WARN, "seek failed", K(id_));
  } else {
    palf::LogGroupEntry entry;
    palf::LSN lsn;
    // stop on the first error (including OB_ITER_END) or once the log is found
    while (OB_SUCC(ret) && !done) {
      if (OB_FAIL(iter.next())) {
        CLOG_LOG(WARN, "next failed", K(id_));
      } else if (OB_FAIL(iter.get_entry(entry, lsn))) {
        CLOG_LOG(WARN, "get entry failed", K(id_), K(entry));
      } else {
        const int64_t log_length = entry.get_data_len();
        const char *log_buf = entry.get_data_buf();
        int64_t pos = 0;
        logservice::ObLogBaseHeader header;
        const int64_t header_size = header.get_serialize_size();
        if (OB_FAIL(header.deserialize(log_buf, header_size, pos))) {
          CLOG_LOG(WARN, "ObLogBaseHeader deserialize failed", K(id_), K(entry));
        } else if (OB_UNLIKELY(pos >= log_length)) {
          ret = OB_ERR_UNEXPECTED;
          CLOG_LOG(ERROR, "unexpected log pos", K(id_), K(pos), K(log_length), K(entry));
        } else if (logservice::GC_LS_LOG_BASE_TYPE != header.get_log_type()) {
          // not a gc log, move on to the next entry
        } else {
          logservice::ObGCLSLog gc_log;
          if (OB_FAIL(gc_log.deserialize(log_buf, log_length, pos))) {
            CLOG_LOG(WARN, "gc_log deserialize failed", K(id_), K(pos), K(log_length), K(entry));
          } else if (logservice::ObGCLSLOGType::OFFLINE_LS == gc_log.get_log_type()) {
            done = true;
            CLOG_LOG(INFO, "offline_log exist", K(id_), K(gc_log), K(lsn), K(entry));
          }
        }
      }
    } // while
    // running off the end of the log is the normal "not found" outcome
    if (OB_ITER_END == ret) {
      ret = OB_SUCCESS;
    }
  }
  return ret;
}
bool ObLogRestoreHandler::restore_to_end_unlock_() const
{
int ret = OB_SUCCESS;
@ -1004,6 +1107,31 @@ int ObLogRestoreHandler::get_restore_sync_status(int ret_code,
return ret;
}
// Read the offline_scn of this LS (id_) from the tenant's LS service.
//
// @param scn  [out] filled by ObLS::get_offline_scn() on success.
// @return OB_SUCCESS on success;
//         the error of ObLSService::get_ls() if the LS cannot be obtained;
//         OB_ERR_UNEXPECTED if the handle carries no LS;
//         the error of get_offline_scn() otherwise.
int ObLogRestoreHandler::get_offline_scn_(share::SCN &scn)
{
  int ret = OB_SUCCESS;
  storage::ObLSHandle handle;
  if (OB_FAIL(MTL(storage::ObLSService*)->get_ls(share::ObLSID(id_), handle, ObLSGetMod::LOG_MOD))) {
    CLOG_LOG(WARN, "get ls failed", K(id_));
  } else if (OB_ISNULL(handle.get_ls())) {
    // never dereference an empty handle; report it as an error instead
    ret = OB_ERR_UNEXPECTED;
    CLOG_LOG(WARN, "ls is NULL", K(ret), K(id_));
  } else if (OB_FAIL(handle.get_ls()->get_offline_scn(scn))) {
    CLOG_LOG(WARN, "get offline_scn failed", K(id_));
  }
  return ret;
}
// Copy the current restore source (parent_) into source_guard.
//
// This helper takes no lock itself; the callers visible in this file hold
// lock_ around the call.
//
// On failure (parent_ is NULL or the source cannot be allocated) nothing is
// handed to source_guard; the failure is only logged because the function
// returns void. Callers detect it through source_guard.get_source().
//
// NOTE(review): any result of parent_->deep_copy_to() is discarded through
// FALSE_IT -- confirm whether it can fail and should be checked.
//
// @param source_guard  receives ownership of the newly allocated copy.
void ObLogRestoreHandler::deep_copy_source_(ObRemoteSourceGuard &source_guard)
{
  int ret = OB_SUCCESS;
  ObRemoteLogParent *source = NULL;
  if (OB_ISNULL(parent_)) {
    ret = OB_INVALID_ARGUMENT;
    CLOG_LOG(WARN, "parent_ is NULL", K(ret));
  } else if (OB_ISNULL(source = ObResSrcAlloctor::alloc(parent_->get_source_type(), share::ObLSID(id_)))) {
    // do not swallow the allocation failure silently
    ret = OB_ALLOCATE_MEMORY_FAILED;
    CLOG_LOG(WARN, "alloc restore source failed", K(ret), K(id_));
  } else if (FALSE_IT(parent_->deep_copy_to(*source))) {
  } else if (FALSE_IT(source_guard.set_source(source))) {
  }
}
RestoreStatusInfo::RestoreStatusInfo()
: ls_id_(share::ObLSID::INVALID_LS_ID),
sync_lsn_(LOG_INVALID_LSN_VAL),

View File

@ -249,6 +249,10 @@ private:
int check_restore_to_newest_from_archive_(ObLogArchivePieceContext &piece_context,
const palf::LSN &end_lsn, const share::SCN &end_scn, share::SCN &archive_scn);
bool restore_to_end_unlock_() const;
int get_offline_scn_(share::SCN &scn);
void deep_copy_source_(ObRemoteSourceGuard &guard);
int check_if_ls_gc_(bool &done);
int check_offline_log_(bool &done);
private:
ObRemoteLogParent *parent_;

View File

@ -625,7 +625,12 @@ int ObLogRestoreProxyUtil::get_max_log_info(const ObLSID &id, palf::AccessMode &
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get result failed", K(sql));
} else if (OB_FAIL(result->next())) {
LOG_WARN("next failed");
if (OB_ITER_END == ret) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("get max log info failed", K(sql), K(id));
} else {
LOG_WARN("next failed", K(sql), K(id));
}
} else {
uint64_t max_scn = 0;
ObString access_mode_str;
@ -645,6 +650,44 @@ int ObLogRestoreProxyUtil::get_max_log_info(const ObLSID &id, palf::AccessMode &
return ret;
}
// Check whether the given LS exists in the log restore source tenant by
// counting the rows of OB_DBA_OB_LS_TNAME whose LS_ID equals id.
//
// @param id  LS to look up; must be valid.
// @return OB_SUCCESS            the count is non-zero, i.e. the LS exists;
//         OB_LS_NOT_EXIST       the count is 0;
//         OB_NOT_INIT           this util is not inited;
//         OB_INVALID_ARGUMENT   id is invalid;
//         OB_ERR_UNEXPECTED     the query returned no result object;
//         other codes           building / executing the SQL or reading the
//                               row failed.
//
// NOTE(review): the query runs inside RESTORE_RETRY, whose definition is not
// visible here -- presumably it re-runs the statement on failure; confirm
// whether OB_LS_NOT_EXIST also triggers a retry.
int ObLogRestoreProxyUtil::is_ls_existing(const ObLSID &id)
{
int ret = OB_SUCCESS;
// column name used in the WHERE clause
const char *LS_ID = "LS_ID";
common::ObMySQLProxy *proxy = &sql_proxy_;
if (OB_UNLIKELY(!inited_)) {
ret = OB_NOT_INIT;
} else if (OB_UNLIKELY(!id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invlaid argument", K(id));
} else {
RESTORE_RETRY(
SMART_VAR(common::ObMySQLProxy::MySQLResult, res) {
common::sqlclient::ObMySQLResult *result = NULL;
common::ObSqlString sql;
const char *GET_LS_SQL = "SELECT COUNT(1) as COUNT FROM %s WHERE %s=%ld";
if (OB_FAIL(sql.append_fmt(GET_LS_SQL, OB_DBA_OB_LS_TNAME, LS_ID, id.id()))) {
LOG_WARN("append_fmt failed");
} else if (OB_FAIL(proxy->read(res, sql.ptr()))) {
LOG_WARN("excute sql failed", K(sql));
} else if (OB_ISNULL(result = res.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get result failed", K(sql));
} else if (OB_FAIL(result->next())) {
LOG_WARN("next failed", K(id), K(sql));
} else {
uint64_t count = 0;
EXTRACT_UINT_FIELD_MYSQL(*result, "COUNT", count, uint64_t);
if (OB_SUCC(ret) && 0 == count) {
ret = OB_LS_NOT_EXIST;
}
}
}
)
}
return ret;
}
void ObLogRestoreProxyUtil::destroy_tg_()
{
if (-1 != tg_id_) {

View File

@ -144,6 +144,8 @@ public:
int get_tenant_info(ObTenantRole &role, schema::ObTenantStatus &status);
// get the access_mode and max_scn of the specific LS in log restore source tenant
int get_max_log_info(const ObLSID &id, palf::AccessMode &mode, SCN &scn);
// get ls from dba_ob_ls
int is_ls_existing(const ObLSID &id);
private:
bool is_user_changed_(const char *user_name, const char *user_password, const char *db_name);
void destroy_tg_();