[Physical Standby] Support for region-specific to fetch log

This commit is contained in:
obdev 2023-06-15 12:42:16 +00:00 committed by ob-robot
parent 4260921c59
commit f6664ffdf8
23 changed files with 602 additions and 105 deletions

View File

@ -75,6 +75,7 @@ ob_set_subtarget(ob_logservice leader_coordinator
ob_set_subtarget(ob_logservice logrouteservice
logrouteservice/ob_all_server_info.cpp
logrouteservice/ob_all_zone_info.cpp
logrouteservice/ob_all_units_info.cpp
logrouteservice/ob_external_server_blacklist.cpp
logrouteservice/ob_log_all_svr_cache.cpp
logrouteservice/ob_log_route_key.cpp

View File

@ -127,7 +127,8 @@ int ObLogFetcher::init(
cfg.blacklist_survival_time_penalty_period_min,
cfg.blacklist_history_overdue_time_min,
cfg.blacklist_history_clear_interval_min,
is_tenant_mode))) {
is_tenant_mode,
OB_INVALID_TENANT_ID))) {
LOG_ERROR("ObLogRouterService init failer", KR(ret), K(prefer_region), K(cluster_id));
} else if (OB_FAIL(progress_controller_.init(cfg.ls_count_upper_limit))) {
LOG_ERROR("init progress controller fail", KR(ret));
@ -546,8 +547,8 @@ void ObLogFetcher::configure(const ObLogConfig &cfg)
K(blacklist_survival_time_penalty_period_min),
K(blacklist_history_overdue_time_min),
K(blacklist_history_clear_interval_min));
} else if (OB_FAIL(log_route_service_.update_assign_region(cfg.region.str()))) {
LOG_ERROR("update_assign_region failed", KR(ret), "region", cfg.region);
} else if (OB_FAIL(log_route_service_.update_preferred_upstream_log_region(cfg.region.str()))) {
LOG_ERROR("update_preferred_upsteam_log_region failed", KR(ret), "region", cfg.region);
} else if (OB_FAIL(log_route_service_.update_cache_update_interval(
all_server_cache_update_interval_sec,
all_zone_cache_update_interval_sec))) {

View File

@ -195,7 +195,7 @@ public:
T_DEF_INT_INFT(blacklist_history_clear_interval_min, OB_CLUSTER_PARAMETER, 20, 10, "blacklist history clear interval in minute");
// Check the need for active cut-off cycles, in minutes
T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 30, 1, "check switch server interval in minute");
T_DEF_INT_INFT(check_switch_server_interval_min, OB_CLUSTER_PARAMETER, 10, 1, "check switch server interval in minute");
// Print the number of LSs with the slowest progress of the Fetcher module
T_DEF_INT_INFT(print_fetcher_slowest_ls_num, OB_CLUSTER_PARAMETER, 10, 1, "print fetcher slowest ls num");

View File

