net standby support error report
This commit is contained in:
@ -555,8 +555,8 @@ int ObCdcFetcher::ls_fetch_log_(const ObLSID &ls_id,
|
|||||||
// exit
|
// exit
|
||||||
reach_max_lsn = true;
|
reach_max_lsn = true;
|
||||||
}
|
}
|
||||||
} else if (OB_ALREADY_IN_NOARCHIVE_MODE == ret) {
|
} else if (OB_ALREADY_IN_NOARCHIVE_MODE == ret || OB_ENTRY_NOT_EXIST == ret) {
|
||||||
// archive is not on
|
// archive is not on or lsn less than the start_lsn in archive
|
||||||
ret = OB_ERR_OUT_OF_LOWER_BOUND;
|
ret = OB_ERR_OUT_OF_LOWER_BOUND;
|
||||||
} else {
|
} else {
|
||||||
// other error code, retry because various error code would be returned, retry could fix some problem
|
// other error code, retry because various error code would be returned, retry could fix some problem
|
||||||
|
|||||||
@ -23,6 +23,7 @@
|
|||||||
#include "ob_log_sys_ls_task_handler.h" // IObLogSysLsTaskHandler
|
#include "ob_log_sys_ls_task_handler.h" // IObLogSysLsTaskHandler
|
||||||
#include "ob_log_task_pool.h" // ObLogTransTaskPool
|
#include "ob_log_task_pool.h" // ObLogTransTaskPool
|
||||||
#include "ob_log_instance.h" // IObLogErrHandler
|
#include "ob_log_instance.h" // IObLogErrHandler
|
||||||
|
#include "logservice/restoreservice/ob_log_restore_net_driver.h" // logfetcher::LogErrHandler
|
||||||
|
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
|
|
||||||
@ -91,6 +92,7 @@ int ObLogFetcher::init(
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t max_cached_ls_fetch_ctx_count = cfg.active_ls_count;
|
int64_t max_cached_ls_fetch_ctx_count = cfg.active_ls_count;
|
||||||
|
LogFetcherErrHandler *fake_err_handler = NULL; // TODO: CDC need to process error handler
|
||||||
|
|
||||||
if (IS_INIT) {
|
if (IS_INIT) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
@ -115,6 +117,7 @@ int ObLogFetcher::init(
|
|||||||
prefer_region,
|
prefer_region,
|
||||||
cluster_id,
|
cluster_id,
|
||||||
false/*is_across_cluster*/,
|
false/*is_across_cluster*/,
|
||||||
|
fake_err_handler,
|
||||||
cfg.server_blacklist.str(),
|
cfg.server_blacklist.str(),
|
||||||
cfg.log_router_background_refresh_interval_sec,
|
cfg.log_router_background_refresh_interval_sec,
|
||||||
cfg.all_server_cache_update_interval_sec,
|
cfg.all_server_cache_update_interval_sec,
|
||||||
|
|||||||
@ -221,6 +221,21 @@ private:
|
|||||||
K_(part_count));
|
K_(part_count));
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class LogFetcherErrHandler : public logfetcher::IObLogErrHandler
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
LogFetcherErrHandler();
|
||||||
|
virtual ~LogFetcherErrHandler() {}
|
||||||
|
public:
|
||||||
|
virtual void handle_error(const int err_no, const char *fmt, ...) override {}
|
||||||
|
virtual void handle_error(const share::ObLSID &ls_id,
|
||||||
|
const ErrType &err_type,
|
||||||
|
share::ObTaskId &trace_id,
|
||||||
|
const palf::LSN &lsn,
|
||||||
|
const int err_no,
|
||||||
|
const char *fmt, ...) override {}
|
||||||
|
};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool is_inited_;
|
bool is_inited_;
|
||||||
bool is_loading_data_dict_baseline_data_;
|
bool is_loading_data_dict_baseline_data_;
|
||||||
|
|||||||
@ -544,6 +544,8 @@ void FetchLogARpc::stop()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ERRSIM_POINT_DEF(ERRSIM_FETCH_LOG_TIME_OUT);
|
||||||
|
ERRSIM_POINT_DEF(ERRSIM_FETCH_LOG_RESP_ERROR);
|
||||||
int FetchLogARpc::next_result(FetchLogARpcResult *&result, bool &rpc_is_flying)
|
int FetchLogARpc::next_result(FetchLogARpcResult *&result, bool &rpc_is_flying)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -581,7 +583,10 @@ int FetchLogARpc::next_result(FetchLogARpcResult *&result, bool &rpc_is_flying)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (OB_SUCC(ret) && (ERRSIM_FETCH_LOG_TIME_OUT || ERRSIM_FETCH_LOG_RESP_ERROR)) {
|
||||||
|
process_errsim_code_(result);
|
||||||
|
LOG_TRACE("process errsim code");
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -876,6 +881,20 @@ void FetchLogARpc::print_handle_info_(RpcRequest &rpc_req,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FetchLogARpc::process_errsim_code_(FetchLogARpcResult *result)
|
||||||
|
{
|
||||||
|
if (ERRSIM_FETCH_LOG_TIME_OUT) {
|
||||||
|
result->rcode_.rcode_ = ERRSIM_FETCH_LOG_TIME_OUT;
|
||||||
|
result->trace_id_ = *ObCurTraceId::get_trace_id();
|
||||||
|
LOG_TRACE("errsim fetch log time out", K(result));
|
||||||
|
}
|
||||||
|
if (ERRSIM_FETCH_LOG_RESP_ERROR) {
|
||||||
|
result->resp_.set_err(ERRSIM_FETCH_LOG_RESP_ERROR);
|
||||||
|
result->trace_id_ = *ObCurTraceId::get_trace_id();
|
||||||
|
LOG_TRACE("errsim fetch log resp error", K(result));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int FetchLogARpc::launch_async_rpc_(RpcRequest &rpc_req,
|
int FetchLogARpc::launch_async_rpc_(RpcRequest &rpc_req,
|
||||||
const palf::LSN &req_start_lsn,
|
const palf::LSN &req_start_lsn,
|
||||||
const int64_t progress,
|
const int64_t progress,
|
||||||
|
|||||||
@ -285,6 +285,7 @@ private:
|
|||||||
const bool need_stop_rpc,
|
const bool need_stop_rpc,
|
||||||
const RpcStopReason rpc_stop_reason,
|
const RpcStopReason rpc_stop_reason,
|
||||||
const bool need_dispatch_stream_task);
|
const bool need_dispatch_stream_task);
|
||||||
|
void process_errsim_code_(FetchLogARpcResult *result);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
////////////////////////////// RpcCB //////////////////////////////
|
////////////////////////////// RpcCB //////////////////////////////
|
||||||
|
|||||||
@ -80,7 +80,7 @@ int ObLogFetcher::init(
|
|||||||
ObILogFetcherLSCtxAddInfoFactory &ls_ctx_add_info_factory,
|
ObILogFetcherLSCtxAddInfoFactory &ls_ctx_add_info_factory,
|
||||||
ILogFetcherHandler &log_handler,
|
ILogFetcherHandler &log_handler,
|
||||||
ObISQLClient *proxy,
|
ObISQLClient *proxy,
|
||||||
IObLogErrHandler *err_handler)
|
logfetcher::IObLogErrHandler *err_handler)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
@ -103,6 +103,7 @@ int ObLogFetcher::init(
|
|||||||
prefer_region,
|
prefer_region,
|
||||||
cluster_id,
|
cluster_id,
|
||||||
false/*is_across_cluster*/,
|
false/*is_across_cluster*/,
|
||||||
|
err_handler,
|
||||||
cfg.server_blacklist.str(),
|
cfg.server_blacklist.str(),
|
||||||
cfg.log_router_background_refresh_interval_sec,
|
cfg.log_router_background_refresh_interval_sec,
|
||||||
cfg.all_server_cache_update_interval_sec,
|
cfg.all_server_cache_update_interval_sec,
|
||||||
@ -360,7 +361,7 @@ int ObLogFetcher::add_ls(
|
|||||||
}
|
}
|
||||||
// Push LS into ObLogLSFetchMgr
|
// Push LS into ObLogLSFetchMgr
|
||||||
else if (OB_FAIL(ls_fetch_mgr_.add_ls(tls_id, start_parameters, is_loading_data_dict_baseline_data_,
|
else if (OB_FAIL(ls_fetch_mgr_.add_ls(tls_id, start_parameters, is_loading_data_dict_baseline_data_,
|
||||||
fetching_mode_, archive_dest_))) {
|
fetching_mode_, archive_dest_, *err_handler_))) {
|
||||||
LOG_ERROR("add partition by part fetch mgr fail", KR(ret), K(tls_id), K(start_parameters),
|
LOG_ERROR("add partition by part fetch mgr fail", KR(ret), K(tls_id), K(start_parameters),
|
||||||
K(is_loading_data_dict_baseline_data_));
|
K(is_loading_data_dict_baseline_data_));
|
||||||
} else if (OB_FAIL(ls_fetch_mgr_.get_ls_fetch_ctx(tls_id, ls_fetch_ctx))) {
|
} else if (OB_FAIL(ls_fetch_mgr_.get_ls_fetch_ctx(tls_id, ls_fetch_ctx))) {
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
#include "share/ob_define.h"
|
#include "share/ob_define.h"
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
#include "src/logservice/palf/lsn.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -40,6 +41,7 @@ public:
|
|||||||
virtual void handle_error(const share::ObLSID &ls_id,
|
virtual void handle_error(const share::ObLSID &ls_id,
|
||||||
const ErrType &err_type,
|
const ErrType &err_type,
|
||||||
share::ObTaskId &trace_id,
|
share::ObTaskId &trace_id,
|
||||||
|
const palf::LSN &lsn,
|
||||||
const int err_no,
|
const int err_no,
|
||||||
const char *fmt, ...) = 0;
|
const char *fmt, ...) = 0;
|
||||||
};
|
};
|
||||||
|
|||||||
@ -120,6 +120,7 @@ void LSFetchCtx::reset()
|
|||||||
data_dict_iterator_.reset();
|
data_dict_iterator_.reset();
|
||||||
fetched_log_size_ = 0;
|
fetched_log_size_ = 0;
|
||||||
ctx_desc_.reset();
|
ctx_desc_.reset();
|
||||||
|
err_handler_ = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int LSFetchCtx::init(
|
int LSFetchCtx::init(
|
||||||
@ -129,7 +130,8 @@ int LSFetchCtx::init(
|
|||||||
const int64_t progress_id,
|
const int64_t progress_id,
|
||||||
const ClientFetchingMode fetching_mode,
|
const ClientFetchingMode fetching_mode,
|
||||||
const ObBackupPathString &archive_dest_str,
|
const ObBackupPathString &archive_dest_str,
|
||||||
ObILogFetcherLSCtxAddInfo &ls_ctx_add_info)
|
ObILogFetcherLSCtxAddInfo &ls_ctx_add_info,
|
||||||
|
IObLogErrHandler &err_handler)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
const int64_t start_tstamp_ns = start_parameters.get_start_tstamp_ns();
|
const int64_t start_tstamp_ns = start_parameters.get_start_tstamp_ns();
|
||||||
@ -151,6 +153,7 @@ int LSFetchCtx::init(
|
|||||||
progress_.reset(start_lsn, start_tstamp_ns);
|
progress_.reset(start_lsn, start_tstamp_ns);
|
||||||
start_parameters_ = start_parameters;
|
start_parameters_ = start_parameters;
|
||||||
fetched_log_size_ = 0;
|
fetched_log_size_ = 0;
|
||||||
|
err_handler_ = &err_handler;
|
||||||
|
|
||||||
if (start_lsn.is_valid()) {
|
if (start_lsn.is_valid()) {
|
||||||
// LSN is valid, init mem_storage; otherwise after need locate start_lsn success, we can init mem_storage
|
// LSN is valid, init mem_storage; otherwise after need locate start_lsn success, we can init mem_storage
|
||||||
@ -962,6 +965,18 @@ bool LSFetchCtx::need_switch_server(const common::ObAddr &cur_svr)
|
|||||||
return bool_ret;
|
return bool_ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void LSFetchCtx::handle_error(const share::ObLSID &ls_id,
|
||||||
|
const IObLogErrHandler::ErrType &err_type,
|
||||||
|
share::ObTaskId &trace_id,
|
||||||
|
const palf::LSN &lsn,
|
||||||
|
const int err_no,
|
||||||
|
const char *fmt, ...)
|
||||||
|
{
|
||||||
|
if (OB_NOT_NULL(err_handler_)) {
|
||||||
|
err_handler_->handle_error(ls_id, err_type, trace_id, lsn, err_no, fmt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/////////////////////////////////// LSProgress ///////////////////////////////////
|
/////////////////////////////////// LSProgress ///////////////////////////////////
|
||||||
|
|
||||||
void LSFetchCtx::LSProgress::reset()
|
void LSFetchCtx::LSProgress::reset()
|
||||||
|
|||||||
@ -35,6 +35,7 @@
|
|||||||
#include "ob_log_fetcher_ls_ctx_additional_info.h" // PartTransDispatchInfo
|
#include "ob_log_fetcher_ls_ctx_additional_info.h" // PartTransDispatchInfo
|
||||||
#include "logservice/common_util/ob_log_ls_define.h" // logservice::TenantLSID
|
#include "logservice/common_util/ob_log_ls_define.h" // logservice::TenantLSID
|
||||||
#include "ob_log_fetcher_start_parameters.h" // ObLogFetcherStartParameters
|
#include "ob_log_fetcher_start_parameters.h" // ObLogFetcherStartParameters
|
||||||
|
#include "logservice/logfetcher/ob_log_fetcher_err_handler.h" // IObLogErrHandler
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -92,7 +93,8 @@ public:
|
|||||||
const int64_t progress_id,
|
const int64_t progress_id,
|
||||||
const ClientFetchingMode fetching_mode,
|
const ClientFetchingMode fetching_mode,
|
||||||
const ObBackupPathString &archive_dest_str,
|
const ObBackupPathString &archive_dest_str,
|
||||||
ObILogFetcherLSCtxAddInfo &ls_ctx_add_info);
|
ObILogFetcherLSCtxAddInfo &ls_ctx_add_info,
|
||||||
|
IObLogErrHandler &err_handler);
|
||||||
|
|
||||||
void set_host(IObLogLSFetchMgr &ls_fetch_mgr) { ls_fetch_mgr_ = &ls_fetch_mgr; }
|
void set_host(IObLogLSFetchMgr &ls_fetch_mgr) { ls_fetch_mgr_ = &ls_fetch_mgr; }
|
||||||
|
|
||||||
@ -288,6 +290,13 @@ public:
|
|||||||
|
|
||||||
int64_t get_proposal_id() const { return start_parameters_.get_proposal_id(); }
|
int64_t get_proposal_id() const { return start_parameters_.get_proposal_id(); }
|
||||||
|
|
||||||
|
void handle_error(const share::ObLSID &ls_id,
|
||||||
|
const IObLogErrHandler::ErrType &err_type,
|
||||||
|
share::ObTaskId &trace_id,
|
||||||
|
const palf::LSN &lsn,
|
||||||
|
const int err_no,
|
||||||
|
const char *fmt, ...);
|
||||||
|
|
||||||
// Internal member functions
|
// Internal member functions
|
||||||
private:
|
private:
|
||||||
static const int64_t SERVER_LIST_UPDATE_INTERVAL_SEC = 5 * _SEC_;
|
static const int64_t SERVER_LIST_UPDATE_INTERVAL_SEC = 5 * _SEC_;
|
||||||
@ -540,6 +549,9 @@ protected:
|
|||||||
// extent description of LSFetchCtx
|
// extent description of LSFetchCtx
|
||||||
LSFetchCtxDesc ctx_desc_;
|
LSFetchCtxDesc ctx_desc_;
|
||||||
|
|
||||||
|
// Log fetcher error handler
|
||||||
|
IObLogErrHandler *err_handler_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
DISALLOW_COPY_AND_ASSIGN(LSFetchCtx);
|
DISALLOW_COPY_AND_ASSIGN(LSFetchCtx);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -111,7 +111,8 @@ int ObLogLSFetchMgr::add_ls(
|
|||||||
const ObLogFetcherStartParameters &start_parameters,
|
const ObLogFetcherStartParameters &start_parameters,
|
||||||
const bool is_loading_data_dict_baseline_data,
|
const bool is_loading_data_dict_baseline_data,
|
||||||
const ClientFetchingMode fetching_mode,
|
const ClientFetchingMode fetching_mode,
|
||||||
const ObBackupPathString &archive_dest_str)
|
const ObBackupPathString &archive_dest_str,
|
||||||
|
IObLogErrHandler &err_handler)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
LSFetchCtx *ctx = NULL;
|
LSFetchCtx *ctx = NULL;
|
||||||
@ -157,7 +158,7 @@ int ObLogLSFetchMgr::add_ls(
|
|||||||
LOG_ERROR("acquire_progress fail", KR(ret), K(start_tstamp_ns));
|
LOG_ERROR("acquire_progress fail", KR(ret), K(start_tstamp_ns));
|
||||||
// init LSFetchCtx
|
// init LSFetchCtx
|
||||||
} else if (OB_FAIL(ctx->init(tls_id, start_parameters, is_loading_data_dict_baseline_data,
|
} else if (OB_FAIL(ctx->init(tls_id, start_parameters, is_loading_data_dict_baseline_data,
|
||||||
progress_id, fetching_mode, archive_dest_str, *ls_ctx_add_info))) {
|
progress_id, fetching_mode, archive_dest_str, *ls_ctx_add_info, err_handler))) {
|
||||||
LOG_ERROR("ctx init fail", KR(ret), K(tls_id), K(start_tstamp_ns), K(start_lsn), K(progress_id));
|
LOG_ERROR("ctx init fail", KR(ret), K(tls_id), K(start_tstamp_ns), K(start_lsn), K(progress_id));
|
||||||
} else {
|
} else {
|
||||||
ctx->set_host(*this);
|
ctx->set_host(*this);
|
||||||
|
|||||||
@ -43,7 +43,8 @@ public:
|
|||||||
const ObLogFetcherStartParameters &start_parameters,
|
const ObLogFetcherStartParameters &start_parameters,
|
||||||
const bool is_loading_data_dict_baseline_data,
|
const bool is_loading_data_dict_baseline_data,
|
||||||
const ClientFetchingMode fetching_mode,
|
const ClientFetchingMode fetching_mode,
|
||||||
const ObBackupPathString &archive_dest_str) = 0;
|
const ObBackupPathString &archive_dest_str,
|
||||||
|
IObLogErrHandler &err_handler) = 0;
|
||||||
|
|
||||||
/// recycle a LS
|
/// recycle a LS
|
||||||
/// mark LS deleted and begin recycle resource
|
/// mark LS deleted and begin recycle resource
|
||||||
@ -97,7 +98,8 @@ public:
|
|||||||
const ObLogFetcherStartParameters &start_parameters,
|
const ObLogFetcherStartParameters &start_parameters,
|
||||||
const bool is_loading_data_dict_baseline_data,
|
const bool is_loading_data_dict_baseline_data,
|
||||||
const ClientFetchingMode fetching_mode,
|
const ClientFetchingMode fetching_mode,
|
||||||
const ObBackupPathString &archive_dest_str);
|
const ObBackupPathString &archive_dest_str,
|
||||||
|
IObLogErrHandler &err_handler);
|
||||||
virtual int recycle_ls(const logservice::TenantLSID &tls_id);
|
virtual int recycle_ls(const logservice::TenantLSID &tls_id);
|
||||||
virtual int remove_ls(const logservice::TenantLSID &tls_id);
|
virtual int remove_ls(const logservice::TenantLSID &tls_id);
|
||||||
virtual int get_ls_fetch_ctx(const logservice::TenantLSID &tls_id, LSFetchCtx *&ctx);
|
virtual int get_ls_fetch_ctx(const logservice::TenantLSID &tls_id, LSFetchCtx *&ctx);
|
||||||
|
|||||||
@ -835,6 +835,7 @@ int FetchStream::read_group_entry_(
|
|||||||
volatile bool &stop_flag)
|
volatile bool &stop_flag)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
ObTaskId trace_id(*ObCurTraceId::get_trace_id());
|
||||||
|
|
||||||
if (OB_ISNULL(ls_fetch_ctx_) || OB_ISNULL(log_handler_)) {
|
if (OB_ISNULL(ls_fetch_ctx_) || OB_ISNULL(log_handler_)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -853,6 +854,9 @@ int FetchStream::read_group_entry_(
|
|||||||
} else if (OB_IN_STOP_STATE != ret) {
|
} else if (OB_IN_STOP_STATE != ret) {
|
||||||
LOG_ERROR("LogHander handle_group_entry failed", KR(ret), K(tenant_id), K(ls_id), K(proposal_id),
|
LOG_ERROR("LogHander handle_group_entry failed", KR(ret), K(tenant_id), K(ls_id), K(proposal_id),
|
||||||
K(group_start_lsn), K(group_entry));
|
K(group_start_lsn), K(group_entry));
|
||||||
|
if (OB_NOT_NULL(ls_fetch_ctx_)) {
|
||||||
|
ls_fetch_ctx_->handle_error(ls_id, IObLogErrHandler::ErrType::SUBMIT_LOG, trace_id, group_start_lsn, ret, "%s");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1217,21 +1221,39 @@ int FetchStream::handle_fetch_log_error_(
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool need_kick_out = false;
|
bool need_kick_out = false;
|
||||||
KickOutReason kick_out_reason = NONE;
|
KickOutReason kick_out_reason = NONE;
|
||||||
|
ObTaskId trace_id(*ObCurTraceId::get_trace_id());
|
||||||
|
|
||||||
// RPC failure, need switch server
|
// RPC failure, need switch server
|
||||||
if (OB_SUCCESS != rcode.rcode_) {
|
if (OB_SUCCESS != rcode.rcode_) {
|
||||||
need_kick_out = true;
|
need_kick_out = true;
|
||||||
kick_out_reason = FETCH_LOG_FAIL_ON_RPC;
|
kick_out_reason = FETCH_LOG_FAIL_ON_RPC;
|
||||||
|
if (OB_NOT_NULL(ls_fetch_ctx_)) {
|
||||||
|
ls_fetch_ctx_->handle_error(ls_fetch_ctx_->get_tls_id().get_ls_id(),
|
||||||
|
IObLogErrHandler::ErrType::FETCH_LOG,
|
||||||
|
trace_id,
|
||||||
|
ls_fetch_ctx_->get_next_lsn(),
|
||||||
|
rcode.rcode_,
|
||||||
|
"%s");
|
||||||
LOG_ERROR("fetch log fail on rpc, need_switch_server", K(svr_), K(rcode), "fetch_stream", this);
|
LOG_ERROR("fetch log fail on rpc, need_switch_server", K(svr_), K(rcode), "fetch_stream", this);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// server return error
|
// server return error
|
||||||
else if (OB_SUCCESS != resp.get_err()) {
|
else if (OB_SUCCESS != resp.get_err()) {
|
||||||
// Other errors, switch server directly
|
// Other errors, switch server directly
|
||||||
need_kick_out = true;
|
need_kick_out = true;
|
||||||
kick_out_reason = FETCH_LOG_FAIL_ON_SERVER;
|
kick_out_reason = FETCH_LOG_FAIL_ON_SERVER;
|
||||||
LOG_ERROR("fetch log fail on server, need_switch_server", "fetch_stream", this, K(svr_),
|
if (OB_NOT_NULL(ls_fetch_ctx_)) {
|
||||||
"svr_err", resp.get_err(), "svr_debug_err", resp.get_debug_err(),
|
ls_fetch_ctx_->handle_error(ls_fetch_ctx_->get_tls_id().get_ls_id(),
|
||||||
K(rcode), K(resp));
|
IObLogErrHandler::ErrType::FETCH_LOG,
|
||||||
|
trace_id,
|
||||||
|
ls_fetch_ctx_->get_next_lsn(),
|
||||||
|
resp.get_err(),
|
||||||
|
"%s");
|
||||||
|
LOG_ERROR("fetch log fail on server, need_switch_server", "fetch_stream", this, K(svr_),
|
||||||
|
"svr_err", resp.get_err(), "svr_debug_err", resp.get_debug_err(),
|
||||||
|
K(rcode), K(resp));
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
need_kick_out = false;
|
need_kick_out = false;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -39,6 +39,7 @@ ObLogRouteService::ObLogRouteService() :
|
|||||||
all_svr_cache_(),
|
all_svr_cache_(),
|
||||||
ls_route_timer_task_(*this),
|
ls_route_timer_task_(*this),
|
||||||
timer_(),
|
timer_(),
|
||||||
|
err_handler_(NULL),
|
||||||
timer_id_(-1),
|
timer_id_(-1),
|
||||||
tg_id_(-1),
|
tg_id_(-1),
|
||||||
background_refresh_time_sec_(0),
|
background_refresh_time_sec_(0),
|
||||||
@ -59,6 +60,7 @@ int ObLogRouteService::init(ObISQLClient *proxy,
|
|||||||
const common::ObRegion &prefer_region,
|
const common::ObRegion &prefer_region,
|
||||||
const int64_t cluster_id,
|
const int64_t cluster_id,
|
||||||
const bool is_across_cluster,
|
const bool is_across_cluster,
|
||||||
|
logfetcher::IObLogErrHandler *err_handler,
|
||||||
const char *external_server_blacklist,
|
const char *external_server_blacklist,
|
||||||
const int64_t background_refresh_time_sec,
|
const int64_t background_refresh_time_sec,
|
||||||
const int64_t all_server_cache_update_interval_sec,
|
const int64_t all_server_cache_update_interval_sec,
|
||||||
@ -96,7 +98,7 @@ int ObLogRouteService::init(ObISQLClient *proxy,
|
|||||||
} else if (OB_FAIL(svr_blacklist_.init(external_server_blacklist, false/*is_sql_server*/))) {
|
} else if (OB_FAIL(svr_blacklist_.init(external_server_blacklist, false/*is_sql_server*/))) {
|
||||||
LOG_WARN("ObLogSvrBlacklist init failed", KR(ret), K(cluster_id), K(is_across_cluster),
|
LOG_WARN("ObLogSvrBlacklist init failed", KR(ret), K(cluster_id), K(is_across_cluster),
|
||||||
K(external_server_blacklist));
|
K(external_server_blacklist));
|
||||||
} else if (OB_FAIL(systable_queryer_.init(cluster_id, is_across_cluster, *proxy))) {
|
} else if (OB_FAIL(systable_queryer_.init(cluster_id, is_across_cluster, *proxy, err_handler))) {
|
||||||
LOG_WARN("systable_queryer_ init failed", KR(ret), K(cluster_id), K(is_across_cluster));
|
LOG_WARN("systable_queryer_ init failed", KR(ret), K(cluster_id), K(is_across_cluster));
|
||||||
} else if (OB_FAIL(all_svr_cache_.init(systable_queryer_, prefer_region,
|
} else if (OB_FAIL(all_svr_cache_.init(systable_queryer_, prefer_region,
|
||||||
all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) {
|
all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) {
|
||||||
@ -196,6 +198,7 @@ void ObLogRouteService::destroy()
|
|||||||
systable_queryer_.destroy();
|
systable_queryer_.destroy();
|
||||||
all_svr_cache_.destroy();
|
all_svr_cache_.destroy();
|
||||||
svr_blacklist_.destroy();
|
svr_blacklist_.destroy();
|
||||||
|
err_handler_ = NULL;
|
||||||
|
|
||||||
cluster_id_ = OB_INVALID_CLUSTER_ID;
|
cluster_id_ = OB_INVALID_CLUSTER_ID;
|
||||||
background_refresh_time_sec_ = 0;
|
background_refresh_time_sec_ = 0;
|
||||||
|
|||||||
@ -75,6 +75,7 @@ public:
|
|||||||
const common::ObRegion &prefer_region,
|
const common::ObRegion &prefer_region,
|
||||||
const int64_t cluster_id,
|
const int64_t cluster_id,
|
||||||
const bool is_across_cluster,
|
const bool is_across_cluster,
|
||||||
|
logfetcher::IObLogErrHandler *err_handler,
|
||||||
const char *external_server_blacklist = "|",
|
const char *external_server_blacklist = "|",
|
||||||
const int64_t background_refresh_time_sec = 10,
|
const int64_t background_refresh_time_sec = 10,
|
||||||
const int64_t all_server_cache_update_interval_sec = 5,
|
const int64_t all_server_cache_update_interval_sec = 5,
|
||||||
@ -361,6 +362,7 @@ private:
|
|||||||
ObLogAllSvrCache all_svr_cache_;
|
ObLogAllSvrCache all_svr_cache_;
|
||||||
ObLSRouteTimerTask ls_route_timer_task_;
|
ObLSRouteTimerTask ls_route_timer_task_;
|
||||||
common::ObTimer timer_;
|
common::ObTimer timer_;
|
||||||
|
logfetcher::IObLogErrHandler *err_handler_;
|
||||||
int timer_id_;
|
int timer_id_;
|
||||||
int tg_id_;
|
int tg_id_;
|
||||||
int64_t background_refresh_time_sec_;
|
int64_t background_refresh_time_sec_;
|
||||||
|
|||||||
@ -17,6 +17,7 @@
|
|||||||
#include "lib/mysqlclient/ob_mysql_result.h" // ObMySQLResult
|
#include "lib/mysqlclient/ob_mysql_result.h" // ObMySQLResult
|
||||||
#include "lib/string/ob_sql_string.h" // ObSqlString
|
#include "lib/string/ob_sql_string.h" // ObSqlString
|
||||||
#include "share/inner_table/ob_inner_table_schema_constants.h" // OB_***_TNAME
|
#include "share/inner_table/ob_inner_table_schema_constants.h" // OB_***_TNAME
|
||||||
|
#include "lib/utility/ob_tracepoint.h"
|
||||||
|
|
||||||
using namespace oceanbase::share;
|
using namespace oceanbase::share;
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
@ -27,7 +28,8 @@ ObLogSysTableQueryer::ObLogSysTableQueryer() :
|
|||||||
is_inited_(false),
|
is_inited_(false),
|
||||||
is_across_cluster_(false),
|
is_across_cluster_(false),
|
||||||
cluster_id_(OB_INVALID_CLUSTER_ID),
|
cluster_id_(OB_INVALID_CLUSTER_ID),
|
||||||
sql_proxy_(NULL)
|
sql_proxy_(NULL),
|
||||||
|
err_handler_(NULL)
|
||||||
{
|
{
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -39,7 +41,8 @@ ObLogSysTableQueryer::~ObLogSysTableQueryer()
|
|||||||
|
|
||||||
int ObLogSysTableQueryer::init(const int64_t cluster_id,
|
int ObLogSysTableQueryer::init(const int64_t cluster_id,
|
||||||
const bool is_across_cluster,
|
const bool is_across_cluster,
|
||||||
common::ObISQLClient &sql_proxy)
|
common::ObISQLClient &sql_proxy,
|
||||||
|
logfetcher::IObLogErrHandler *err_handler)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
|
||||||
@ -51,6 +54,7 @@ int ObLogSysTableQueryer::init(const int64_t cluster_id,
|
|||||||
cluster_id_ = cluster_id;
|
cluster_id_ = cluster_id;
|
||||||
sql_proxy_ = &sql_proxy;
|
sql_proxy_ = &sql_proxy;
|
||||||
is_inited_ = true;
|
is_inited_ = true;
|
||||||
|
err_handler_ = err_handler;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
@ -62,6 +66,7 @@ void ObLogSysTableQueryer::destroy()
|
|||||||
is_across_cluster_ = false;
|
is_across_cluster_ = false;
|
||||||
cluster_id_ = OB_INVALID_CLUSTER_ID;
|
cluster_id_ = OB_INVALID_CLUSTER_ID;
|
||||||
sql_proxy_ = NULL;
|
sql_proxy_ = NULL;
|
||||||
|
err_handler_ = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObLogSysTableQueryer::get_ls_log_info(
|
int ObLogSysTableQueryer::get_ls_log_info(
|
||||||
@ -232,11 +237,13 @@ int ObLogSysTableQueryer::get_all_zone_type_info(
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ERRSIM_POINT_DEF(ERRSIM_FETCH_LOG_SYS_QUERY_FAILED);
|
||||||
int ObLogSysTableQueryer::do_query_(const uint64_t tenant_id,
|
int ObLogSysTableQueryer::do_query_(const uint64_t tenant_id,
|
||||||
ObSqlString &sql,
|
ObSqlString &sql,
|
||||||
ObISQLClient::ReadResult &result)
|
ObISQLClient::ReadResult &result)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
ObTaskId trace_id(*ObCurTraceId::get_trace_id());
|
||||||
|
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -253,6 +260,14 @@ int ObLogSysTableQueryer::do_query_(const uint64_t tenant_id,
|
|||||||
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), "sql", sql.ptr());
|
LOG_WARN("execute sql failed", KR(ret), K(tenant_id), "sql", sql.ptr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (OB_SUCC(ret) && ERRSIM_FETCH_LOG_SYS_QUERY_FAILED) {
|
||||||
|
ret = ERRSIM_FETCH_LOG_SYS_QUERY_FAILED;
|
||||||
|
LOG_WARN("errsim do query error", K(ERRSIM_FETCH_LOG_SYS_QUERY_FAILED));
|
||||||
|
}
|
||||||
|
if (OB_NOT_NULL(err_handler_) && (-ER_CONNECT_FAILED == ret || -ER_ACCESS_DENIED_ERROR == ret)) {
|
||||||
|
err_handler_->handle_error(share::SYS_LS, logfetcher::IObLogErrHandler::ErrType::FETCH_LOG, trace_id,
|
||||||
|
palf::LSN(palf::LOG_INVALID_LSN_VAL)/*no need to pass lsn*/, ret, "%s");
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
#include "ob_ls_log_stat_info.h" // ObLSLogInfo
|
#include "ob_ls_log_stat_info.h" // ObLSLogInfo
|
||||||
#include "ob_all_server_info.h" // ObAllServerInfo
|
#include "ob_all_server_info.h" // ObAllServerInfo
|
||||||
#include "ob_all_zone_info.h" // ObAllZoneInfo, ObAllZoneTypeInfo
|
#include "ob_all_zone_info.h" // ObAllZoneInfo, ObAllZoneTypeInfo
|
||||||
|
#include "src/logservice/logfetcher/ob_log_fetcher_err_handler.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -37,7 +38,8 @@ public:
|
|||||||
virtual ~ObLogSysTableQueryer();
|
virtual ~ObLogSysTableQueryer();
|
||||||
int init(const int64_t cluster_id,
|
int init(const int64_t cluster_id,
|
||||||
const bool is_across_cluster,
|
const bool is_across_cluster,
|
||||||
common::ObISQLClient &sql_proxy);
|
common::ObISQLClient &sql_proxy,
|
||||||
|
logfetcher::IObLogErrHandler *err_handler);
|
||||||
bool is_inited() const { return is_inited_; }
|
bool is_inited() const { return is_inited_; }
|
||||||
void destroy();
|
void destroy();
|
||||||
|
|
||||||
@ -99,6 +101,7 @@ private:
|
|||||||
bool is_across_cluster_; // whether the SQL query across cluster
|
bool is_across_cluster_; // whether the SQL query across cluster
|
||||||
int64_t cluster_id_; // ClusterID
|
int64_t cluster_id_; // ClusterID
|
||||||
common::ObISQLClient *sql_proxy_; // sql_proxy to use
|
common::ObISQLClient *sql_proxy_; // sql_proxy to use
|
||||||
|
logfetcher::IObLogErrHandler *err_handler_;
|
||||||
|
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObLogSysTableQueryer);
|
DISALLOW_COPY_AND_ASSIGN(ObLogSysTableQueryer);
|
||||||
};
|
};
|
||||||
|
|||||||
@ -28,6 +28,7 @@ void ObLogRestoreErrorContext::reset()
|
|||||||
{
|
{
|
||||||
ret_code_ = OB_SUCCESS;
|
ret_code_ = OB_SUCCESS;
|
||||||
trace_id_.reset();
|
trace_id_.reset();
|
||||||
|
err_lsn_ = palf::LSN(palf::LOG_INVALID_LSN_VAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
ObLogRestoreErrorContext &ObLogRestoreErrorContext::operator=(const ObLogRestoreErrorContext &other)
|
ObLogRestoreErrorContext &ObLogRestoreErrorContext::operator=(const ObLogRestoreErrorContext &other)
|
||||||
|
|||||||
@ -41,11 +41,12 @@ struct ObLogRestoreErrorContext
|
|||||||
ErrorType error_type_;
|
ErrorType error_type_;
|
||||||
int ret_code_;
|
int ret_code_;
|
||||||
share::ObTaskId trace_id_;
|
share::ObTaskId trace_id_;
|
||||||
|
palf::LSN err_lsn_;
|
||||||
ObLogRestoreErrorContext() { reset(); }
|
ObLogRestoreErrorContext() { reset(); }
|
||||||
virtual ~ObLogRestoreErrorContext() { reset(); }
|
virtual ~ObLogRestoreErrorContext() { reset(); }
|
||||||
void reset();
|
void reset();
|
||||||
ObLogRestoreErrorContext &operator=(const ObLogRestoreErrorContext &other);
|
ObLogRestoreErrorContext &operator=(const ObLogRestoreErrorContext &other);
|
||||||
TO_STRING_KV(K_(ret_code), K_(trace_id));
|
TO_STRING_KV(K_(ret_code), K_(trace_id), K_(error_type), K_(err_lsn));
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ObRestoreLogContext
|
struct ObRestoreLogContext
|
||||||
|
|||||||
@ -52,9 +52,12 @@ const char *type_str[static_cast<int>(RestoreSyncStatus::MAX_RESTORE_SYNC_STATUS
|
|||||||
"Invalid restore status",
|
"Invalid restore status",
|
||||||
" ",
|
" ",
|
||||||
"There is a gap between the log source and standby",
|
"There is a gap between the log source and standby",
|
||||||
"Log conflicts, the standby with the same LSN is different from the log source",
|
"Log conflicts, the standby with the same LSN is different from the log source when submit log",
|
||||||
"Log source cannot be accessed, the user name or password of the replication account may be incorrect",
|
"Log conflicts, the standby with the same LSN is different from the log source when fetch log",
|
||||||
|
"Log source can not be accessed, the replication account may be incorrect or the privelege is insufficient",
|
||||||
"Log source is unreachable, the log source access point may be unavailable",
|
"Log source is unreachable, the log source access point may be unavailable",
|
||||||
|
"Fetch log time out",
|
||||||
|
"Restore suspend, the standby has synchronized to recovery until scn",
|
||||||
"Unexpected exceptions",
|
"Unexpected exceptions",
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -310,6 +313,7 @@ int ObLogRestoreHandler::clean_source()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ERRSIM_POINT_DEF(ERRSIM_SUBMIT_LOG_ERROR);
|
||||||
int ObLogRestoreHandler::raw_write(const int64_t proposal_id,
|
int ObLogRestoreHandler::raw_write(const int64_t proposal_id,
|
||||||
const palf::LSN &lsn,
|
const palf::LSN &lsn,
|
||||||
const SCN &scn,
|
const SCN &scn,
|
||||||
@ -346,14 +350,20 @@ int ObLogRestoreHandler::raw_write(const int64_t proposal_id,
|
|||||||
CLOG_LOG(INFO, "submit log to end, just skip", K(ret), K(lsn), KPC(this));
|
CLOG_LOG(INFO, "submit log to end, just skip", K(ret), K(lsn), KPC(this));
|
||||||
} else {
|
} else {
|
||||||
opts.proposal_id = proposal_id_;
|
opts.proposal_id = proposal_id_;
|
||||||
ret = palf_handle_.raw_write(opts, lsn, buf, buf_size);
|
// errsim fake error
|
||||||
if (OB_SUCC(ret)) {
|
if (ERRSIM_SUBMIT_LOG_ERROR) {
|
||||||
context_.max_fetch_lsn_ = lsn + buf_size;
|
ret = ERRSIM_SUBMIT_LOG_ERROR;
|
||||||
context_.max_fetch_scn_ = scn;
|
CLOG_LOG(TRACE, "errsim submit log error");
|
||||||
context_.last_fetch_ts_ = ObTimeUtility::fast_current_time();
|
} else {
|
||||||
if (parent_->set_to_end(scn)) {
|
ret = palf_handle_.raw_write(opts, lsn, buf, buf_size);
|
||||||
// To stop and clear all restore log tasks and restore context, reset context and advance issue version
|
if (OB_SUCC(ret)) {
|
||||||
CLOG_LOG(INFO, "restore log to_end succ", KPC(this), KPC(parent_));
|
context_.max_fetch_lsn_ = lsn + buf_size;
|
||||||
|
context_.max_fetch_scn_ = scn;
|
||||||
|
context_.last_fetch_ts_ = ObTimeUtility::fast_current_time();
|
||||||
|
if (parent_->set_to_end(scn)) {
|
||||||
|
// To stop and clear all restore log tasks and restore context, reset context and advance issue version
|
||||||
|
CLOG_LOG(INFO, "restore log to_end succ", KPC(this), KPC(parent_));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -446,8 +456,6 @@ int ObLogRestoreHandler::need_schedule(bool &need_schedule,
|
|||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
} else if (OB_ISNULL(parent_)) {
|
} else if (OB_ISNULL(parent_)) {
|
||||||
// do nothing
|
// do nothing
|
||||||
} else if (OB_SUCCESS != context_.error_context_.ret_code_) {
|
|
||||||
// error exist, no need schedule
|
|
||||||
} else {
|
} else {
|
||||||
need_schedule = is_strong_leader(role_) && ! restore_to_end_unlock_();
|
need_schedule = is_strong_leader(role_) && ! restore_to_end_unlock_();
|
||||||
proposal_id = proposal_id_;
|
proposal_id = proposal_id_;
|
||||||
@ -483,10 +491,11 @@ void ObLogRestoreHandler::mark_error(share::ObTaskId &trace_id,
|
|||||||
CLOG_LOG(WARN, "get end_lsn failed", K(id_));
|
CLOG_LOG(WARN, "get end_lsn failed", K(id_));
|
||||||
} else if (end_lsn < lsn) {
|
} else if (end_lsn < lsn) {
|
||||||
CLOG_LOG(WARN, "end_lsn smaller than error lsn, just skip", K(id_), K(end_lsn), K(lsn), KPC(parent_), KPC(this));
|
CLOG_LOG(WARN, "end_lsn smaller than error lsn, just skip", K(id_), K(end_lsn), K(lsn), KPC(parent_), KPC(this));
|
||||||
} else if (OB_SUCCESS == context_.error_context_.ret_code_) {
|
} else if (OB_SUCCESS == context_.error_context_.ret_code_ || OB_TIMEOUT == context_.error_context_.ret_code_) {
|
||||||
context_.error_context_.error_type_ = error_type;
|
context_.error_context_.error_type_ = error_type;
|
||||||
context_.error_context_.ret_code_ = ret_code;
|
context_.error_context_.ret_code_ = ret_code;
|
||||||
context_.error_context_.trace_id_.set(trace_id);
|
context_.error_context_.trace_id_.set(trace_id);
|
||||||
|
context_.error_context_.err_lsn_ = lsn;
|
||||||
CLOG_LOG(ERROR, "fatal error occur in restore", KPC(parent_), KPC(this));
|
CLOG_LOG(ERROR, "fatal error occur in restore", KPC(parent_), KPC(this));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -495,6 +504,15 @@ int ObLogRestoreHandler::get_restore_error(share::ObTaskId &trace_id, int &ret_c
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
RLockGuard guard(lock_);
|
RLockGuard guard(lock_);
|
||||||
|
if (OB_FAIL(get_restore_error_unlock_(trace_id, ret_code, error_exist))) {
|
||||||
|
CLOG_LOG(ERROR, "fail to get restore error");
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
int ObLogRestoreHandler::get_restore_error_unlock_(share::ObTaskId &trace_id, int &ret_code, bool &error_exist)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
error_exist = false;
|
error_exist = false;
|
||||||
if (IS_NOT_INIT) {
|
if (IS_NOT_INIT) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
@ -751,6 +769,25 @@ int ObLogRestoreHandler::diagnose(RestoreDiagnoseInfo &diagnose_info) const
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObLogRestoreHandler::refresh_error_context()
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
palf::LSN end_lsn;
|
||||||
|
WLockGuard guard(lock_);
|
||||||
|
if (IS_NOT_INIT) {
|
||||||
|
ret = OB_NOT_INIT;
|
||||||
|
} else if (! is_strong_leader(role_)) {
|
||||||
|
CLOG_LOG(INFO, "not leader, no need refresh error context", K(id_));
|
||||||
|
} else if (OB_FAIL(palf_handle_.get_end_lsn(end_lsn))) {
|
||||||
|
CLOG_LOG(WARN, "get end_lsn failed", K(id_));
|
||||||
|
} else if (end_lsn > context_.error_context_.err_lsn_ && OB_SUCCESS != context_.error_context_.ret_code_) {
|
||||||
|
context_.error_context_.ret_code_ = OB_SUCCESS;
|
||||||
|
context_.error_context_.err_lsn_ = palf::LSN(palf::LOG_INVALID_LSN_VAL);
|
||||||
|
CLOG_LOG(INFO, "flush error context to success", K(id_), K(context_), K(end_lsn), KPC(parent_), KPC(this));
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
bool ObLogRestoreHandler::restore_to_end() const
|
bool ObLogRestoreHandler::restore_to_end() const
|
||||||
{
|
{
|
||||||
RLockGuard guard(lock_);
|
RLockGuard guard(lock_);
|
||||||
@ -863,16 +900,19 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s
|
|||||||
CLOG_LOG(WARN, "fail to get end lsn when get ls restore status info");
|
CLOG_LOG(WARN, "fail to get end lsn when get ls restore status info");
|
||||||
} else if (OB_FAIL(palf_handle_.get_end_scn(scn))) {
|
} else if (OB_FAIL(palf_handle_.get_end_scn(scn))) {
|
||||||
CLOG_LOG(WARN, "fail to get end scn");
|
CLOG_LOG(WARN, "fail to get end scn");
|
||||||
} else if (OB_FAIL(get_restore_error(trace_id, ret_code, error_exist))) {
|
} else if (OB_FAIL(get_restore_error_unlock_(trace_id, ret_code, error_exist))) {
|
||||||
CLOG_LOG(WARN, "fail to get restore error");
|
CLOG_LOG(WARN, "fail to get restore error");
|
||||||
} else if (error_exist) {
|
} else if (error_exist) {
|
||||||
CLOG_LOG(TRACE, "start to mark restore sync error", K(trace_id), K(ret_code), K(context_.error_context_.error_type_));
|
CLOG_LOG(TRACE, "start to mark restore sync error", K(trace_id), K(ret_code), K(context_.error_context_.error_type_));
|
||||||
if (OB_FAIL(get_err_code_and_message_(ret_code, context_.error_context_.error_type_, sync_status, restore_status_info.comment_))) {
|
if (OB_FAIL(get_err_code_and_message_(ret_code, context_.error_context_.error_type_, sync_status))) {
|
||||||
CLOG_LOG(WARN, "fail to get err code and message", K(ret_code), K(context_.error_context_.error_type_), K(sync_status));
|
CLOG_LOG(WARN, "fail to get err code and message", K(ret_code), K(context_.error_context_.error_type_), K(sync_status));
|
||||||
} else {
|
} else {
|
||||||
restore_status_info.sync_status_ = sync_status;
|
restore_status_info.sync_status_ = sync_status;
|
||||||
}
|
}
|
||||||
} else if (!error_exist) {
|
} else if (restore_to_end_unlock_()) {
|
||||||
|
restore_status_info.sync_status_ = RestoreSyncStatus::RESTORE_SYNC_SUSPEND;
|
||||||
|
CLOG_LOG(TRACE, "restore suspend", K(error_exist), K(restore_status_info.sync_status_));
|
||||||
|
} else {
|
||||||
restore_status_info.sync_status_ = RestoreSyncStatus::RESTORE_SYNC_NORMAL;
|
restore_status_info.sync_status_ = RestoreSyncStatus::RESTORE_SYNC_NORMAL;
|
||||||
CLOG_LOG(TRACE, "error is not exist, restore sync is normal", K(error_exist), K(restore_status_info.sync_status_));
|
CLOG_LOG(TRACE, "error is not exist, restore sync is normal", K(error_exist), K(restore_status_info.sync_status_));
|
||||||
}
|
}
|
||||||
@ -892,22 +932,44 @@ int ObLogRestoreHandler::get_ls_restore_status_info(RestoreStatusInfo &restore_s
|
|||||||
|
|
||||||
int ObLogRestoreHandler::get_err_code_and_message_(int ret_code,
|
int ObLogRestoreHandler::get_err_code_and_message_(int ret_code,
|
||||||
ObLogRestoreErrorContext::ErrorType error_type,
|
ObLogRestoreErrorContext::ErrorType error_type,
|
||||||
RestoreSyncStatus &sync_status,
|
RestoreSyncStatus &sync_status)
|
||||||
ObSqlString &comment)
|
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_ERR_OUT_OF_LOWER_BOUND == ret_code) {
|
|
||||||
sync_status = RestoreSyncStatus::RESTORE_SYNC_STANDBY_LOG_NOT_MATCH;
|
// RESTORE_SYNC_SOURCE_HAS_A_GAP
|
||||||
} else if (OB_ERR_UNEXPECTED == ret_code && error_type == ObLogRestoreErrorContext::ErrorType::SUBMIT_LOG) {
|
if ((OB_ERR_OUT_OF_LOWER_BOUND == ret_code
|
||||||
|
|| OB_ARCHIVE_ROUND_NOT_CONTINUOUS == ret_code
|
||||||
|
|| OB_ARCHIVE_LOG_RECYCLED == ret_code)
|
||||||
|
&& ObLogRestoreErrorContext::ErrorType::FETCH_LOG == error_type) {
|
||||||
sync_status = RestoreSyncStatus::RESTORE_SYNC_SOURCE_HAS_A_GAP;
|
sync_status = RestoreSyncStatus::RESTORE_SYNC_SOURCE_HAS_A_GAP;
|
||||||
} else if (-ER_CONNECT_FAILED == ret_code && error_type == ObLogRestoreErrorContext::ErrorType::FETCH_LOG) {
|
}
|
||||||
|
// RESTORE_SYNC_SUBMIT_LOG_NOT_MATCH
|
||||||
|
else if (OB_ERR_UNEXPECTED == ret_code && ObLogRestoreErrorContext::ErrorType::SUBMIT_LOG == error_type) {
|
||||||
|
sync_status = RestoreSyncStatus::RESTORE_SYNC_SUBMIT_LOG_NOT_MATCH;
|
||||||
|
}
|
||||||
|
// RESTORE_SYNC_FETCH_LOG_NOT_MATCH
|
||||||
|
else if ((OB_INVALID_DATA == ret_code || OB_CHECKSUM_ERROR == ret_code)
|
||||||
|
&& ObLogRestoreErrorContext::ErrorType::FETCH_LOG == error_type) {
|
||||||
|
sync_status = RestoreSyncStatus::RESTORE_SYNC_FETCH_LOG_NOT_MATCH;
|
||||||
|
}
|
||||||
|
// RESTORE_SYNC_CHECK_NETWORK
|
||||||
|
else if (-ER_CONNECT_FAILED == ret_code && ObLogRestoreErrorContext::ErrorType::FETCH_LOG == error_type) {
|
||||||
sync_status = RestoreSyncStatus::RESTORE_SYNC_CHECK_NETWORK;
|
sync_status = RestoreSyncStatus::RESTORE_SYNC_CHECK_NETWORK;
|
||||||
} else if (-ER_ACCESS_DENIED_ERROR == ret_code && error_type == ObLogRestoreErrorContext::ErrorType::FETCH_LOG) {
|
}
|
||||||
|
// RESTORE_SYNC_CHECK_USER_OR_PASSWORD
|
||||||
|
else if ((-ER_ACCESS_DENIED_ERROR == ret_code || -ER_TABLEACCESS_DENIED_ERROR == ret_code)
|
||||||
|
&& ObLogRestoreErrorContext::ErrorType::FETCH_LOG == error_type) {
|
||||||
sync_status = RestoreSyncStatus::RESTORE_SYNC_CHECK_USER_OR_PASSWORD;
|
sync_status = RestoreSyncStatus::RESTORE_SYNC_CHECK_USER_OR_PASSWORD;
|
||||||
} else {
|
}
|
||||||
|
// RESTORE_SYNC_FETCH_LOG_TIME_OUT
|
||||||
|
else if (OB_TIMEOUT == ret_code && ObLogRestoreErrorContext::ErrorType::FETCH_LOG == error_type) {
|
||||||
|
sync_status = RestoreSyncStatus::RESTORE_SYNC_FETCH_LOG_TIME_OUT;
|
||||||
|
}
|
||||||
|
// RESTORE_SYNC_NOT_AVAILABLE
|
||||||
|
else if (OB_SUCCESS != ret_code) {
|
||||||
sync_status = RestoreSyncStatus::RESTORE_SYNC_NOT_AVAILABLE;
|
sync_status = RestoreSyncStatus::RESTORE_SYNC_NOT_AVAILABLE;
|
||||||
}
|
}
|
||||||
CLOG_LOG(TRACE, "get err code and message succ", K(sync_status), K(comment));
|
CLOG_LOG(TRACE, "get error code and message succ", K(sync_status));
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -79,10 +79,13 @@ enum class RestoreSyncStatus {
|
|||||||
INVALID_RESTORE_SYNC_STATUS = 0,
|
INVALID_RESTORE_SYNC_STATUS = 0,
|
||||||
RESTORE_SYNC_NORMAL = 1,
|
RESTORE_SYNC_NORMAL = 1,
|
||||||
RESTORE_SYNC_SOURCE_HAS_A_GAP = 2,
|
RESTORE_SYNC_SOURCE_HAS_A_GAP = 2,
|
||||||
RESTORE_SYNC_STANDBY_LOG_NOT_MATCH = 3,
|
RESTORE_SYNC_SUBMIT_LOG_NOT_MATCH = 3,
|
||||||
RESTORE_SYNC_CHECK_USER_OR_PASSWORD = 4,
|
RESTORE_SYNC_FETCH_LOG_NOT_MATCH = 4,
|
||||||
RESTORE_SYNC_CHECK_NETWORK = 5,
|
RESTORE_SYNC_CHECK_USER_OR_PASSWORD = 5,
|
||||||
RESTORE_SYNC_NOT_AVAILABLE = 6,
|
RESTORE_SYNC_CHECK_NETWORK = 6,
|
||||||
|
RESTORE_SYNC_FETCH_LOG_TIME_OUT = 7,
|
||||||
|
RESTORE_SYNC_SUSPEND = 8,
|
||||||
|
RESTORE_SYNC_NOT_AVAILABLE = 9,
|
||||||
MAX_RESTORE_SYNC_STATUS
|
MAX_RESTORE_SYNC_STATUS
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -93,12 +96,17 @@ inline int restore_sync_status_to_string(const RestoreSyncStatus status, char *s
|
|||||||
strncpy(str_buf_, "NORMAL", str_len);
|
strncpy(str_buf_, "NORMAL", str_len);
|
||||||
} else if (RestoreSyncStatus::RESTORE_SYNC_SOURCE_HAS_A_GAP == status) {
|
} else if (RestoreSyncStatus::RESTORE_SYNC_SOURCE_HAS_A_GAP == status) {
|
||||||
strncpy(str_buf_, "SOURCE HAS A GAP", str_len);
|
strncpy(str_buf_, "SOURCE HAS A GAP", str_len);
|
||||||
} else if (RestoreSyncStatus::RESTORE_SYNC_STANDBY_LOG_NOT_MATCH == status) {
|
} else if (RestoreSyncStatus::RESTORE_SYNC_SUBMIT_LOG_NOT_MATCH == status
|
||||||
|
|| RestoreSyncStatus::RESTORE_SYNC_FETCH_LOG_NOT_MATCH == status) {
|
||||||
strncpy(str_buf_, "STANDBY LOG NOT MATCH", str_len);
|
strncpy(str_buf_, "STANDBY LOG NOT MATCH", str_len);
|
||||||
} else if (RestoreSyncStatus::RESTORE_SYNC_CHECK_USER_OR_PASSWORD == status) {
|
} else if (RestoreSyncStatus::RESTORE_SYNC_CHECK_USER_OR_PASSWORD == status) {
|
||||||
strncpy(str_buf_, "CHECK USER OR PASSWORD", str_len);
|
strncpy(str_buf_, "CHECK USER OR PASSWORD", str_len);
|
||||||
} else if (RestoreSyncStatus::RESTORE_SYNC_CHECK_NETWORK == status) {
|
} else if (RestoreSyncStatus::RESTORE_SYNC_CHECK_NETWORK == status) {
|
||||||
strncpy(str_buf_, "CHECK NETWORK", str_len);
|
strncpy(str_buf_, "CHECK NETWORK", str_len);
|
||||||
|
} else if (RestoreSyncStatus::RESTORE_SYNC_FETCH_LOG_TIME_OUT == status) {
|
||||||
|
strncpy(str_buf_, "FETCH LOG TIMEOUT", str_len);
|
||||||
|
} else if (RestoreSyncStatus::RESTORE_SYNC_SUSPEND == status) {
|
||||||
|
strncpy(str_buf_, "RESTORE SUSPEND", str_len);
|
||||||
} else if (RestoreSyncStatus::RESTORE_SYNC_NOT_AVAILABLE == status) {
|
} else if (RestoreSyncStatus::RESTORE_SYNC_NOT_AVAILABLE == status) {
|
||||||
strncpy(str_buf_, "NOT AVAILABLE", str_len);
|
strncpy(str_buf_, "NOT AVAILABLE", str_len);
|
||||||
} else {
|
} else {
|
||||||
@ -241,7 +249,9 @@ public:
|
|||||||
// other code unexpected ret_code
|
// other code unexpected ret_code
|
||||||
int get_next_sorted_task(ObFetchLogTask *&task);
|
int get_next_sorted_task(ObFetchLogTask *&task);
|
||||||
bool restore_to_end() const;
|
bool restore_to_end() const;
|
||||||
|
int get_restore_error_unlock_(share::ObTaskId &trace_id, int &ret_code, bool &error_exist);
|
||||||
int diagnose(RestoreDiagnoseInfo &diagnose_info) const;
|
int diagnose(RestoreDiagnoseInfo &diagnose_info) const;
|
||||||
|
int refresh_error_context();
|
||||||
int get_ls_restore_status_info(RestoreStatusInfo &restore_status_info);
|
int get_ls_restore_status_info(RestoreStatusInfo &restore_status_info);
|
||||||
TO_STRING_KV(K_(is_inited), K_(is_in_stop_state), K_(id), K_(proposal_id), K_(role), KP_(parent), K_(context), K_(restore_context));
|
TO_STRING_KV(K_(is_inited), K_(is_in_stop_state), K_(id), K_(proposal_id), K_(role), KP_(parent), K_(context), K_(restore_context));
|
||||||
|
|
||||||
@ -256,7 +266,7 @@ private:
|
|||||||
int check_restore_to_newest_from_archive_(ObLogArchivePieceContext &piece_context,
|
int check_restore_to_newest_from_archive_(ObLogArchivePieceContext &piece_context,
|
||||||
const palf::LSN &end_lsn, const share::SCN &end_scn, share::SCN &archive_scn);
|
const palf::LSN &end_lsn, const share::SCN &end_scn, share::SCN &archive_scn);
|
||||||
bool restore_to_end_unlock_() const;
|
bool restore_to_end_unlock_() const;
|
||||||
int get_err_code_and_message_(int ret_code, ObLogRestoreErrorContext::ErrorType error_type, RestoreSyncStatus &err_type, ObSqlString &comment);
|
int get_err_code_and_message_(int ret_code, ObLogRestoreErrorContext::ErrorType error_type, RestoreSyncStatus &err_type);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ObRemoteLogParent *parent_;
|
ObRemoteLogParent *parent_;
|
||||||
|
|||||||
@ -201,8 +201,10 @@ int ObLogRestoreNetDriver::scan_ls(const share::ObLogRestoreSourceType &type)
|
|||||||
}
|
}
|
||||||
} else if (OB_FAIL(check_ls_stale_(id, proposal_id, is_stale))) {
|
} else if (OB_FAIL(check_ls_stale_(id, proposal_id, is_stale))) {
|
||||||
LOG_WARN("check_ls_stale_ failed", K(id));
|
LOG_WARN("check_ls_stale_ failed", K(id));
|
||||||
} else if (!is_stale) {
|
} else if (! is_stale) {
|
||||||
// do nothing
|
if (OB_FAIL(refresh_error_context_(id))) {
|
||||||
|
LOG_WARN("refresh error context failed", K(id));
|
||||||
|
}
|
||||||
} else if (OB_FAIL(fetcher_->remove_ls(id)) && OB_ENTRY_NOT_EXIST != ret) {
|
} else if (OB_FAIL(fetcher_->remove_ls(id)) && OB_ENTRY_NOT_EXIST != ret) {
|
||||||
LOG_WARN("remove ls failed", K(id), K(is_stale));
|
LOG_WARN("remove ls failed", K(id), K(is_stale));
|
||||||
} else {
|
} else {
|
||||||
@ -557,6 +559,18 @@ int ObLogRestoreNetDriver::get_ls_count_in_fetcher_(int64_t &count)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObLogRestoreNetDriver::refresh_error_context_(const share::ObLSID &ls_id)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObRemoteFetchContext context;
|
||||||
|
GET_RESTORE_HANDLER_CTX(ls_id) {
|
||||||
|
if (OB_FAIL(restore_handler->refresh_error_context())) {
|
||||||
|
LOG_WARN("refresh error failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
int ObLogRestoreNetDriver::set_restore_log_upper_limit()
|
int ObLogRestoreNetDriver::set_restore_log_upper_limit()
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
@ -609,6 +623,7 @@ void ObLogRestoreNetDriver::LogErrHandler::destroy()
|
|||||||
void ObLogRestoreNetDriver::LogErrHandler::handle_error(const share::ObLSID &ls_id,
|
void ObLogRestoreNetDriver::LogErrHandler::handle_error(const share::ObLSID &ls_id,
|
||||||
const ErrType &err_type,
|
const ErrType &err_type,
|
||||||
share::ObTaskId &trace_id,
|
share::ObTaskId &trace_id,
|
||||||
|
const palf::LSN &lsn,
|
||||||
const int err_no,
|
const int err_no,
|
||||||
const char *fmt, ...)
|
const char *fmt, ...)
|
||||||
{
|
{
|
||||||
@ -620,7 +635,19 @@ void ObLogRestoreNetDriver::LogErrHandler::handle_error(const share::ObLSID &ls_
|
|||||||
LOG_WARN("LogErrHandler not init", K(inited_));
|
LOG_WARN("LogErrHandler not init", K(inited_));
|
||||||
} else {
|
} else {
|
||||||
GET_RESTORE_HANDLER_CTX(ls_id) {
|
GET_RESTORE_HANDLER_CTX(ls_id) {
|
||||||
// restore_handler->mark_error(trace_id, err_no, restore_err_type);
|
if (palf::LSN(palf::LOG_INVALID_LSN_VAL) == lsn ) {
|
||||||
|
palf::LSN tmp_lsn = palf::LSN(palf::PALF_INITIAL_LSN_VAL);
|
||||||
|
palf::PalfHandleGuard palf_handle_guard;
|
||||||
|
if (OB_FAIL(OB_FAIL(MTL(ObLogService*)->open_palf(ls_id, palf_handle_guard)))) {
|
||||||
|
LOG_WARN("open palf failed", K(ls_id));
|
||||||
|
} else if (OB_FAIL(palf_handle_guard.get_end_lsn(tmp_lsn))) {
|
||||||
|
LOG_WARN("get end lsn failed", K(ls_id));
|
||||||
|
} else {
|
||||||
|
restore_handler->mark_error(trace_id, err_no, tmp_lsn, restore_err_type);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
restore_handler->mark_error(trace_id, err_no, lsn, restore_err_type);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -112,6 +112,7 @@ private:
|
|||||||
|
|
||||||
int check_ls_stale_(const share::ObLSID &id, const int64_t proposal_id, bool &is_stale);
|
int check_ls_stale_(const share::ObLSID &id, const int64_t proposal_id, bool &is_stale);
|
||||||
int get_ls_count_in_fetcher_(int64_t &count);
|
int get_ls_count_in_fetcher_(int64_t &count);
|
||||||
|
int refresh_error_context_(const share::ObLSID &ls_id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
class LogErrHandler : public logfetcher::IObLogErrHandler
|
class LogErrHandler : public logfetcher::IObLogErrHandler
|
||||||
@ -125,6 +126,7 @@ private:
|
|||||||
virtual void handle_error(const share::ObLSID &ls_id,
|
virtual void handle_error(const share::ObLSID &ls_id,
|
||||||
const ErrType &err_type,
|
const ErrType &err_type,
|
||||||
share::ObTaskId &trace_id,
|
share::ObTaskId &trace_id,
|
||||||
|
const palf::LSN &lsn,
|
||||||
const int err_no,
|
const int err_no,
|
||||||
const char *fmt, ...) override;
|
const char *fmt, ...) override;
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -146,7 +146,6 @@ int ObRestoreLogFunction::wait_restore_quota_(const int64_t size, bool &done, vo
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO 接入controller
|
|
||||||
int ObRestoreLogFunction::process_(const share::ObLSID &id,
|
int ObRestoreLogFunction::process_(const share::ObLSID &id,
|
||||||
const int64_t proposal_id,
|
const int64_t proposal_id,
|
||||||
const palf::LSN &lsn,
|
const palf::LSN &lsn,
|
||||||
|
|||||||
Reference in New Issue
Block a user