@ -98,11 +98,11 @@ int ObLogFetcher::init(
cfg_ = &cfg;
// Before the LogFetcher module is initialized, the following configuration items need to be loaded
configure(cfg);
const common::ObRegion prefer_region(cfg.region.str());
const common::ObRegion region(cfg.region.str());
if (is_integrated_fetching_mode(fetching_mode) && OB_FAIL(log_route_service_.init(
proxy,
prefer_region,
region,
cluster_id,
false/*is_across_cluster*/,
err_handler,
@ -115,8 +115,9 @@ int ObLogFetcher::init(
cfg.blacklist_survival_time_penalty_period_min,
cfg.blacklist_history_overdue_time_min,
cfg.blacklist_history_clear_interval_min,
true/*is_tenant_mode*/))) {
LOG_ERROR("ObLogRouterService init failer", KR(ret), K(prefer_region), K(cluster_id));
true/*is_tenant_mode*/,
source_tenant_id))) {
LOG_ERROR("ObLogRouterService init failer", KR(ret), K(region), K(cluster_id), K(source_tenant_id));
} else if (OB_FAIL(progress_controller_.init(cfg.ls_count_upper_limit))) {
LOG_ERROR("init progress controller fail", KR(ret));
} else if (OB_FAIL(large_buffer_pool_.init("ObLogFetcher", 1L * 1024 * 1024 * 1024))) {
@ -239,7 +240,7 @@ int ObLogFetcher::start()
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_UNLIKELY(! stop_flag_)) {
LOG_ERROR("fetcher has been started", K(stop_flag_));
ret = OB_INIT_TWICE;
@ -335,6 +336,34 @@ void ObLogFetcher::mark_stop_flag()
}
}
int ObLogFetcher::update_preferred_upstream_log_region(const common::ObRegion &region)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_FAIL(log_route_service_.update_preferred_upstream_log_region(region))) {
LOG_WARN("ObLogRouteService update_preferred_upstream_log_region failed", KR(ret), K(region));
}
return ret;
}
int ObLogFetcher::get_preferred_upstream_log_region(common::ObRegion &region)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_FAIL(log_route_service_.get_preferred_upstream_log_region(region))) {
LOG_WARN("ObLogRouteService get_preferred_upstream_log_region failed", KR(ret), K(region));
}
return ret;
}
int ObLogFetcher::add_ls(
const share::ObLSID &ls_id,
const ObLogFetcherStartParameters &start_parameters)
@ -354,7 +383,7 @@ int ObLogFetcher::add_ls(
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
}
// Requires a valid start-up timestamp
else if (OB_UNLIKELY(start_tstamp_ns <= -1)) {
@ -395,7 +424,7 @@ int ObLogFetcher::recycle_ls(const share::ObLSID &ls_id)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("Fetcher not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_FAIL(ls_fetch_mgr_.recycle_ls(tls_id))) {
if (OB_ENTRY_NOT_EXIST == ret) {
LOG_INFO("ls has been recycled in fetcher", K(tls_id));
@ -418,7 +447,7 @@ int ObLogFetcher::remove_ls(const share::ObLSID &ls_id)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("Fetcher not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_FAIL(recycle_ls(ls_id))) {
LOG_ERROR("recycle_ls failed", KR(ret), K(ls_id));
} else {
@ -452,7 +481,7 @@ int ObLogFetcher::remove_ls_physically(const share::ObLSID &ls_id)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_FAIL(fs_container_mgr_.remove_fsc(removed_tls_id))) {
LOG_ERROR("fs_container_mgr_ remove_fsc failed", KR(ret), K(removed_tls_id));
} else if (OB_FAIL(ls_fetch_mgr_.remove_ls(removed_tls_id))) {
@ -470,7 +499,7 @@ int ObLogFetcher::get_all_ls(ObIArray<share::ObLSID> &ls_ids)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else {
FetchCtxMapLSGetter ls_getter(ls_ids);
@ -495,7 +524,7 @@ int ObLogFetcher::get_ls_proposal_id(const share::ObLSID &ls_id, int64_t &propos
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_UNLIKELY(!ls_id.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ls_id));
@ -515,7 +544,7 @@ int ObLogFetcher::update_fetching_log_upper_limit(const share::SCN &upper_limit_
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_UNLIKELY(!upper_limit_scn.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(upper_limit_scn));
@ -533,7 +562,7 @@ int ObLogFetcher::update_compressor_type(const common::ObCompressorType &compres
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else if (OB_FAIL(rpc_.update_compressor_type(compressor_type))) {
LOG_WARN("ObLogRpc update_compressor_type failed", K(compressor_type));
}
@ -547,7 +576,7 @@ int ObLogFetcher::get_progress_info(ProgressInfo &progress_info)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("LogFetcher is not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else {
if (OB_FAIL(ls_fetch_mgr_.for_each_ls(progress_info))) {
LOG_ERROR("for each part fetch ctx fail", KR(ret));
@ -563,7 +592,7 @@ int ObLogFetcher::wait_for_all_ls_to_be_removed(const int64_t timeout)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_ERROR("Fetcher not inited", KR(ret));
LOG_ERROR("ObLogFetcher has not been inited", KR(ret));
} else {
const int64_t start_time = get_timestamp();
@ -619,8 +648,8 @@ void ObLogFetcher::configure(const ObLogFetcherConfig &cfg)
K(blacklist_survival_time_penalty_period_min),
K(blacklist_history_overdue_time_min),
K(blacklist_history_clear_interval_min));
} else if (OB_FAIL(log_route_service_.update_assign_region(cfg.region.str()))) {
LOG_ERROR("update_assign_region failed", KR(ret), "region", cfg.region);
} else if (OB_FAIL(log_route_service_.update_preferred_upstream_log_region(cfg.region.str()))) {
LOG_ERROR("update_preferred_upstream_log_region failed", KR(ret), "region", cfg.region);
} else if (OB_FAIL(log_route_service_.update_cache_update_interval(
all_server_cache_update_interval_sec,
all_zone_cache_update_interval_sec))) {

View File

@ -68,6 +68,12 @@ public:
// Get Tenant ID
virtual uint64_t get_source_tenant_id() const = 0;
// Update assign region to fetch log
virtual int update_preferred_upstream_log_region(const common::ObRegion &region) = 0;
// Get assign region
virtual int get_preferred_upstream_log_region(common::ObRegion &region) = 0;
// Add the log stream
//
// @param [in] ls_id LS ID
@ -206,6 +212,9 @@ public:
virtual int64_t get_cluster_id() const { return cluster_id_; }
virtual uint64_t get_source_tenant_id() const { return source_tenant_id_; }
virtual int update_preferred_upstream_log_region(const common::ObRegion &region);
virtual int get_preferred_upstream_log_region(common::ObRegion &region);
virtual int add_ls(
const share::ObLSID &ls_id,
const ObLogFetcherStartParameters &start_parameters);

View File

@ -51,10 +51,6 @@ struct KickOutInfo
KickOutReason kick_out_reason_;
KickOutInfo() : tls_id_(), kick_out_reason_(NONE) {}
explicit KickOutInfo(const logservice::TenantLSID &tls_id) :
tls_id_(tls_id),
kick_out_reason_(NONE)
{}
KickOutInfo(const logservice::TenantLSID &tls_id, KickOutReason kick_out_reason) :
tls_id_(tls_id),
kick_out_reason_(kick_out_reason)

View File

@ -956,11 +956,10 @@ bool LSFetchCtx::need_switch_server(const common::ObAddr &cur_svr)
if (OB_FAIL(get_log_route_service_(log_route_service))) {
LOG_ERROR("get_log_route_service_ failed", KR(ret));
} else if (OB_FAIL(log_route_service->need_switch_server(tls_id_.get_tenant_id(), tls_id_.get_ls_id(),
next_lsn, cur_svr))) {
LOG_ERROR("ObLogRouteService need_switch_server failed", KR(ret), K(tls_id_), K(next_lsn),
K(cur_svr));
} else {}
} else {
bool_ret = log_route_service->need_switch_server(tls_id_.get_tenant_id(), tls_id_.get_ls_id(),
next_lsn, cur_svr);
}
return bool_ret;
}

View File

@ -730,7 +730,7 @@ int FetchStream::handle_fetch_log_task_(volatile bool &stop_flag)
bool rpc_is_flying = false;
bool is_stream_valid = true;
FetchLogARpcResult *result = NULL;
KickOutInfo kickout_info(ls_fetch_ctx_->get_tls_id());
KickOutInfo kickout_info;
// Whether the log stream is taken over by RPC, default is false
bool stream_been_taken_over_by_rpc = false;
@ -1241,7 +1241,7 @@ int FetchStream::handle_fetch_log_error_(
ls_fetch_ctx_->get_next_lsn(),
rcode.rcode_,
"%s");
LOG_ERROR("fetch log fail on rpc, need_switch_server", K(svr_), K(rcode), "fetch_stream", this);
LOG_ERROR("fetch log fail on rpc, need_switch_server", K(svr_), K(rcode), "fetch_stream", this);
}
}
// server return error
@ -1260,7 +1260,6 @@ int FetchStream::handle_fetch_log_error_(
"svr_err", resp.get_err(), "svr_debug_err", resp.get_debug_err(),
K(rcode), K(resp));
}
} else {
need_kick_out = false;
}
@ -1500,7 +1499,7 @@ int FetchStream::update_fetch_task_state_(KickOutInfo &kick_out_info,
LOG_ERROR("ls_fetch_ctx_ is NULL", KR(ret), K(ls_fetch_ctx_));
} else {
LSFetchCtx *task = ls_fetch_ctx_;
bool need_check_switch_server = check_need_switch_server_();
const bool need_check_switch_server = check_need_switch_server_();
// If the task is deleted, it is kicked out directly
if (OB_UNLIKELY(task->is_discarded())) {
@ -1522,7 +1521,7 @@ int FetchStream::update_fetch_task_state_(KickOutInfo &kick_out_info,
task->update_touch_tstamp_if_progress_beyond_upper_limit(upper_limit_);
}
// Update each partition's progress to the global
// Update each LS's progress to the global
if (OB_SUCCESS == ret && OB_FAIL(publish_progress_(*task))) {
LOG_ERROR("update progress fail", KR(ret), K(task), KPC(task));
}
@ -1637,10 +1636,13 @@ int FetchStream::check_fetch_timeout_(LSFetchCtx &task, KickOutInfo &kick_out_in
int FetchStream::check_switch_server_(LSFetchCtx &task, KickOutInfo &kick_out_info)
{
int ret = OB_SUCCESS;
const char *branch_str = nullptr;
if (exist_(kick_out_info, task.get_tls_id())) {
// Do not check for LS already located in kick_out_info
branch_str = "exist_kick_out_info";
} else if (task.need_switch_server(svr_)) {
branch_str = "need_switch_server";
LOG_TRACE("exist higher priority server, need switch server", KR(ret), "key", task.get_tls_id(),
K_(svr));
// If need to switch the stream, add it to the kick out collection
@ -1653,8 +1655,10 @@ int FetchStream::check_switch_server_(LSFetchCtx &task, KickOutInfo &kick_out_in
}
} else {
// do nothing
branch_str = "no_need_switch_server";
}
LOG_TRACE("check_switch_server", "ls_id", ls_fetch_ctx_->get_tls_id(), K(branch_str));
return ret;
}

View File

@ -0,0 +1,70 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX OBLOG
#include "share/ob_errno.h" // KR
#include "ob_all_units_info.h"
using namespace oceanbase::common;
namespace oceanbase
{
namespace logservice
{
int ObUnitsRecord::init(
const common::ObAddr &server,
ObString &zone,
common::ObZoneType &zone_type,
ObString &region)
{
int ret = OB_SUCCESS;
server_ = server;
zone_type_ = zone_type;
if (OB_FAIL(zone_.assign(zone))) {
LOG_ERROR("zone assign fail", KR(ret), K(zone));
} else if (OB_FAIL(region_.assign(region))) {
LOG_ERROR("zone assign fail", KR(ret), K(region));
} else {}
return ret;
}
void ObUnitsRecordInfo::reset()
{
cluster_id_ = 0;
units_record_array_.reset();
}
int ObUnitsRecordInfo::init(const int64_t cluster_id)
{
int ret = OB_SUCCESS;
cluster_id_ = cluster_id;
units_record_array_.reset();
return ret;
}
int ObUnitsRecordInfo::add(ObUnitsRecord &record)
{
int ret = OB_SUCCESS;
if (OB_FAIL(units_record_array_.push_back(record))) {
LOG_ERROR("units_record_array_ push_back failed", KR(ret), K(record));
}
return ret;
}
} // namespace logservice
} // namespace oceanbase

View File

@ -0,0 +1,79 @@
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OB_UNITS_INFO_H_
#define OCEANBASE_OB_UNITS_INFO_H_
#include "lib/container/ob_se_array.h" // ObSEArray
#include "common/ob_zone.h" // ObZone
#include "common/ob_region.h" // ObRegin
#include "common/ob_zone_type.h" // ObZoneType
namespace oceanbase
{
namespace logservice
{
// Records in table GV$OB_UNITS
struct ObUnitsRecord
{
common::ObAddr server_;
common::ObZone zone_;
common::ObZoneType zone_type_;
common::ObRegion region_;
ObUnitsRecord() { reset(); }
void reset()
{
server_.reset();
zone_.reset();
zone_type_ = common::ZONE_TYPE_INVALID;
region_.reset();
}
int init(
const common::ObAddr &server,
ObString &zone,
common::ObZoneType &zone_type,
ObString &region);
TO_STRING_KV(K_(server), K_(zone), K_(zone_type), K_(region));
};
class ObUnitsRecordInfo
{
public:
static const int64_t ALL_SERVER_DEFAULT_RECORDS_NUM = 26;
typedef common::ObSEArray<ObUnitsRecord, ALL_SERVER_DEFAULT_RECORDS_NUM> ObUnitsRecordArray;
ObUnitsRecordInfo() { reset(); }
virtual ~ObUnitsRecordInfo() { reset(); }
int init(const int64_t cluster_id);
void reset();
inline int64_t get_cluster_id() { return cluster_id_; }
inline ObUnitsRecordArray &get_units_record_array() { return units_record_array_; }
int add(ObUnitsRecord &record);
TO_STRING_KV(K_(cluster_id), K_(units_record_array));
private:
int64_t cluster_id_;
ObUnitsRecordArray units_record_array_;
DISALLOW_COPY_AND_ASSIGN(ObUnitsRecordInfo);
};
} // namespace logservice
} // namespace oceanbase
#endif

View File

@ -37,7 +37,8 @@ struct AllZoneRecord
storage_type_ = share::ObZoneInfo::StorageType::STORAGE_TYPE_LOCAL;
}
int init(ObString &zone,
int init(
ObString &zone,
ObString &region);
void set_storage_type_by_str(common::ObString &storage_type_str) {

View File

@ -18,7 +18,7 @@
#include "lib/allocator/ob_mod_define.h" // ObModIds
#include "lib/utility/ob_macro_utils.h" // OB_ISNULL, ...
#include "lib/oblog/ob_log_module.h" // LOG_*
#include "logservice/common_util/ob_log_time_utils.h" //
#include "logservice/common_util/ob_log_time_utils.h"
using namespace oceanbase::common;
using namespace oceanbase::share;
@ -28,6 +28,8 @@ namespace oceanbase
namespace logservice
{
ObLogAllSvrCache::ObLogAllSvrCache() :
is_tenant_mode_(false),
tenant_id_(OB_INVALID_TENANT_ID),
systable_queryer_(NULL),
all_server_cache_update_interval_(0),
all_zone_cache_update_interval_(0),
@ -38,7 +40,8 @@ ObLogAllSvrCache::ObLogAllSvrCache() :
zone_need_update_(false),
zone_last_update_tstamp_(OB_INVALID_TIMESTAMP),
svr_map_(),
zone_map_()
zone_map_(),
units_map_()
{
}
@ -64,28 +67,45 @@ bool ObLogAllSvrCache::is_svr_avail(
bool bool_ret = false;
region_priority = REGION_PRIORITY_UNKNOWN;
SvrItem svr_item;
ZoneItem zone_item;
bool find_agent = false;
if (!svr.is_valid()) {
LOG_WARN("svr is not valid!", K(svr));
bool_ret = false;
} else if (OB_FAIL(get_svr_item_(svr, svr_item))) {
LOG_WARN("is_svr_avail: failed to get svr item", KR(ret), K(svr), K(svr_item));
bool_ret = false;
} else if (OB_FAIL(get_zone_item_(svr_item.zone_, zone_item))) {
LOG_ERROR("failed to get zone item", KR(ret), K(svr_item), K(zone_item));
bool_ret = false;
} else if (is_svr_serve_(svr_item, zone_item)) {
bool_ret = true;
// get region priority of server if server is available
region_priority = svr_item.region_priority_;
LOG_DEBUG("is svr avail", K(svr), K(region_priority), K(zone_item));
} else if (! is_tenant_mode_) {
SvrItem svr_item;
ZoneItem zone_item;
if (OB_FAIL(get_svr_item_(svr, svr_item))) {
LOG_WARN("is_svr_avail: failed to get svr item", KR(ret), K(svr), K(svr_item));
bool_ret = false;
} else if (OB_FAIL(get_zone_item_(svr_item.zone_, zone_item))) {
LOG_ERROR("failed to get zone item", KR(ret), K(svr_item), K(zone_item));
bool_ret = false;
} else if (is_svr_serve_(svr_item, zone_item)) {
bool_ret = true;
// get region priority of server if server is available
region_priority = svr_item.region_priority_;
LOG_DEBUG("is svr avail", K(svr), K(region_priority), K(zone_item));
} else {
bool_ret = false;
region_priority = REGION_PRIORITY_UNKNOWN;
LOG_DEBUG("svr not avail", K(svr), K(zone_item), K(svr_item));
}
} else {
bool_ret = false;
region_priority = REGION_PRIORITY_UNKNOWN;
LOG_DEBUG("svr not avail", K(svr), K(zone_item), K(svr_item));
UnitsRecordItem units_record_item;
if (OB_FAIL(get_units_record_item_(svr, units_record_item))) {
if (OB_ENTRY_NOT_EXIST != ret) {
bool_ret = false;
LOG_WARN("get_units_record_item_ failed", KR(ret), K(svr), K(units_record_item));
} else {
bool_ret = true;
}
} else if (is_svr_serve_(units_record_item)) {
bool_ret = true;
// get the region priority of the server if server is available
region_priority = units_record_item.region_priority_;
LOG_TRACE("is svr avail", K(svr), K(region_priority), K(units_record_item));
} else {}
}
return bool_ret;
@ -204,10 +224,20 @@ bool ObLogAllSvrCache::is_svr_serve_(const SvrItem &svr_item, const ZoneItem &zo
return bool_ret;
}
bool ObLogAllSvrCache::is_assign_region_(const common::ObRegion &region) const
bool ObLogAllSvrCache::is_svr_serve_(const UnitsRecordItem &units_record_item) const
{
bool bool_ret = false;
bool_ret = ObZoneType::ZONE_TYPE_ENCRYPTION != units_record_item.get_zone_type();
return bool_ret;
}
bool ObLogAllSvrCache::is_assign_region_(const common::ObRegion &region)
{
bool bool_ret = false;
ObByteLockGuard guard(region_lock_);
// ignore case
bool_ret = (0 == strncasecmp(assign_region_.ptr(),
region.ptr(),
@ -218,19 +248,32 @@ bool ObLogAllSvrCache::is_assign_region_(const common::ObRegion &region) const
int ObLogAllSvrCache::init(
ObLogSysTableQueryer &systable_queryer,
const bool is_tenant_mode,
const uint64_t tenant_id,
const common::ObRegion &prefer_region,
const int64_t all_server_cache_update_interval_sec,
const int64_t all_zone_cache_update_interval_sec)
{
int ret = OB_SUCCESS;
if (OB_FAIL(svr_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init svr map fail", KR(ret));
} else if (OB_FAIL(zone_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init zone map fail", KR(ret));
if (! is_tenant_mode) {
if (OB_FAIL(svr_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init svr map fail", KR(ret));
} else if (OB_FAIL(zone_map_.init(ObModIds::OB_LOG_ALL_SERVER_CACHE))) {
LOG_ERROR("init zone map fail", KR(ret));
}
} else {
if (OB_FAIL(units_map_.init("UnitCache"))) {
LOG_ERROR("init units map fail", KR(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(assign_region_.assign(prefer_region))) {
LOG_ERROR("assign_region assign fail", KR(ret), K(assign_region_), K(prefer_region));
} else {
is_tenant_mode_ = is_tenant_mode;
tenant_id_ = tenant_id;
systable_queryer_ = &systable_queryer;
all_server_cache_update_interval_ = all_server_cache_update_interval_sec * _SEC_;
all_zone_cache_update_interval_ = all_zone_cache_update_interval_sec * _SEC_;
@ -239,7 +282,7 @@ int ObLogAllSvrCache::init(
zone_need_update_ = false;
zone_last_update_tstamp_ = OB_INVALID_TIMESTAMP;
LOG_INFO("ObLogAllSvrCache init succ", K(prefer_region));
LOG_INFO("ObLogAllSvrCache init succ", K(prefer_region), K(is_tenant_mode));
}
return ret;
@ -247,6 +290,7 @@ int ObLogAllSvrCache::init(
void ObLogAllSvrCache::destroy()
{
LOG_INFO("destroy all svr cache begin");
systable_queryer_ = NULL;
all_server_cache_update_interval_ = 0;
all_zone_cache_update_interval_ = 0;
@ -254,37 +298,55 @@ void ObLogAllSvrCache::destroy()
cur_zone_version_ = 0;
zone_need_update_ = false;
zone_last_update_tstamp_ = OB_INVALID_TIMESTAMP;
(void)svr_map_.destroy();
(void)zone_map_.destroy();
if (! is_tenant_mode_) {
(void)svr_map_.destroy();
(void)zone_map_.destroy();
} else {
(void)units_map_.destroy();
}
is_tenant_mode_ = false;
tenant_id_ = OB_INVALID_TENANT_ID;
LOG_INFO("destroy all svr cache succ");
LOG_INFO("destroy all svr cache success");
}
void ObLogAllSvrCache::query_and_update()
{
int ret = OB_SUCCESS;
if (need_update_zone_()) {
if (OB_FAIL(update_zone_cache_())) {
LOG_ERROR("update zone cache error", KR(ret));
} else if (OB_FAIL(purge_stale_zone_records_())) {
LOG_ERROR("purge stale records fail", KR(ret));
} else {
// do nothing
}
}
if (OB_SUCC(ret)) {
int64_t all_svr_cache_update_interval = ATOMIC_LOAD(&all_server_cache_update_interval_);
if (REACH_TIME_INTERVAL_THREAD_LOCAL(all_svr_cache_update_interval)) {
if (OB_FAIL(update_server_cache_())) {
LOG_ERROR("update server cache error", KR(ret));
} else if (OB_FAIL(purge_stale_records_())) {
if (! is_tenant_mode_) {
if (need_update_zone_()) {
if (OB_FAIL(update_zone_cache_())) {
LOG_ERROR("update zone cache error", KR(ret));
} else if (OB_FAIL(purge_stale_zone_records_())) {
LOG_ERROR("purge stale records fail", KR(ret));
} else {
// do nothing
}
}
if (OB_SUCC(ret)) {
const int64_t all_svr_cache_update_interval = ATOMIC_LOAD(&all_server_cache_update_interval_);
if (REACH_TIME_INTERVAL_THREAD_LOCAL(all_svr_cache_update_interval)) {
if (OB_FAIL(update_server_cache_())) {
LOG_ERROR("update server cache error", KR(ret));
} else if (OB_FAIL(purge_stale_records_())) {
LOG_ERROR("purge stale records fail", KR(ret));
} else {
// do nothing
}
}
}
} else {
// is_tenant_mode_ = true
const int64_t all_svr_cache_update_interval = ATOMIC_LOAD(&all_server_cache_update_interval_);
if (REACH_TIME_INTERVAL_THREAD_LOCAL(all_svr_cache_update_interval)) {
if (OB_FAIL(update_unit_info_cache_())) {
LOG_WARN("update_unit_info_cache_ failed", KR(ret));
}
}
}
}
@ -452,6 +514,76 @@ int ObLogAllSvrCache::update_server_cache_()
return ret;
}
int ObLogAllSvrCache::update_unit_info_cache_()
{
int ret = OB_SUCCESS;
ObUnitsRecordInfo units_record_info;
if (OB_ISNULL(systable_queryer_)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid systable queryer", KR(ret), K(systable_queryer_));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id_)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid tenant_id", KR(ret), K(tenant_id_));
} else if (OB_FAIL(systable_queryer_->get_all_units_info(tenant_id_, units_record_info))) {
if (OB_NEED_RETRY == ret) {
LOG_WARN("query the GV$OB_UNITS failed, need retry", KR(ret));
ret = OB_SUCCESS;
} else {
LOG_ERROR("query the GV$OB_UNITS failed", KR(ret));
}
} else {
int64_t next_version = cur_version_ + 1;
ObUnitsRecordInfo::ObUnitsRecordArray &units_record_array = units_record_info.get_units_record_array();
ARRAY_FOREACH_N(units_record_array, idx, count) {
ObUnitsRecord &record = units_record_array.at(idx);
ObAddr &svr = record.server_;
RegionPriority region_priority = REGION_PRIORITY_UNKNOWN;
if (OB_FAIL(get_region_priority_(record.region_, region_priority))) {
LOG_ERROR("get priority based region fail", KR(ret), K(svr),
"region", record.region_,
"region_priority", print_region_priority(region_priority));
} else {
UnitsRecordItem units_record_item;
units_record_item.reset(next_version, record.zone_, record.zone_type_, region_priority);
if (OB_FAIL(units_map_.insert_or_update(svr, units_record_item))) {
LOG_ERROR("units_map_ insert_or_update fail", KR(ret), K(svr), K(units_record_item));
}
_LOG_INFO("[STAT] [ALL_SERVER_LIST] INDEX=%ld/%ld SERVER=%s "
"ZONE=%s REGION=%s(%s) VERSION=%lu",
idx, count, to_cstring(svr),
to_cstring(record.zone_), to_cstring(record.region_),
print_region_priority(region_priority), next_version);
}
}
ATOMIC_INC(&cur_version_);
_LOG_INFO("[STAT] [ALL_SERVER_LIST] COUNT=%ld VERSION=%lu", units_record_array.count(), cur_version_);
}
return ret;
}
int ObLogAllSvrCache::get_units_record_item_(
const common::ObAddr &svr,
UnitsRecordItem &item)
{
int ret = OB_SUCCESS;
item.reset();
if (OB_FAIL(units_map_.get(svr, item))) {
if (OB_ENTRY_NOT_EXIST != ret) {
LOG_ERROR("get UnitsRecordItem from map fail", KR(ret), K(svr));
}
}
return ret;
}
int ObLogAllSvrCache::get_zone_item_(const common::ObZone &zone,
ZoneItem &zone_item)
{

View File

@ -63,6 +63,8 @@ public:
public:
int init(ObLogSysTableQueryer &systable_queryer,
const bool is_tenant_mode,
const uint64_t tenant_id,
const common::ObRegion &prefer_region,
const int64_t all_server_cache_update_interval_sec,
const int64_t all_zone_cache_update_interval_sec);
@ -81,18 +83,27 @@ private:
// 2. other region or empty region(no retion info for lower version of observer)
// region_priority = REGION_PRIORITY_LOW
int get_region_priority_(const common::ObRegion &region, RegionPriority &priority);
bool is_assign_region_(const common::ObRegion &region) const;
bool is_assign_region_(const common::ObRegion &region);
struct UnitsRecordItem;
int get_units_record_item_(const common::ObAddr &svr, UnitsRecordItem &item);
bool need_update_zone_();
int update_zone_cache_();
int update_server_cache_();
int purge_stale_records_();
int purge_stale_zone_records_();
int update_unit_info_cache_();
// NOTE: server serve in such cases:
// 1. server status is ACTIVE or DELETING
// 2. server not in ENCRYPTION zone
bool is_svr_serve_(const SvrItem &svr_item, const ZoneItem &zone_item) const;
// NOTE: server serve in such cases:
// 1. server not in ENCRYPTION zone
bool is_svr_serve_(const UnitsRecordItem &units_record_item) const;
private:
typedef share::ObServerStatus::DisplayStatus StatusType;
typedef share::ObZoneInfo::StorageType ZoneStorageType;
@ -187,10 +198,44 @@ private:
bool operator()(const common::ObZone &zone, const ZoneItem &zone_item);
};
struct UnitsRecordItem
{
uint64_t version_;
common::ObZone zone_;
common::ObZoneType zone_type_;
RegionPriority region_priority_;
void reset()
{
version_ = -1;
zone_.reset();
zone_type_ = common::ZONE_TYPE_INVALID;
region_priority_ = REGION_PRIORITY_UNKNOWN;
}
void reset(
const uint64_t version,
const common::ObZone &zone,
const common::ObZoneType &zone_type,
const RegionPriority region_priority)
{
version_ = version;
zone_ = zone;
zone_type_ = zone_type;
region_priority_ = region_priority;
}
const common::ObZoneType& get_zone_type() const { return zone_type_; }
TO_STRING_KV(K_(zone), K_(zone_type), K_(region_priority));
};
typedef common::ObLinearHashMap<common::ObAddr, UnitsRecordItem> UnitsMap;
// set all_server_cache_update_interval for unitest
void set_update_interval_(const int64_t time);
private:
bool is_tenant_mode_;
uint64_t tenant_id_;
ObLogSysTableQueryer *systable_queryer_;
int64_t all_server_cache_update_interval_;
int64_t all_zone_cache_update_interval_;
@ -205,6 +250,7 @@ private:
SvrMap svr_map_;
ZoneMap zone_map_;
UnitsMap units_map_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogAllSvrCache);

View File

@ -27,9 +27,10 @@ namespace logservice
{
ObLogRouteService::ObLogRouteService() :
is_inited_(false),
is_tenant_mode_(false),
is_stopped_(true),
cluster_id_(OB_INVALID_CLUSTER_ID),
is_tenant_mode_(false),
tenant_id_(OB_INVALID_TENANT_ID),
is_stopped_(true),
ls_route_key_set_(),
ls_router_map_(),
log_router_allocator_(),
@ -70,7 +71,8 @@ int ObLogRouteService::init(ObISQLClient *proxy,
const int64_t blacklist_survival_time_penalty_period_min,
const int64_t blacklist_history_overdue_time_min,
const int64_t blacklist_history_clear_interval_min,
const bool is_tenant_mode)
const bool is_tenant_mode,
const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
@ -100,7 +102,7 @@ int ObLogRouteService::init(ObISQLClient *proxy,
K(external_server_blacklist));
} else if (OB_FAIL(systable_queryer_.init(cluster_id, is_across_cluster, *proxy, err_handler))) {
LOG_WARN("systable_queryer_ init failed", KR(ret), K(cluster_id), K(is_across_cluster));
} else if (OB_FAIL(all_svr_cache_.init(systable_queryer_, prefer_region,
} else if (OB_FAIL(all_svr_cache_.init(systable_queryer_, is_tenant_mode, tenant_id, prefer_region,
all_server_cache_update_interval_sec, all_zone_cache_update_interval_sec))) {
LOG_WARN("all_svr_cache_ init failed", KR(ret), K(is_tenant_mode), K(prefer_region),
K(all_server_cache_update_interval_sec), K(all_zone_cache_update_interval_sec));
@ -112,6 +114,7 @@ int ObLogRouteService::init(ObISQLClient *proxy,
LOG_WARN("TG_SET_HANDLER_AND_START failed", KR(ret), K(tg_id_));
} else {
cluster_id_ = cluster_id;
tenant_id_ = tenant_id;
log_router_allocator_.set_nway(NWAY);
asyn_task_allocator_.set_nway(NWAY);
timer_id_ = lib::TGDefIDs::LogRouterTimer;
@ -201,6 +204,7 @@ void ObLogRouteService::destroy()
err_handler_ = NULL;
cluster_id_ = OB_INVALID_CLUSTER_ID;
tenant_id_ = OB_INVALID_TENANT_ID;
background_refresh_time_sec_ = 0;
blacklist_survival_time_sec_ = 0;
blacklist_survival_time_upper_limit_min_ = 0;
@ -314,7 +318,7 @@ int ObLogRouteService::get_background_refresh_time(int64_t &background_refresh_t
return ret;
}
int ObLogRouteService::update_assign_region(const common::ObRegion &prefer_region)
int ObLogRouteService::update_preferred_upstream_log_region(const common::ObRegion &prefer_region)
{
int ret = OB_SUCCESS;
@ -328,7 +332,7 @@ int ObLogRouteService::update_assign_region(const common::ObRegion &prefer_regio
return ret;
}
int ObLogRouteService::get_assign_region(common::ObRegion &prefer_region)
int ObLogRouteService::get_preferred_upstream_log_region(common::ObRegion &prefer_region)
{
int ret = OB_SUCCESS;
@ -924,10 +928,9 @@ int ObLogRouteService::update_server_list_(
const ObAddr &server = record.server_;
RegionPriority region_priority = REGION_PRIORITY_UNKNOWN;
// TODO support wait GV$UNIT
// if (! all_svr_cache_.is_svr_avail(server, region_priority)) {
// ignore server not in __all_server table
if (OB_FAIL(ls_svr_list.add_server_or_update(server,
if (! all_svr_cache_.is_svr_avail(server, region_priority)) {
// ignore the server which is not available
} else if (OB_FAIL(ls_svr_list.add_server_or_update(server,
record.begin_lsn_, record.end_lsn_, region_priority, (LEADER == record.role_)))) {
LOG_WARN("ObLogRouteService add_server_or_update failed", KR(ret), K(router_key),
K(router_value));
@ -976,9 +979,7 @@ int ObLogRouteService::update_all_server_and_zone_cache_()
ret = OB_NOT_INIT;
LOG_ERROR("ObLogRouteService has not been inited", KR(ret));
} else {
if (! is_tenant_mode_) {
all_svr_cache_.query_and_update();
}
all_svr_cache_.query_and_update();
}
return ret;

View File

@ -85,7 +85,8 @@ public:
const int64_t blacklist_survival_time_penalty_period_min = 1,
const int64_t blacklist_history_overdue_time_min = 30,
const int64_t blacklist_history_clear_interval_min = 20,
const bool is_tenant_mode = false);
const bool is_tenant_mode = false,
const uint64_t tenant_id = OB_INVALID_TENANT_ID);
int start();
void stop();
void wait();
@ -99,8 +100,8 @@ public:
int get_background_refresh_time(int64_t &background_refresh_time_sec);
// Region
int update_assign_region(const common::ObRegion &prefer_region);
int get_assign_region(common::ObRegion &prefer_region);
int update_preferred_upstream_log_region(const common::ObRegion &prefer_region);
int get_preferred_upstream_log_region(common::ObRegion &prefer_region);
// Cache interval
int update_cache_update_interval(const int64_t all_server_cache_update_interval_sec,
@ -350,9 +351,10 @@ private:
private:
bool is_inited_;
bool is_tenant_mode_;
volatile bool is_stopped_ CACHE_ALIGNED;
int64_t cluster_id_;
bool is_tenant_mode_;
uint64_t tenant_id_;
volatile bool is_stopped_ CACHE_ALIGNED;
LSRouteKeySet ls_route_key_set_;
LSRouterMap ls_router_map_;
ObSliceAlloc log_router_allocator_;

View File

@ -100,7 +100,7 @@ int ObLogSysTableQueryer::get_ls_log_info(
tenant_id, ls_id.id()))) {
LOG_WARN("assign sql string failed", KR(ret), K(tenant_id),
K(ls_id));
// Use OB_SYS_TENANT_ID to query GV$OB_LOG_STAT
// Use OB_SYS_TENANT_ID to query the GV$OB_LOG_STAT
} else if (OB_FAIL(do_query_(OB_SYS_TENANT_ID, sql, result))) {
LOG_WARN("do_query_ failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr());
} else if (OB_ISNULL(result.get_result())) {
@ -119,6 +119,46 @@ int ObLogSysTableQueryer::get_ls_log_info(
return ret;
}
////////////////////////////////////// QueryAllUnitsInfo /////////////////////////////////
int ObLogSysTableQueryer::get_all_units_info(
const uint64_t tenant_id,
ObUnitsRecordInfo &units_record_info)
{
int ret = OB_SUCCESS;
const char *select_fields = "SVR_IP, SVR_PORT, ZONE, ZONE_TYPE, REGION";
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLogSysTableQueryer not init", KR(ret));
} else if (OB_FAIL(units_record_info.init(cluster_id_))) {
LOG_WARN("fail to init units_record_info", KR(ret), K(cluster_id_));
} else {
ObSqlString sql;
int64_t record_count;
SMART_VAR(ObISQLClient::ReadResult, result) {
if (OB_FAIL(sql.assign_fmt(
"SELECT %s FROM %s"
" WHERE tenant_id = %lu",
select_fields, OB_GV_OB_UNITS_TNAME, tenant_id))) {
LOG_WARN("assign sql string failed", KR(ret), K(cluster_id_), K(tenant_id));
// Use OB_SYS_TENANT_ID to query the GV$OB_UNITS
} else if (OB_FAIL(do_query_(OB_SYS_TENANT_ID, sql, result))) {
LOG_WARN("do_query_ failed", KR(ret), K(cluster_id_), K(tenant_id), "sql", sql.ptr());
} else if (OB_ISNULL(result.get_result())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get mysql result failed", KR(ret));
} else if (OB_FAIL(get_records_template_(*result.get_result(), units_record_info,
"ObUnitsRecordInfo", record_count))) {
LOG_WARN("construct units record info failed", KR(ret), K(units_record_info));
}
}
}
return ret;
}
////////////////////////////////////// QueryAllServerInfo /////////////////////////////////
int ObLogSysTableQueryer::get_all_server_info(
const uint64_t tenant_id,
@ -308,6 +348,43 @@ int ObLogSysTableQueryer::get_records_template_(common::sqlclient::ObMySQLResult
return ret;
}
////////////////////////////////////// QueryAllUnitsInfo - parse /////////////////////////////////
int ObLogSysTableQueryer::parse_record_from_row_(common::sqlclient::ObMySQLResult &res,
ObUnitsRecordInfo &units_record_info)
{
int ret = OB_SUCCESS;
ObUnitsRecord units_record;
ObString ip;
int64_t port = 0;
common::ObAddr server;
ObString zone;
ObString region;
ObString zone_type_str;
(void)GET_COL_IGNORE_NULL(res.get_varchar, "SVR_IP", ip);
(void)GET_COL_IGNORE_NULL(res.get_int, "SVR_PORT", port);
(void)GET_COL_IGNORE_NULL(res.get_varchar, "ZONE", zone);
(void)GET_COL_IGNORE_NULL(res.get_varchar, "ZONE_TYPE", zone_type_str);
(void)GET_COL_IGNORE_NULL(res.get_varchar, "REGION", region);
if (false == server.set_ip_addr(ip, static_cast<uint32_t>(port))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid server address", K(ip), K(port));
} else {
common::ObZoneType zone_type = str_to_zone_type(zone_type_str.ptr());
if (OB_FAIL(units_record.init(server, zone, zone_type, region))) {
LOG_ERROR("units_record init failed", KR(ret), K(server), K(zone), K(zone_type), K(region));
} else if (OB_FAIL(units_record_info.add(units_record))) {
LOG_WARN("units_record_info add failed", KR(ret), K(units_record));
} else {
LOG_TRACE("units_record_info add success", K(units_record));
}
}
return ret;
}
int ObLogSysTableQueryer::parse_record_from_row_(common::sqlclient::ObMySQLResult &res,
ObLSLogInfo &ls_log_info)
{

View File

@ -17,6 +17,7 @@
#include "ob_all_server_info.h" // ObAllServerInfo
#include "ob_all_zone_info.h" // ObAllZoneInfo, ObAllZoneTypeInfo
#include "src/logservice/logfetcher/ob_log_fetcher_err_handler.h"
#include "ob_all_units_info.h" // ObUnitsRecordInfo
namespace oceanbase
{
@ -49,6 +50,11 @@ public:
const share::ObLSID &ls_id,
ObLSLogInfo &ls_log_info);
// SELECT SVR_IP, SVR_PORT, ZONE, ZONE_TYPE, REGION from GV$OB_UNITS;
int get_all_units_info(
const uint64_t tenant_id,
ObUnitsRecordInfo &units_record_info);
int get_all_server_info(
const uint64_t tenant_id,
ObAllServerInfo &all_server_info);
@ -72,6 +78,12 @@ private:
const char *event,
int64_t &record_count);
// ObUnitsRecordInfo
// @param [in] res, result read from the GV$OB_UNITS table
// @param [out] units_record_info, items in the GV$OB_UNITS table
int parse_record_from_row_(common::sqlclient::ObMySQLResult &res,
ObUnitsRecordInfo &units_record_info);
// ObLSLogInfo
// @param [in] res, result read from __all_virtual_log_stat table
// @param [out] ls_log_info, meta/user tenant's LS Palf Info

View File

@ -233,7 +233,10 @@ bool LSSvrList::need_switch_server(const ObLSRouterKey &key,
bool is_svr_invalid = false;
SvrItem &svr_item = svr_items_.at(svr_idx);
svr_item.check_and_update_serve_info(next_lsn, is_log_served, is_svr_invalid);
// Switch the Server scenario and consider that the Server is always serving
svr_item.check_and_update_serve_info(true/*is_always_serving*/, next_lsn, is_log_served, is_svr_invalid);
LOG_INFO("need_switch_server", K(key), K(next_lsn), K(cur_svr), K(svr_item), K(is_log_served), K(is_svr_invalid));
if (is_log_served && !is_svr_invalid && !blacklist.exist(svr_item.svr_)) {
if (cur_svr == svr_item.svr_) {
@ -326,7 +329,7 @@ int LSSvrList::get_next_server_based_on_blacklist_(const palf::LSN &next_lsn,
// Automatically modify next_svr_index_ to move to the next server
int64_t svr_idx = next_svr_index_++ % svr_items_.count();
SvrItem &svr_item = svr_items_.at(svr_idx);
svr_item.check_and_update_serve_info(next_lsn, is_log_served, is_svr_invalid);
svr_item.check_and_update_serve_info(false/*is_always_serving*/,next_lsn, is_log_served, is_svr_invalid);
LOG_DEBUG("next_server-debug 2", K(next_lsn), K(svr_items_), K(is_log_served));
@ -414,7 +417,9 @@ void LSSvrList::SvrItem::update(const palf::LSN &start_lsn,
is_leader_ = is_leader;
}
void LSSvrList::SvrItem::check_and_update_serve_info(const palf::LSN &lsn,
void LSSvrList::SvrItem::check_and_update_serve_info(
const bool is_always_serving,
const palf::LSN &lsn,
bool &is_log_served,
bool &is_server_invalid)
{
@ -426,6 +431,11 @@ void LSSvrList::SvrItem::check_and_update_serve_info(const palf::LSN &lsn,
if (! is_log_served) {
is_server_invalid = true;
}
if (is_always_serving) {
is_log_served = true;
is_server_invalid = false;
}
}
bool LSSvrList::SvrItem::is_priority_equal(const SvrItem &svr_item) const

View File

@ -186,7 +186,9 @@ private:
// @param [in] lsn target lsn
// @param [out] is_log_served whether serve target log id
// @param [out] is_server_invalid whether the server is no longer valid (no more valid ranges)
void check_and_update_serve_info(const palf::LSN &lsn,
void check_and_update_serve_info(
const bool is_always_serving,
const palf::LSN &lsn,
bool &is_log_served,
bool &is_server_invalid);

View File

@ -220,6 +220,7 @@ int ObLogRestoreNetDriver::scan_ls(const share::ObLogRestoreSourceType &type)
}
delete_fetcher_if_needed_with_lock_();
update_config_();
update_standby_preferred_upstream_log_region_();
return ret;
}
@ -461,6 +462,19 @@ int64_t ObLogRestoreNetDriver::get_rpc_timeout_sec_()
return rpc_timeout;
}
void ObLogRestoreNetDriver::update_standby_preferred_upstream_log_region_()
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_));
if (! tenant_config.is_valid()) {
} else {
if (nullptr != fetcher_) {
const char *region_string = tenant_config->standby_db_preferred_upstream_log_region;
fetcher_->update_preferred_upstream_log_region(common::ObRegion(region_string));
}
}
}
bool ObLogRestoreNetDriver::is_fetcher_stale_(const int64_t cluster_id, const uint64_t tenant_id)
{
bool bret = false;

View File

@ -17,6 +17,7 @@
#include "lib/ob_errno.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/compress/ob_compress_util.h" // ObCompressorType
#include "common/ob_region.h" // ObRegion
#include "logservice/logfetcher/ob_log_fetcher_ls_ctx_additional_info_factory.h"
#include "logservice/logfetcher/ob_log_fetcher_err_handler.h"
#include "logservice/logfetcher/ob_log_fetcher_ls_ctx_default_factory.h"
@ -94,6 +95,9 @@ private:
void delete_fetcher_if_needed_with_lock_();
void update_config_();
int64_t get_rpc_timeout_sec_();
// update standby_fetch_log_specified_region
void update_standby_preferred_upstream_log_region_();
int refresh_proxy_(const share::ObRestoreSourceServiceAttr &source);

View File

@ -604,6 +604,13 @@ DEF_INT(_log_writer_parallelism, OB_TENANT_PARAMETER, "3",
"the number of parallel log writer threads that can be used to write redo log entries to disk. ",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
DEF_STR(standby_db_preferred_upstream_log_region, OB_TENANT_PARAMETER, "",
"The preferred upstream log region for Standby db. "
"The Standby db will give priority to the preferred upstream log region to fetch log. "
"For high availability,the Standby db will also switch to the other region "
"when the preferred upstream log region can not fetch log because of exception etc.",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
// ========================= LogService Config End =====================
DEF_INT(resource_hard_limit, OB_CLUSTER_PARAMETER, "100", "[100, 10000]",
"system utilization should not be large than resource_hard_limit",

View File

@ -204,6 +204,7 @@ ssl_client_authentication
ssl_external_kms_info
stack_size
standby_db_fetch_log_rpc_timeout
standby_db_preferred_upstream_log_region
standby_fetch_log_bandwidth_limit
syslog_io_bandwidth_limit
syslog_level