reporting events of PALF to history table asynchronously

This commit is contained in:
BinChenn 2023-04-22 14:34:34 +00:00 committed by ob-robot
parent a72f656bab
commit 1d918d8dbf
29 changed files with 650 additions and 143 deletions

View File

@ -49,6 +49,7 @@ ob_set_subtarget(ob_logservice common
ob_server_log_block_mgr.cpp
ob_log_flashback_service.cpp
ob_net_keepalive_adapter.cpp
ob_log_monitor.cpp
)
ob_set_subtarget(ob_logservice common_mixed

View File

@ -0,0 +1,78 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_log_monitor.h"
#include "observer/ob_server_event_history_table_operator.h" // SERVER_EVENT_ADD_WITH_RETRY
namespace oceanbase
{
namespace logservice
{
#define LOG_MONITOR_EVENT_FMT_PREFIX "LOG", type_to_string_(event), "TENANT_ID", mtl_id, "LS_ID", palf_id
// =========== PALF Event Reporting ===========
int ObLogMonitor::record_role_change_event(const int64_t palf_id,
const common::ObRole &prev_role,
const palf::ObReplicaState &prev_state,
const common::ObRole &curr_role,
const palf::ObReplicaState &curr_state,
const char *extra_info)
{
int ret = OB_SUCCESS;
int pret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::ROLE_TRANSITION;
const int64_t MAX_BUF_LEN = 50;
char prev_str[MAX_BUF_LEN] = {'\0'};
char curr_str[MAX_BUF_LEN] = {'\0'};
TIMEGUARD_INIT(LOG_MONITOR, 100_ms, 5_s); \
if (0 >= (pret = snprintf(prev_str, MAX_BUF_LEN, "%s %s", role_to_string(prev_role),
replica_state_to_string(prev_state)))) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "snprintf failed", KR(ret), K(prev_role), K(prev_state));
} else if (0 >= (pret = snprintf(curr_str, MAX_BUF_LEN, "%s %s", role_to_string(curr_role),
replica_state_to_string(curr_state)))) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "snprintf failed", KR(ret), K(curr_role), K(curr_state));
} else {
CLICK();
if (OB_NOT_NULL(extra_info)) {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"PREVIOUS", prev_str,
"CURRENT", curr_str,
"", NULL,
"", NULL,
extra_info);
} else {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"PREVIOUS", prev_str,
"CURRENT", curr_str);
}
CLICK();
}
return ret;
}
// =========== PALF Event Reporting ===========
// =========== PALF Performance Statistic ===========
int ObLogMonitor::add_log_write_stat(const int64_t palf_id, const int64_t log_write_size)
{
int ret = OB_SUCCESS;
// TODO
return ret;
}
// =========== PALF Performance Statistic ===========
#undef LOG_MONITOR_EVENT_FMT_PREFIX
} // end namespace logservice
} // end namespace oceanbase

View File

@ -0,0 +1,71 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LOGSERVICE_OB_LOG_MONITOR_H_
#define OCEANBASE_LOGSERVICE_OB_LOG_MONITOR_H_
#include "palf/palf_callback.h"
namespace oceanbase
{
namespace logservice
{
class ObLogMonitor : public palf::PalfMonitorCb
{
public:
ObLogMonitor() { }
virtual ~ObLogMonitor() { }
public:
// =========== PALF Event Reporting ===========
int record_role_change_event(const int64_t palf_id,
const common::ObRole &prev_role,
const palf::ObReplicaState &prev_state,
const common::ObRole &curr_role,
const palf::ObReplicaState &curr_state,
const char *extra_info = NULL) override final;
// =========== PALF Event Reporting ===========
public:
// =========== PALF Performance Statistic ===========
int add_log_write_stat(const int64_t palf_id, const int64_t log_write_size) override final;
// =========== PALF Performance Statistic ===========
private:
enum EventType
{
UNKNOWN = 0,
DEGRADE,
UPGRADE,
ROLE_TRANSITION,
};
const char *type_to_string_(const EventType &event) const
{
#define CHECK_LOG_EVENT_TYPE_STR(x) case(EventType::x): return #x
switch (event)
{
CHECK_LOG_EVENT_TYPE_STR(DEGRADE);
CHECK_LOG_EVENT_TYPE_STR(UPGRADE);
case (EventType::ROLE_TRANSITION):
return "ROLE TRANSITION";
default:
return "UNKNOWN";
}
#undef CHECK_LOG_EVENT_TYPE_STR
}
private:
DISALLOW_COPY_AND_ASSIGN(ObLogMonitor);
};
} // logservice
} // oceanbase
#endif

View File

@ -59,7 +59,9 @@ ObLogService::ObLogService() :
ls_adapter_(),
rpc_proxy_(),
reporter_(),
restore_service_()
restore_service_(),
flashback_service_(),
monitor_()
{}
ObLogService::~ObLogService()
@ -225,7 +227,7 @@ int ObLogService::init(const PalfOptions &options,
KP(alloc_mgr), KP(transport), KP(ls_service), KP(location_service), KP(reporter),
KP(log_block_pool), KP(sql_proxy), KP(net_keepalive_adapter));
} else if (OB_FAIL(PalfEnv::create_palf_env(options, base_dir, self, transport,
alloc_mgr, log_block_pool, palf_env_))) {
alloc_mgr, log_block_pool, &monitor_, palf_env_))) {
CLOG_LOG(WARN, "failed to create_palf_env", K(base_dir), K(ret));
} else if (OB_ISNULL(palf_env_)) {
ret = OB_ERR_UNEXPECTED;

View File

@ -31,6 +31,7 @@
#include "ob_location_adapter.h"
#include "ob_log_flashback_service.h" // ObLogFlashbackService
#include "ob_log_handler.h"
#include "ob_log_monitor.h"
namespace oceanbase
{
@ -228,6 +229,7 @@ private:
cdc::ObCdcService cdc_service_;
ObLogRestoreService restore_service_;
ObLogFlashbackService flashback_service_;
ObLogMonitor monitor_;
ObSpinLock update_palf_opts_lock_;
private:
DISALLOW_COPY_AND_ASSIGN(ObLogService);

View File

@ -79,6 +79,7 @@ LogConfigMgr::LogConfigMgr()
election_(NULL),
mode_mgr_(NULL),
reconfirm_(NULL),
plugins_(NULL),
is_inited_(false)
{}
@ -96,7 +97,8 @@ int LogConfigMgr::init(const int64_t palf_id,
LogStateMgr *state_mgr,
election::Election* election,
LogModeMgr *mode_mgr,
LogReconfirm *reconfirm)
LogReconfirm *reconfirm,
LogPlugins *plugins)
{
int ret = OB_SUCCESS;
if (is_inited_) {
@ -109,10 +111,13 @@ int LogConfigMgr::init(const int64_t palf_id,
OB_ISNULL(sw) ||
OB_ISNULL(state_mgr) ||
OB_ISNULL(election) ||
OB_ISNULL(mode_mgr)) {
OB_ISNULL(mode_mgr) ||
OB_ISNULL(reconfirm) ||
OB_ISNULL(plugins)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid argument", KR(ret), K(palf_id), K(self),
K(log_ms_meta), K(log_engine), K(sw), K(state_mgr), K(election), K(mode_mgr));
K(log_ms_meta), KP(log_engine), KP(sw), KP(state_mgr), KP(election), KP(mode_mgr),
KP(reconfirm), KP(plugins));
} else if (OB_FAIL(paxos_member_region_map_.init("LogRegionMap", OB_MAX_MEMBER_NUMBER))) {
PALF_LOG(WARN, "paxos_member_region_map_ init failed", K(palf_id));
} else {
@ -126,6 +131,7 @@ int LogConfigMgr::init(const int64_t palf_id,
election_ = election;
mode_mgr_ = mode_mgr;
reconfirm_ = reconfirm;
plugins_ = plugins;
if (true == log_ms_meta.curr_.is_valid()) {
if (OB_FAIL(append_config_info_(log_ms_meta.curr_))) {
PALF_LOG(WARN, "append_config_info_ failed", K(ret), K(palf_id), K(log_ms_meta));
@ -156,6 +162,7 @@ void LogConfigMgr::destroy()
sw_ = NULL;
log_engine_ = NULL;
reconfirm_ = NULL;
plugins_ = NULL;
register_time_us_ = OB_INVALID_TIMESTAMP;
parent_.reset();
parent_keepalive_time_us_ = OB_INVALID_TIMESTAMP;

View File

@ -215,7 +215,8 @@ public:
LogStateMgr *state_mgr,
election::Election *election,
LogModeMgr *mode_mgr,
LogReconfirm *reconfirm);
LogReconfirm *reconfirm,
LogPlugins *plugins);
virtual void destroy();
// require caller holds WLock in PalfHandleImpl
@ -557,6 +558,7 @@ private:
election::Election* election_;
LogModeMgr *mode_mgr_;
LogReconfirm *reconfirm_;
LogPlugins *plugins_;
bool is_inited_;
DISALLOW_COPY_AND_ASSIGN(LogConfigMgr);
};

View File

@ -39,6 +39,12 @@ namespace palf
#define PALF_EVENT(info_string, palf_id, args...) FLOG_INFO("[PALF_EVENT] "info_string, "palf_id", palf_id, args)
#define PALF_EVENT_REPORT_INFO_KV(args...) \
const int64_t MAX_INFO_LENGTH = 512; \
char EXTRA_INFOS[MAX_INFO_LENGTH]; \
int64_t pos = 0; \
::oceanbase::common::databuff_print_kv(EXTRA_INFOS, MAX_INFO_LENGTH, pos, ##args); \
typedef int FileDesc;
typedef uint64_t block_id_t ;
typedef uint64_t offset_t;

View File

@ -76,6 +76,7 @@ LogSlidingWindow::LogSlidingWindow()
mm_(NULL),
mode_mgr_(NULL),
log_engine_(NULL),
plugins_(NULL),
lsn_allocator_(),
group_buffer_(),
last_submit_info_lock_(common::ObLatchIds::PALF_SW_SUBMIT_INFO_LOCK),
@ -113,8 +114,6 @@ LogSlidingWindow::LogSlidingWindow()
submit_log_handling_lease_(),
last_fetch_log_renew_leader_ts_us_(OB_INVALID_TIMESTAMP),
end_lsn_stat_time_us_(OB_INVALID_TIMESTAMP),
lc_cb_lock_(common::ObLatchIds::PALF_SW_LOC_CB_LOCK),
lc_cb_(NULL),
reconfirm_fetch_dest_(),
is_truncating_(false),
is_rebuilding_(false),
@ -142,10 +141,10 @@ void LogSlidingWindow::destroy()
sw_.destroy();
group_buffer_.destroy();
match_lsn_map_.destroy();
lc_cb_ = NULL;
reconfirm_fetch_dest_.reset();
state_mgr_ = NULL;
log_engine_ = NULL;
plugins_ = NULL;
mm_ = NULL;
mode_mgr_ = NULL;
}
@ -208,6 +207,7 @@ int LogSlidingWindow::init(const int64_t palf_id,
LogEngine *log_engine,
palf::PalfFSCbWrapper *palf_fs_cb,
common::ObILogAllocator *alloc_mgr,
LogPlugins *plugins,
const PalfBaseInfo &palf_base_info,
const bool is_normal_replica)
{
@ -222,10 +222,11 @@ int LogSlidingWindow::init(const int64_t palf_id,
|| NULL == mm
|| NULL == mode_mgr
|| NULL == log_engine
|| NULL == palf_fs_cb) {
|| NULL == palf_fs_cb
|| NULL == plugins) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid argumetns", K(ret), K(palf_id), K(self), K(palf_base_info),
KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb));
KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb), KP(plugins));
} else if (is_normal_replica && OB_FAIL(do_init_mem_(palf_id, palf_base_info, alloc_mgr))) {
PALF_LOG(WARN, "do_init_mem_ failed", K(ret), K(palf_id));
} else {
@ -236,6 +237,7 @@ int LogSlidingWindow::init(const int64_t palf_id,
mode_mgr_ = mode_mgr;
log_engine_ = log_engine;
palf_fs_cb_ = palf_fs_cb;
plugins_ = plugins;
last_submit_lsn_ = prev_log_info.lsn_;
last_submit_end_lsn_ = palf_base_info.curr_lsn_;
@ -1951,12 +1953,10 @@ int LogSlidingWindow::get_fetch_log_dst_(common::ObAddr &fetch_dst) const
fetch_dst = parent;
} else if (state_mgr_leader.is_valid()) {
fetch_dst = state_mgr_leader;
} else if (OB_ISNULL(lc_cb_)) {
PALF_LOG(TRACE, "lc_cb_ is NULL", K(ret), K_(palf_id), K_(self));
} else if (palf_reach_time_interval(PALF_FETCH_LOG_RENEW_LEADER_INTERVAL_US, last_fetch_log_renew_leader_ts_us_) &&
OB_FAIL(lc_cb_->nonblock_renew_leader(palf_id_))) {
OB_FAIL(plugins_->nonblock_renew_leader(palf_id_))) {
PALF_LOG(WARN, "nonblock_renew_leader failed", KR(ret), K_(palf_id), K_(self));
} else if (OB_FAIL(lc_cb_->nonblock_get_leader(palf_id_, fetch_dst))) {
} else if (OB_FAIL(plugins_->nonblock_get_leader(palf_id_, fetch_dst))) {
if (palf_reach_time_interval(5 * 1000 * 1000, lc_cb_get_warn_time_)) {
PALF_LOG(WARN, "nonblock_get_leader failed", KR(ret), K_(palf_id), K_(self));
}
@ -4122,39 +4122,6 @@ void LogSlidingWindow::LogTaskGuard::revert_log_task() {
log_id_ = -1;
}
int LogSlidingWindow::set_location_cache_cb(PalfLocationCacheCb *lc_cb)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_ISNULL(lc_cb)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "lc_cb is NULL, can't register", KR(ret), K_(palf_id), K_(self));
} else {
ObSpinLockGuard guard(lc_cb_lock_);
if (OB_NOT_NULL(lc_cb_)) {
ret = OB_NOT_SUPPORTED;
PALF_LOG(WARN, "lc_cb_ is not NULL, can't register", KR(ret), K_(palf_id), K_(self));
} else {
lc_cb_ = lc_cb;
PALF_LOG(INFO, "set_location_cache_cb success", K_(palf_id), K_(self), KP_(lc_cb));
}
}
return ret;
}
int LogSlidingWindow::reset_location_cache_cb()
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
ObSpinLockGuard guard(lc_cb_lock_);
lc_cb_ = NULL;
}
return ret;
}
int LogSlidingWindow::get_min_scn_from_buf_(const LogGroupEntryHeader &header,
const char *buf,
const int64_t buf_len,

View File

@ -159,6 +159,7 @@ public:
LogEngine *log_engine,
palf::PalfFSCbWrapper *palf_fs_cb,
common::ObILogAllocator *alloc_mgr,
LogPlugins *plugins,
const PalfBaseInfo &palf_base_info,
const bool is_normal_replica);
virtual int sliding_cb(const int64_t sn, const FixedSlidingWindowSlot *data);
@ -248,9 +249,6 @@ public:
virtual int get_ack_info_array(LogMemberAckInfoList &ack_info_array) const;
virtual int pre_check_before_degrade_upgrade(const LogMemberAckInfoList &servers,
bool is_degrade);
// location cache will be removed TODO by yunlong
virtual int set_location_cache_cb(PalfLocationCacheCb *lc_cb);
virtual int reset_location_cache_cb();
virtual int advance_reuse_lsn(const LSN &flush_log_end_lsn);
virtual int try_send_committed_info(const common::ObAddr &server,
const LSN &log_lsn,
@ -436,6 +434,7 @@ private:
LogConfigMgr *mm_;
LogModeMgr *mode_mgr_;
LogEngine *log_engine_;
LogPlugins *plugins_;
palf::PalfFSCbWrapper *palf_fs_cb_;
LSNAllocator lsn_allocator_;
LogGroupBuffer group_buffer_;
@ -509,9 +508,6 @@ private:
// last_renew_leader_ts in fetch_log
mutable int64_t last_fetch_log_renew_leader_ts_us_;
int64_t end_lsn_stat_time_us_;
// location ptr, will be removed. TODO by yunlong
mutable common::ObSpinLock lc_cb_lock_;
PalfLocationCacheCb *lc_cb_;
// fetch log dest server for reconfirm
common::ObAddr reconfirm_fetch_dest_;
// whether sw is executing trucnation

View File

@ -41,6 +41,7 @@ LogStateMgr::LogStateMgr()
mode_mgr_(NULL),
palf_role_change_cb_(NULL),
election_(NULL),
plugins_(NULL),
role_state_val_(0),
leader_(),
leader_epoch_(OB_INVALID_TIMESTAMP),
@ -73,7 +74,8 @@ int LogStateMgr::init(const int64_t palf_id,
LogEngine *log_engine,
LogConfigMgr *mm,
LogModeMgr *mode_mgr,
palf::PalfRoleChangeCbWrapper *palf_role_change_cb)
palf::PalfRoleChangeCbWrapper *palf_role_change_cb,
LogPlugins *plugins)
{
int ret = OB_SUCCESS;
if (is_inited_) {
@ -81,14 +83,16 @@ int LogStateMgr::init(const int64_t palf_id,
PALF_LOG(ERROR, "init twice", K(ret), K_(palf_id));
} else if (false == is_valid_palf_id(palf_id) || NULL == sw || NULL == reconfirm || NULL == log_engine
|| NULL == election || NULL == mm || NULL == mode_mgr || NULL == palf_role_change_cb ||
!self.is_valid() || !replica_property_meta.is_valid()) {
NULL == plugins || !self.is_valid() || !replica_property_meta.is_valid()) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid arguments", K(ret), K(palf_id), KP(sw), KP(reconfirm), K(log_prepare_meta),
KP(election), KP(log_engine), KP(mm), KP(mode_mgr), KP(palf_role_change_cb), K(replica_property_meta), K(self));
KP(election), KP(log_engine), KP(mm), KP(mode_mgr), KP(palf_role_change_cb), KP(plugins),
K(replica_property_meta), K(self));
} else {
prepare_meta_= log_prepare_meta;
palf_id_ = palf_id;
election_ = election;
plugins_ = plugins;
sw_ = sw;
reconfirm_ = reconfirm;
log_engine_ = log_engine;
@ -497,6 +501,8 @@ void LogStateMgr::destroy()
log_engine_ = NULL;
mm_ = NULL;
mode_mgr_ = NULL;
election_ = NULL;
plugins_ = NULL;
scan_disk_log_finished_ = false;
update_role_and_state_(FOLLOWER, INIT);
leader_.reset();
@ -539,6 +545,8 @@ int LogStateMgr::init_to_follower_active_()
int ret = OB_SUCCESS;
reset_status_();
update_role_and_state_(FOLLOWER, ACTIVE);
plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::INIT,
FOLLOWER, ObReplicaState::ACTIVE);
PALF_EVENT("init_to_follower_active", palf_id_, K(ret), K_(self));
return ret;
}
@ -562,7 +570,11 @@ int LogStateMgr::pending_to_follower_active_()
if (OB_FAIL(to_follower_active_())) {
PALF_LOG(ERROR, "to_follower_active_ failed", K(ret), K_(palf_id));
}
PALF_EVENT("follower_pending_to_follower_active", palf_id_, K(ret), K_(self), K_(leader));
PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(pending_end_lsn));
plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING,
FOLLOWER, ObReplicaState::ACTIVE, EXTRA_INFOS);
PALF_EVENT("follower_pending_to_follower_active", palf_id_, K(ret), K_(self), K_(leader),
K_(pending_end_lsn));
return ret;
}
@ -590,6 +602,9 @@ int LogStateMgr::to_reconfirm_(const int64_t new_leader_epoch)
int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch)
{
int ret = to_reconfirm_(new_leader_epoch);
PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch));
plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::ACTIVE,
LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS);
PALF_EVENT("follower_active_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch),
K(leader_), K_(pending_end_lsn));
return ret;
@ -598,6 +613,9 @@ int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch)
int LogStateMgr::follower_pending_to_reconfirm_(const int64_t new_leader_epoch)
{
int ret = to_reconfirm_(new_leader_epoch);
PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch), K_(pending_end_lsn));
plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING,
LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS);
PALF_EVENT("follower_pending_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch),
K(leader_), K_(pending_end_lsn));
return ret;
@ -605,14 +623,17 @@ int LogStateMgr::follower_pending_to_reconfirm_(const int64_t new_leader_epoch)
int LogStateMgr::reconfirm_to_follower_pending_()
{
PALF_EVENT("reconfirm_to_follower_pending", palf_id_, K_(self), K_(leader), "is_allow_vote",
is_allow_vote(), K(lbt()));
int ret = OB_SUCCESS;
if (OB_FAIL(to_follower_pending_())) {
PALF_LOG(WARN, "to_follower_pending_ failed, try again", K(ret), K_(palf_id));
} else {
reset_status_();
update_role_and_state_(FOLLOWER, PENDING);
PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(allow_vote));
plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM,
FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS);
PALF_EVENT("reconfirm_to_follower_pending", palf_id_, K_(self), K_(leader), "is_allow_vote",
is_allow_vote(), K(lbt()));
}
return ret;
}
@ -642,6 +663,9 @@ int LogStateMgr::reconfirm_to_leader_active_()
}
const int64_t reconfirm_to_active_cost = ObTimeUtility::current_time() - reconfirm_start_time_us_;
PALF_EVENT("reconfirm_to_leader_active end", palf_id_, K(ret), K_(self), K(reconfirm_to_active_cost), K_(role), K_(state));
PALF_EVENT_REPORT_INFO_KV(K(reconfirm_stage_cost), K(reconfirm_to_active_cost));
plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM,
LEADER, ObReplicaState::ACTIVE, EXTRA_INFOS);
}
return ret;
@ -666,6 +690,9 @@ int LogStateMgr::leader_active_to_follower_pending_()
update_role_and_state_(FOLLOWER, PENDING);
}
PALF_EVENT_REPORT_INFO_KV(K_(pending_end_lsn));
plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::ACTIVE,
FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS);
PALF_EVENT("leader_active_to_follower_pending_", palf_id_, K(ret), K_(self), K_(role), K_(state), K_(pending_end_lsn));
return ret;
}

View File

@ -56,7 +56,8 @@ public:
LogEngine *log_engine,
LogConfigMgr *mm,
LogModeMgr *mode_mgr_,
palf::PalfRoleChangeCbWrapper *palf_role_change_cb);
palf::PalfRoleChangeCbWrapper *palf_role_change_cb,
LogPlugins *plugins);
virtual bool is_state_changed();
virtual int switch_state();
virtual int handle_prepare_request(const common::ObAddr &new_leader,
@ -168,6 +169,7 @@ private:
LogModeMgr *mode_mgr_;
palf::PalfRoleChangeCbWrapper *palf_role_change_cb_;
election::Election* election_;
LogPlugins *plugins_;
union
{
int64_t role_state_val_;

View File

@ -13,6 +13,7 @@
#ifndef OCEANBASE_LOGSERVICE_PALF_CALLBACK_
#define OCEANBASE_LOGSERVICE_PALF_CALLBACK_
#include <stdint.h>
#include "common/ob_role.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/list/ob_dlink_node.h"
#include "lib/utility/ob_print_utils.h"
@ -54,6 +55,34 @@ public:
virtual int nonblock_renew_leader(const int64_t id) = 0;
};
class PalfMonitorCb
{
public:
// record events
virtual int record_role_change_event(const int64_t palf_id,
const common::ObRole &prev_role,
const palf::ObReplicaState &prev_state,
const common::ObRole &curr_role,
const palf::ObReplicaState &curr_state,
const char *extra_info = "") = 0;
// performance statistic
virtual int add_log_write_stat(const int64_t palf_id, const int64_t log_write_size) = 0;
};
class PalfLiteMonitorCb
{
public:
// @desc: record creating or deleting events
// add/remove cluster: valid cluster_id, invalid tenant_id, invalid ls_id,
// add/remove tenant: valid cluster_id, valid tenant_id, invalid ls_id,
// add/remove ls: valid cluster_id, valid tenant_id, valid ls_id,
virtual int record_create_or_delete_event(const int64_t cluster_id,
const uint64_t tenant_id,
const int64_t ls_id,
const bool is_create,
const char *extra_info) = 0;
};
} // end namespace palf
} // end namespace oceanbase
#endif

View File

@ -168,5 +168,125 @@ int PalfRebuildCbWrapper::on_rebuild(const int64_t id, const LSN &lsn)
return ret;
}
LogPlugins::LogPlugins()
: loc_lock_(),
loc_cb_(NULL),
palf_monitor_lock_(),
palf_monitor_(NULL),
palflite_monitor_lock_(),
palflite_monitor_(NULL) { }
LogPlugins::~LogPlugins()
{
destroy();
}
void LogPlugins::destroy()
{
{
common::RWLock::WLockGuard guard(loc_lock_);
loc_cb_ = NULL;
}
{
common::RWLock::WLockGuard guard(palf_monitor_lock_);
palf_monitor_ = NULL;
}
{
common::RWLock::WLockGuard guard(palflite_monitor_lock_);
palflite_monitor_ = NULL;
}
}
template<>
int LogPlugins::add_plugin(PalfLocationCacheCb *plugin)
{
int ret = OB_SUCCESS;
common::RWLock::WLockGuard guard(loc_lock_);
if (OB_ISNULL(plugin)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "Palf plugin is NULL", KP(plugin));
} else if (OB_NOT_NULL(loc_cb_)) {
ret = OB_OP_NOT_ALLOW;
PALF_LOG(INFO, "Palf plugin is not NULL", KP(plugin), KP_(loc_cb));
} else {
loc_cb_ = plugin;
PALF_LOG(INFO, "add_plugin success", KP(plugin));
}
return ret;
}
template<>
int LogPlugins::del_plugin(PalfLocationCacheCb *plugin)
{
int ret = OB_SUCCESS;
common::RWLock::WLockGuard guard(loc_lock_);
if (OB_NOT_NULL(loc_cb_)) {
PALF_LOG(INFO, "del_plugin success", KP_(loc_cb));
loc_cb_ = NULL;
}
return ret;
}
template<>
int LogPlugins::add_plugin(PalfMonitorCb *plugin)
{
int ret = OB_SUCCESS;
common::RWLock::WLockGuard guard(palf_monitor_lock_);
if (OB_ISNULL(plugin)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "Palf plugin is NULL", KP(plugin));
} else if (OB_NOT_NULL(palf_monitor_)) {
ret = OB_OP_NOT_ALLOW;
PALF_LOG(INFO, "Palf plugin is not NULL", KP(plugin), KP_(palf_monitor));
} else {
palf_monitor_ = plugin;
PALF_LOG(INFO, "add_plugin success", KP(plugin));
}
return ret;
}
template<>
int LogPlugins::del_plugin(PalfMonitorCb *plugin)
{
int ret = OB_SUCCESS;
common::RWLock::WLockGuard guard(palf_monitor_lock_);
if (OB_NOT_NULL(palf_monitor_)) {
PALF_LOG(INFO, "del_plugin success", KP_(palf_monitor));
palf_monitor_ = NULL;
}
return ret;
}
template<>
int LogPlugins::add_plugin(PalfLiteMonitorCb *plugin)
{
int ret = OB_SUCCESS;
common::RWLock::WLockGuard guard(palf_monitor_lock_);
if (OB_ISNULL(plugin)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "Palf plugin is NULL", KP(plugin));
} else if (OB_NOT_NULL(palflite_monitor_)) {
ret = OB_OP_NOT_ALLOW;
PALF_LOG(INFO, "Palf plugin is not NULL", KP(plugin), KP_(loc_cb));
} else {
palflite_monitor_ = plugin;
PALF_LOG(INFO, "add_plugin success", KP(plugin));
}
return ret;
}
template<>
int LogPlugins::del_plugin(PalfLiteMonitorCb *plugin)
{
int ret = OB_SUCCESS;
common::RWLock::WLockGuard guard(palf_monitor_lock_);
if (OB_NOT_NULL(palflite_monitor_)) {
PALF_LOG(INFO, "del_plugin success", KP_(palflite_monitor));
palflite_monitor_ = NULL;
}
return ret;
}
}; // end namespace palf
}; // end namespace oceanbase

View File

@ -16,7 +16,9 @@
#include "lib/list/ob_dlist.h"
#include "lib/ob_errno.h"
#include "lib/lock/ob_spin_lock.h"
#include "lib/lock/ob_tc_rwlock.h"
#include "palf_callback.h"
#include "share/ob_delegate.h"
namespace oceanbase
{
namespace palf
@ -83,6 +85,48 @@ private:
ObDList<PalfRebuildCbNode> list_;
ObSpinLock lock_;
};
#define PALF_PLUGINS_DELEGATE_PTR(delegate_obj, lock_, func_name) \
template <typename ...Args> \
int func_name(Args &&...args) { \
int ret = OB_SUCCESS; \
common::RWLock::RLockGuard guard(lock_); \
if (OB_UNLIKELY(OB_ISNULL(delegate_obj))) { \
ret = OB_NOT_INIT; \
} else { \
ret = delegate_obj->func_name(std::forward<Args>(args)...); \
} \
return ret; \
}
class LogPlugins
{
public:
LogPlugins();
~LogPlugins();
void destroy();
template<typename T>
int add_plugin(T *plugin);
template<typename T>
int del_plugin(T *plugin);
PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, get_leader);
PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, nonblock_get_leader);
PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, nonblock_renew_leader);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_role_change_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, add_log_write_stat);
PALF_PLUGINS_DELEGATE_PTR(palflite_monitor_, palflite_monitor_lock_, record_create_or_delete_event);
TO_STRING_KV(KP_(loc_cb), KP_(palf_monitor), KP_(palflite_monitor));
private:
common::RWLock loc_lock_;
PalfLocationCacheCb *loc_cb_;
common::RWLock palf_monitor_lock_;
PalfMonitorCb *palf_monitor_;
common::RWLock palflite_monitor_lock_;
PalfLiteMonitorCb *palflite_monitor_;
};
} // end namespace palf
} // end namespace oceanbase
#endif

View File

@ -45,6 +45,7 @@ int PalfEnv::create_palf_env(
rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *log_alloc_mgr,
ILogBlockPool *log_block_pool,
PalfMonitorCb *monitor,
PalfEnv *&palf_env)
{
int ret = OB_SUCCESS;
@ -55,7 +56,7 @@ int PalfEnv::create_palf_env(
CLOG_LOG(WARN, "delete_tmp_file_or_directory_at failed", K(ret), K(base_dir));
} else if (OB_FAIL(palf_env->palf_env_impl_.init(options, base_dir, self, obrpc::ObRpcNetHandler::CLUSTER_ID,
MTL_ID(), transport,
log_alloc_mgr, log_block_pool))) {
log_alloc_mgr, log_block_pool, monitor))) {
PALF_LOG(WARN, "PalfEnvImpl init failed", K(ret), K(base_dir));
} else if (OB_FAIL(palf_env->start_())) {
PALF_LOG(WARN, "start palf env failed", K(ret), K(base_dir));

View File

@ -55,6 +55,7 @@ public:
rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *alloc_mgr,
ILogBlockPool *log_block_pool,
PalfMonitorCb *monitor,
PalfEnv *&palf_env);
// static interface
// destroy the palf env, and set "palf_env" to NULL.

View File

@ -140,6 +140,7 @@ PalfEnvImpl::PalfEnvImpl() : palf_meta_lock_(common::ObLatchIds::PALF_ENV_LOCK),
log_io_worker_(),
block_gc_timer_task_(),
log_updater_(),
monitor_(NULL),
disk_options_wrapper_(),
check_disk_print_log_interval_(OB_INVALID_TIMESTAMP),
self_(),
@ -166,7 +167,8 @@ int PalfEnvImpl::init(
const int64_t tenant_id,
rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *log_alloc_mgr,
ILogBlockPool *log_block_pool)
ILogBlockPool *log_block_pool,
PalfMonitorCb *monitor)
{
int ret = OB_SUCCESS;
int pret = 0;
@ -180,10 +182,10 @@ int PalfEnvImpl::init(
ret = OB_INIT_TWICE;
PALF_LOG(ERROR, "PalfEnvImpl is inited twiced", K(ret));
} else if (OB_ISNULL(base_dir) || !self.is_valid() || NULL == transport
|| OB_ISNULL(log_alloc_mgr) || OB_ISNULL(log_block_pool)) {
|| OB_ISNULL(log_alloc_mgr) || OB_ISNULL(log_block_pool) || OB_ISNULL(monitor)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid arguments", K(ret), KP(transport), K(base_dir), K(self), KP(transport),
KP(log_alloc_mgr), KP(log_block_pool));
KP(log_alloc_mgr), KP(log_block_pool), KP(monitor));
} else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) {
PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret));
} else if (OB_FAIL(log_rpc_.init(self, cluster_id, tenant_id, transport))) {
@ -221,6 +223,7 @@ int PalfEnvImpl::init(
} else {
log_alloc_mgr_ = log_alloc_mgr;
log_block_pool_ = log_block_pool;
monitor_ = monitor;
self_ = self;
tenant_id_ = tenant_id;
is_inited_ = true;
@ -300,6 +303,7 @@ void PalfEnvImpl::destroy()
log_updater_.destroy();
log_rpc_.destroy();
log_alloc_mgr_ = NULL;
monitor_ = NULL;
self_.reset();
log_dir_[0] = '\0';
tmp_log_dir_[0] = '\0';
@ -369,6 +373,8 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id,
// NB: always insert value into hash map finally.
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) {
PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id));
} else if (OB_FAIL(palf_handle_impl->set_monitor_cb(monitor_))) {
PALF_LOG(WARN, "set_monitor_cb failed", K(ret), K(palf_id), KP_(monitor));
} else {
palf_handle_impl->set_scan_disk_log_finished();
ipalf_handle_impl = palf_handle_impl;
@ -933,6 +939,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id)
} else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) {
PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl));
} else {
(void) tmp_palf_handle_impl->set_monitor_cb(monitor_);
(void) tmp_palf_handle_impl->set_scan_disk_log_finished();
palf_handle_impl_map_.revert(tmp_palf_handle_impl);
int64_t cost_ts = ObTimeUtility::current_time() - start_ts;

View File

@ -182,7 +182,8 @@ public:
const int64_t tenant_id,
rpc::frame::ObReqTransport *transport,
common::ObILogAllocator *alloc_mgr,
ILogBlockPool *log_block_pool);
ILogBlockPool *log_block_pool,
PalfMonitorCb *monitor);
// start函数包含两层含义:
//
@ -315,6 +316,7 @@ private:
LogIOWorker log_io_worker_;
BlockGCTimerTask block_gc_timer_task_;
LogUpdater log_updater_;
PalfMonitorCb *monitor_;
PalfDiskOptionsWrapper disk_options_wrapper_;
int64_t check_disk_print_log_interval_;

View File

@ -51,7 +51,7 @@ PalfHandleImpl::PalfHandleImpl()
fs_cb_wrapper_(),
role_change_cb_wrpper_(),
rebuild_cb_wrapper_(),
lc_cb_(NULL),
plugins_(),
last_locate_scn_(),
last_send_mode_meta_time_us_(OB_INVALID_TIMESTAMP),
last_locate_block_(LOG_INVALID_BLOCK_ID),
@ -221,7 +221,7 @@ void PalfHandleImpl::destroy()
is_inited_ = false;
diskspace_enough_ = true;
cached_is_in_sync_ = false;
lc_cb_ = NULL;
plugins_.destroy();
self_.reset();
palf_id_ = INVALID_PALF_ID;
fetch_log_engine_ = NULL;
@ -2017,17 +2017,10 @@ int PalfHandleImpl::set_location_cache_cb(PalfLocationCacheCb *lc_cb)
} else if (OB_ISNULL(lc_cb)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "lc_cb is NULL, can't register", KR(ret), KPC(this));
} else if (OB_FAIL(plugins_.add_plugin(lc_cb))) {
PALF_LOG(WARN, "add_plugin failed", KR(ret), KPC(this), KP(lc_cb), K_(plugins));
} else {
WLockGuard guard(lock_);
if (OB_NOT_NULL(lc_cb_)) {
ret = OB_NOT_SUPPORTED;
PALF_LOG(WARN, "lc_cb_ is not NULL, can't register", KR(ret), KPC(this));
} else if (OB_FAIL(sw_.set_location_cache_cb(lc_cb))) {
PALF_LOG(WARN, "sw_.set_location_cache_cb failed", KR(ret), KPC(this));
} else {
lc_cb_ = lc_cb;
PALF_LOG(INFO, "set_location_cache_cb success", KPC(this), KP_(lc_cb));
}
PALF_LOG(INFO, "set_location_cache_cb success", KPC(this), K_(plugins), KP(lc_cb));
}
return ret;
}
@ -2067,13 +2060,40 @@ int PalfHandleImpl::reset_election_priority()
int PalfHandleImpl::reset_location_cache_cb()
{
int ret = OB_SUCCESS;
PalfLocationCacheCb *loc_cb = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(sw_.reset_location_cache_cb())) {
PALF_LOG(WARN, "sw_.reset_location_cache_cb failed", KR(ret), KPC(this));
} else if (OB_FAIL(plugins_.del_plugin(loc_cb))) {
PALF_LOG(WARN, "del_plugin failed", KR(ret), KPC(this), K_(plugins));
}
return ret;
}
int PalfHandleImpl::set_monitor_cb(PalfMonitorCb *monitor_cb)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "not initted", KR(ret), KPC(this));
} else if (OB_ISNULL(monitor_cb)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "lc_cb is NULL, can't register", KR(ret), KPC(this));
} else if (OB_FAIL(plugins_.add_plugin(monitor_cb))) {
PALF_LOG(WARN, "add_plugin failed", KR(ret), KPC(this), KP(monitor_cb), K_(plugins));
} else {
WLockGuard guard(lock_);
lc_cb_ = NULL;
PALF_LOG(INFO, "set_monitor_cb success", KPC(this), K_(plugins), KP(monitor_cb));
}
return ret;
}
int PalfHandleImpl::reset_monitor_cb()
{
int ret = OB_SUCCESS;
PalfMonitorCb *monitor_cb = NULL;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(plugins_.del_plugin(monitor_cb))) {
PALF_LOG(WARN, "del_plugin failed", KR(ret), KPC(this), K_(plugins));
}
return ret;
}
@ -2398,7 +2418,7 @@ int PalfHandleImpl::do_init_mem_(
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id));
} else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_,
&log_engine_, &fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica))) {
&log_engine_, &fs_cb_wrapper_, alloc_mgr, &plugins_, palf_base_info, is_normal_replica))) {
PALF_LOG(WARN, "sw_ init failed", K(ret), K(palf_id));
} else if (OB_FAIL(election_.init_and_start(palf_id,
election_timer,
@ -2412,10 +2432,10 @@ int PalfHandleImpl::do_init_mem_(
}))) {
PALF_LOG(WARN, "election_ init failed", K(ret), K(palf_id));
} else if (OB_FAIL(state_mgr_.init(palf_id, self, log_meta.get_log_prepare_meta(), log_meta.get_log_replica_property_meta(),
&election_, &sw_, &reconfirm_, &log_engine_, &config_mgr_, &mode_mgr_, &role_change_cb_wrpper_))) {
&election_, &sw_, &reconfirm_, &log_engine_, &config_mgr_, &mode_mgr_, &role_change_cb_wrpper_, &plugins_))) {
PALF_LOG(WARN, "state_mgr_ init failed", K(ret), K(palf_id));
} else if (OB_FAIL(config_mgr_.init(palf_id, self, log_meta.get_log_config_meta(), &log_engine_,
&sw_, &state_mgr_, &election_, &mode_mgr_, &reconfirm_))) {
&sw_, &state_mgr_, &election_, &mode_mgr_, &reconfirm_, &plugins_))) {
PALF_LOG(WARN, "config_mgr_ init failed", K(ret), K(palf_id));
} else if (is_normal_replica && OB_FAIL(reconfirm_.init(palf_id, self, &sw_, &state_mgr_, &config_mgr_, &mode_mgr_, &log_engine_))) {
PALF_LOG(WARN, "reconfirm_ init failed", K(ret), K(palf_id));
@ -2426,7 +2446,6 @@ int PalfHandleImpl::do_init_mem_(
fetch_log_engine_ = fetch_log_engine;
allocator_ = alloc_mgr;
self_ = self;
lc_cb_ = NULL;
has_set_deleted_ = false;
palf_env_impl_ = palf_env_impl;
is_inited_ = true;
@ -4274,9 +4293,7 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn, LSN &end_lsn)
end_lsn.reset();
// use lc_cb_ in here without rlock is safe, because we don't reset lc_cb_
// until this PalfHandleImpl is destoryed.
if (OB_ISNULL(lc_cb_)) {
CLOG_LOG(TRACE, "lc_cb_ is NULL, skip", K(ret), K_(self), K_(palf_id));
} else if (OB_FAIL(lc_cb_->nonblock_get_leader(palf_id_, leader))) {
if (OB_FAIL(plugins_.nonblock_get_leader(palf_id_, leader))) {
CLOG_LOG(WARN, "get_leader failed", K(ret), K_(self), K_(palf_id));
need_renew_leader = true;
} else if (false == leader.is_valid()) {
@ -4289,7 +4306,7 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn, LSN &end_lsn)
end_lsn = resp.end_lsn_;
}
if (need_renew_leader && palf_reach_time_interval(500 * 1000, last_renew_loc_time_us_)) {
(void) lc_cb_->nonblock_renew_leader(palf_id_);
(void) plugins_.nonblock_renew_leader(palf_id_);
}
return ret;
}

View File

@ -780,6 +780,8 @@ public:
int unregister_rebuild_cb(palf::PalfRebuildCbNode *rebuild_cb) override final;
int set_location_cache_cb(PalfLocationCacheCb *lc_cb) override final;
int reset_location_cache_cb() override final;
int set_monitor_cb(PalfMonitorCb *monitor_cb);
int reset_monitor_cb();
int set_election_priority(election::ElectionPriority *priority) override final;
int reset_election_priority() override final;
// ==================== Callback end ========================
@ -1107,7 +1109,7 @@ private:
palf::PalfFSCbWrapper fs_cb_wrapper_;
palf::PalfRoleChangeCbWrapper role_change_cb_wrpper_;
palf::PalfRebuildCbWrapper rebuild_cb_wrapper_;
PalfLocationCacheCb *lc_cb_;
LogPlugins plugins_;
// ======optimization for locate_by_scn_coarsely=========
mutable SpinLock last_locate_lock_;
share::SCN last_locate_scn_;

View File

@ -22,10 +22,11 @@ using namespace share;
int ObAllServerEventHistoryTableOperator::init(common::ObMySQLProxy &proxy, const common::ObAddr &self_addr)
{
int ret = OB_SUCCESS;
const bool is_rs_event = false;
const bool is_server_event = true;
set_addr(self_addr, is_rs_event, is_server_event);
if (OB_FAIL(ObEventHistoryTableOperator::init(proxy))) {
} else {
const bool is_rs_event = false;
set_addr(self_addr, is_rs_event);
set_event_table(share::OB_ALL_SERVER_EVENT_HISTORY_TNAME);
}
return ret;

View File

@ -41,5 +41,7 @@ private:
SERVER_EVENT_INSTANCE.add_event(args)
#define SERVER_EVENT_SYNC_ADD(args...) \
SERVER_EVENT_INSTANCE.sync_add_event(args)
#define SERVER_EVENT_ADD_WITH_RETRY(args...) \
SERVER_EVENT_INSTANCE.add_event_with_retry(args)
#endif /* _OB_SERVER_EVENT_HISTORY_TABLE_OPERATOR_H */

View File

@ -23,10 +23,11 @@ int ObRsEventHistoryTableOperator::init(common::ObMySQLProxy &proxy,
const common::ObAddr &self_addr)
{
int ret = OB_SUCCESS;
const bool is_rs_event = true;
const bool is_server_event = false;
set_addr(self_addr, is_rs_event, is_server_event);
if (OB_FAIL(ObEventHistoryTableOperator::init(proxy))) {
} else {
const bool is_rs_event = true;
set_addr(self_addr, is_rs_event);
set_event_table(share::OB_ALL_ROOTSERVICE_EVENT_HISTORY_TNAME);
}
return ret;

View File

@ -166,7 +166,9 @@ int ObEventHistoryTableOperator::ObEventTableUpdateTask::process()
ObEventHistoryTableOperator::ObEventHistoryTableOperator()
: inited_(false), stopped_(false), last_event_ts_(0),
lock_(ObLatchIds::RS_EVENT_TS_LOCK), proxy_(NULL), event_queue_(),
event_table_name_(NULL), self_addr_(), is_rootservice_event_history_(false)
event_table_name_(NULL), self_addr_(), is_rootservice_event_history_(false),
is_server_event_history_(false),
timer_()
{
}
@ -182,11 +184,15 @@ int ObEventHistoryTableOperator::init(common::ObMySQLProxy &proxy)
LOG_WARN("init twice", K(ret));
} else {
const int64_t thread_count = 1;
const int64_t queue_size_square_of_2 = 14;
if (OB_FAIL(event_queue_.init(thread_count, "EvtHisUpdTask", TASK_QUEUE_SIZE, TASK_MAP_SIZE,
TOTAL_LIMIT, HOLD_LIMIT, PAGE_SIZE))) {
LOG_WARN("task_queue_ init failed", K(thread_count), LITERAL_K(TASK_QUEUE_SIZE),
LITERAL_K(TASK_MAP_SIZE), LITERAL_K(TOTAL_LIMIT), LITERAL_K(HOLD_LIMIT),
LITERAL_K(PAGE_SIZE), K(ret));
} else if (is_server_event_history_ &&
OB_FAIL(timer_.init_and_start(thread_count, 5_s, "EventTimer", queue_size_square_of_2))) {
LOG_WARN("int global event report timer failed", KR(ret));
} else {
event_queue_.set_label(ObModIds::OB_RS_EVENT_QUEUE);
proxy_ = &proxy;
@ -201,6 +207,7 @@ void ObEventHistoryTableOperator::destroy()
{
stopped_ = true;
event_queue_.destroy();
timer_.stop_and_wait();
// allocator should destroy after event_queue_ destroy
inited_ = false;
}
@ -325,5 +332,50 @@ int ObEventHistoryTableOperator::process_task(const ObString &sql, const bool is
return ret;
}
int ObEventHistoryTableOperator::add_event_to_timer_(const common::ObSqlString &sql)
{
int ret = OB_SUCCESS;
int retry_times = 0;
ObAddr self_addr = self_addr_;
common::ObMySQLProxy *proxy = proxy_;
ObUniqueGuard<ObStringHolder> uniq_holder;
if (OB_FAIL(ob_make_unique(uniq_holder))) {
SHARE_LOG(WARN, "fail to make unique guard");
} else if (OB_FAIL(uniq_holder->assign(sql.string()))) {
SHARE_LOG(WARN, "fail to create unique ownership of string");
} else if (OB_FAIL(timer_.schedule_task_ignore_handle_repeat_and_immediately(15_s, [retry_times, self_addr, uniq_holder, proxy]() mutable -> bool {
int ret = OB_SUCCESS;
bool stop_flag = false;
char ip[64] = {0};
int64_t affected_rows = 0;
const char *sql = to_cstring(uniq_holder->get_ob_string());
if (OB_ISNULL(proxy)) {
SHARE_LOG(WARN, "proxy_ is NULL", KP(proxy));
} else if (!self_addr.ip_to_string(ip, sizeof(ip))) {
ret = OB_ERR_UNEXPECTED;
SHARE_LOG(WARN, "ip to string failed");
} else if (OB_FAIL(proxy->write(sql, affected_rows))) {
SHARE_LOG(WARN, "sync execute sql failed", K(sql), K(ret));
} else if (!is_single_row(affected_rows)) {
ret = OB_ERR_UNEXPECTED;
SHARE_LOG(WARN, "affected_rows expected to be one", K(affected_rows), K(ret));
} else {
ObTaskController::get().allow_next_syslog();
SHARE_LOG(INFO, "event table sync add event success", K(ret), K(sql));
}
if (++retry_times > MAX_RETRY_COUNT || OB_SUCC(ret)) {
// this may happened cause inner sql may not work for a long time,
// but i have tried my very best, so let it miss.
if (retry_times > MAX_RETRY_COUNT) {
SHARE_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fail to schedule report event task cause retry too much times");
}
stop_flag = true;
}
return stop_flag;
}))) {
SHARE_LOG(ERROR, "fail to schedule report event task");
}
return ret;
}
}//end namespace rootserver
}//end namespace oceanbase

View File

@ -15,13 +15,16 @@
#include "lib/oblog/ob_log.h"
#include "lib/time/ob_time_utility.h"
#include "lib/guard/ob_unique_guard.h"
#include "lib/string/ob_sql_string.h"
#include "lib/string/ob_string_holder.h"
#include "lib/queue/ob_dedup_queue.h"
#include "lib/thread/ob_work_queue.h"
#include "lib/lock/ob_mutex.h"
#include "lib/mysqlclient/ob_mysql_proxy.h"
#include "share/inner_table/ob_inner_table_schema.h"
#include "share/ob_dml_sql_splicer.h"
#include "share/ob_occam_timer.h"
#include "share/ob_task_define.h"
#include <utility>
@ -127,6 +130,10 @@ public:
// if number of others is not 13, should be even, every odd of them are name, every even of them are value
template <typename ...Rest>
int sync_add_event(const char *module, const char *event, Rest &&...others);
// number of others should not less than 0, or more than 13
// if number of others is not 13, should be even, every odd of them are name, every even of them are value
template <typename ...Rest>
int add_event_with_retry(const char *module, const char *event, Rest &&...others);
virtual int async_delete() = 0;
protected:
@ -139,7 +146,15 @@ protected:
// recursive end if there is an extra_info
template <int Floor, typename Value>
int sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Value &&extro_info);
void set_addr(const common::ObAddr self_addr, bool is_rs_ev) { self_addr_ = self_addr; is_rootservice_event_history_ = is_rs_ev; }
int add_event_to_timer_(const common::ObSqlString &sql);
void set_addr(const common::ObAddr self_addr,
bool is_rs_ev,
bool is_server_ev)
{
self_addr_ = self_addr;
is_rootservice_event_history_ = is_rs_ev;
is_server_event_history_ = is_server_ev;
}
const common::ObAddr &get_addr() const { return self_addr_; }
void set_event_table(const char* tname) { event_table_name_ = tname; }
const char *get_event_table() const { return event_table_name_; }
@ -156,6 +171,7 @@ protected:
static const int64_t RS_EVENT_HISTORY_DELETE_TIME = 7L * 24L * 3600L * 1000L * 1000L; // 7DAY
static const int64_t SERVER_EVENT_HISTORY_DELETE_TIME = 2L * 24L * 3600L * 1000L * 1000L; // 2DAY
static const int64_t UNIT_LOAD_HISTORY_DELETE_TIME = 7L * 24L * 3600L * 1000L * 1000L; // 2DAY
static const int64_t MAX_RETRY_COUNT = 12;
virtual int process_task(const common::ObString &sql, const bool is_delete);
private:
@ -168,6 +184,9 @@ private:
const char* event_table_name_;
common::ObAddr self_addr_;
bool is_rootservice_event_history_;
bool is_server_event_history_;
// timer for add_event_with_retry
common::ObOccamTimer timer_;
protected:
ObEventHistoryTableOperator();
private:
@ -458,6 +477,62 @@ int ObEventHistoryTableOperator::sync_add_event(const char *module, const char *
return ret;
}
template <typename ...Rest>
int ObEventHistoryTableOperator::add_event_with_retry(const char *module, const char *event, Rest &&...others)
{
static_assert(sizeof...(others) >= 0 && sizeof...(others) <= 13 &&
(sizeof...(others) == 13 || (sizeof...(others) % 2 == 0)),
"max support 6 pair of name-value args and 1 extra info, if number of others is not 13, should be even");
int ret = common::OB_SUCCESS;
int64_t event_ts = 0;
common::ObSqlString sql;
share::ObDMLSqlSplicer dml;
char ip_buf[common::MAX_IP_ADDR_LENGTH];
TIMEGUARD_INIT(EVENT_HISTORY, 100_ms, 5_s); \
if (!inited_) {
ret = common::OB_NOT_INIT;
SHARE_LOG(WARN, "not init", K(ret));
} else if (NULL == module || NULL == event) {
ret = common::OB_INVALID_ARGUMENT;
SHARE_LOG(WARN, "neither module or event can be NULL", KP(module), KP(event), K(ret));
} else if (stopped_) {
ret = OB_CANCELED;
SHARE_LOG(WARN, "observer is stopped, cancel add_event", K(sql), K(ret));
} else if (CLICK_FAIL(gen_event_ts(event_ts))) {
SHARE_LOG(WARN, "gen_event_ts failed", K(ret));
} else if (CLICK_FAIL(dml.add_gmt_create(event_ts)) ||
CLICK_FAIL(dml.add_column("module", module)) ||
CLICK_FAIL(dml.add_column("event", event))) {
SHARE_LOG(WARN, "add column failed", K(ret));
} else if (CLICK_FAIL(sync_add_event_helper_<0>(dml, std::forward<Rest>(others)...))) {// recursive call
} else if (common::OB_SUCCESS == ret && self_addr_.is_valid()) {
if (is_rootservice_event_history_) {
//Add rs_svr_ip, rs_svr_port to the __all_rootservice_event_history table
(void)self_addr_.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH);
if (CLICK_FAIL(dml.add_column("rs_svr_ip", ip_buf))
|| CLICK_FAIL(dml.add_column("rs_svr_port", self_addr_.get_port()))) {
SHARE_LOG(WARN, "add column failed", K(ret));
}
} else {
// __all_server_event_history has svr_ip and svr_port, but __all_rootservice_event_history hasn't.
(void)self_addr_.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH);
if (CLICK_FAIL(dml.add_column("svr_ip", ip_buf))
|| CLICK_FAIL(dml.add_column("svr_port", self_addr_.get_port()))) {
SHARE_LOG(WARN, "add column failed", K(ret));
}
}
}
if (OB_SUCC(ret)) {
if (CLICK_FAIL(dml.splice_insert_sql(event_table_name_, sql))) {
SHARE_LOG(WARN, "splice_insert_sql failed", K(ret));
} else if (CLICK_FAIL(add_event_to_timer_(sql))) {
SHARE_LOG(WARN, "add_event_to_timer_ failed", K(ret), K(sql));
} else {
SHARE_LOG(INFO, "add_event_with_retry success", K(ret), K(sql));
}
}
return ret;
}
// recursive begin
template <int Floor, typename Name, typename Value, typename ...Rest>
int ObEventHistoryTableOperator::sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Name &&name, Value &&value, Rest &&...others)
@ -484,7 +559,7 @@ int ObEventHistoryTableOperator::sync_add_event_helper_(share::ObDMLSqlSplicer &
template <int Floor, typename Value>
int ObEventHistoryTableOperator::sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Value &&extra_info)
{
static_assert(Floor == 7, "if there is an extra_info column, it must be 13th args in this row, no more, no less");
static_assert(Floor == 6, "if there is an extra_info column, it must be 13th args in this row, no more, no less");
int ret = OB_SUCCESS;
if (OB_FAIL(dml.add_column(names[Floor], extra_info))) {
SHARE_LOG(WARN, "add column failed", K(ret), K(Floor));

View File

@ -57,6 +57,7 @@ public:
mock_log_engine_ = OB_NEW(MockLogEngine, "TestLog");
mock_mode_mgr_ = OB_NEW(MockLogModeMgr, "TestLog");
mock_reconfirm_ = OB_NEW(MockLogReconfirm, "TestLog");
mock_plugins_ = OB_NEW(LogPlugins, "TestLog");
}
~TestLogConfigMgr()
{
@ -66,6 +67,7 @@ public:
OB_DELETE(MockLogEngine, "TestLog", mock_log_engine_);
OB_DELETE(MockLogModeMgr, "TestLog", mock_mode_mgr_);
OB_DELETE(MockLogReconfirm, "TestLog", mock_reconfirm_);
OB_DELETE(LogPlugins, "TestLog", mock_plugins_);
}
void init_test_log_config_env(const common::ObAddr &self,
const LogConfigInfo &config_info,
@ -88,7 +90,7 @@ public:
LogConfigMeta config_meta;
EXPECT_EQ(OB_SUCCESS, config_meta.generate(init_pid, config_info, config_info, 1, LSN(0), 1));
EXPECT_EQ(OB_SUCCESS, cm.init(1, self, config_meta, mock_log_engine_, mock_sw_, mock_state_mgr_, mock_election_,
mock_mode_mgr_, mock_reconfirm_));
mock_mode_mgr_, mock_reconfirm_, mock_plugins_));
LogMemberRegionMap region_map;
EXPECT_EQ(OB_SUCCESS, region_map.init("localmap", OB_MAX_MEMBER_NUMBER));
for (int i = 0; i < cm.alive_paxos_memberlist_.get_member_number(); ++i) {
@ -105,6 +107,7 @@ public:
palf::MockLogEngine *mock_log_engine_;
palf::MockLogModeMgr *mock_mode_mgr_;
palf::MockLogReconfirm *mock_reconfirm_;
palf::LogPlugins *mock_plugins_;
};
TEST_F(TestLogConfigMgr, test_remove_child_is_not_learner)

View File

@ -80,6 +80,7 @@ public:
MockLogEngine mock_log_engine_;
MockPalfFSCbWrapper palf_fs_cb_;
ObTenantMutilAllocator *alloc_mgr_;
LogPlugins *plugins_;
MockPublicLogSlidingWindow log_sw_;
char *data_buf_;
};
@ -102,6 +103,7 @@ void TestLogSlidingWindow::SetUp()
OB_ASSERT(FALSE);
}
alloc_mgr_ = new (buf) common::ObTenantMutilAllocator(tenant_id);
plugins_ = new LogPlugins();
data_buf_ = (char*)ob_malloc(64 * 1024 * 1024, attr);
// init MTL
ObTenantBase tbase(tenant_id);
@ -148,19 +150,19 @@ TEST_F(TestLogSlidingWindow, test_init)
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, NULL,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true));
&mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, plugins_, base_info, true));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true));
&mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, plugins_, base_info, true));
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
// init twice
EXPECT_EQ(OB_INIT_TWICE, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
}
TEST_F(TestLogSlidingWindow, test_private_func_batch_01)
@ -175,7 +177,7 @@ TEST_F(TestLogSlidingWindow, test_private_func_batch_01)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
log_id = 10 + PALF_SLIDING_WINDOW_SIZE;
EXPECT_EQ(false, log_sw_.can_receive_larger_log_(log_id));
EXPECT_EQ(false, log_sw_.leader_can_submit_larger_log_(log_id));
@ -194,7 +196,7 @@ TEST_F(TestLogSlidingWindow, test_to_follower_pending)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
char *buf = data_buf_;
int64_t buf_len = 1 * 1024 * 1024;
share::SCN ref_scn;
@ -220,8 +222,10 @@ TEST_F(TestLogSlidingWindow, test_fetch_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
// init succ
MockLocCb cb;
EXPECT_EQ(OB_SUCCESS, plugins_->add_plugin(static_cast<PalfLocationCacheCb*>(&cb)));
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
prev_lsn.val_ = 1;
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log(fetch_log_type, prev_lsn, fetch_start_lsn, fetch_start_log_id));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log_for_reconfirm(dest, fetch_end_lsn, is_fetched));
@ -244,7 +248,7 @@ TEST_F(TestLogSlidingWindow, test_report_log_task_trace)
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
EXPECT_EQ(OB_SUCCESS, log_sw_.report_log_task_trace(1));
char *buf = data_buf_;
int64_t buf_len = 2 * 1024 * 1024;
@ -260,26 +264,10 @@ TEST_F(TestLogSlidingWindow, test_report_log_task_trace)
TEST_F(TestLogSlidingWindow, test_set_location_cache_cb)
{
MockLocCb cb;
EXPECT_EQ(OB_NOT_INIT, log_sw_.set_location_cache_cb(NULL));
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.set_location_cache_cb(NULL));
EXPECT_EQ(OB_SUCCESS, log_sw_.set_location_cache_cb(&cb));
EXPECT_EQ(OB_NOT_SUPPORTED, log_sw_.set_location_cache_cb(&cb));
}
TEST_F(TestLogSlidingWindow, test_reset_location_cache_cb)
{
EXPECT_EQ(OB_NOT_INIT, log_sw_.reset_location_cache_cb());
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
// init succ
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
EXPECT_EQ(OB_SUCCESS, log_sw_.reset_location_cache_cb());
MockLocCb *null_cb = NULL;
EXPECT_EQ(OB_INVALID_ARGUMENT, plugins_->add_plugin(static_cast<PalfLocationCacheCb*>(null_cb)));
EXPECT_EQ(OB_SUCCESS, plugins_->add_plugin(static_cast<PalfLocationCacheCb*>(&cb)));
EXPECT_EQ(OB_OP_NOT_ALLOW, plugins_->add_plugin(static_cast<PalfLocationCacheCb*>(&cb)));
}
TEST_F(TestLogSlidingWindow, test_submit_log)
@ -295,7 +283,7 @@ TEST_F(TestLogSlidingWindow, test_submit_log)
share::SCN scn;
EXPECT_EQ(OB_NOT_INIT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn));
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(NULL, buf_len, ref_scn, lsn, scn));
buf_len = 0;
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn));
@ -327,7 +315,7 @@ TEST_F(TestLogSlidingWindow, test_submit_group_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
mock_state_mgr_.mock_proposal_id_ = 100;
LSN lsn(10);
EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_group_log(lsn, NULL, 1024));
@ -386,7 +374,7 @@ TEST_F(TestLogSlidingWindow, test_receive_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
char *buf = data_buf_;
int64_t buf_len = 2 * 1024 * 1024;
@ -549,7 +537,7 @@ TEST_F(TestLogSlidingWindow, test_after_flush_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
int64_t curr_proposal_id = 10;
// set default config meta
@ -612,7 +600,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
@ -720,7 +708,7 @@ TEST_F(TestLogSlidingWindow, test_ack_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
log_sw_.self_ = self_;
@ -778,7 +766,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_for_rebuild)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
@ -889,7 +877,7 @@ TEST_F(TestLogSlidingWindow, test_append_disk_log)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
// generate new group entry
@ -1011,7 +999,7 @@ TEST_F(TestLogSlidingWindow, test_group_entry_truncate)
PalfBaseInfo base_info;
gen_default_palf_base_info_(base_info);
EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_,
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true));
&mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true));
int64_t curr_proposal_id = 10;
mock_state_mgr_.mock_proposal_id_ = curr_proposal_id;
// generate new group entry

View File

@ -53,6 +53,7 @@ public:
MockLogConfigMgr mock_config_mgr_;
MockLogModeMgr mock_mode_mgr_;
MockPalfRoleChangeCbWrapper mock_role_change_cb_;
LogPlugins plugins_;
LogStateMgr state_mgr_;
};
@ -74,19 +75,19 @@ void TestLogStateMgr::TearDown()
TEST_F(TestLogStateMgr, test_init)
{
EXPECT_EQ(OB_INVALID_ARGUMENT, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, NULL, &mock_sw_,
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_));
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_));
EXPECT_EQ(OB_INVALID_ARGUMENT, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, NULL,
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_));
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_));
EXPECT_EQ(OB_SUCCESS, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, &mock_sw_,
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_));
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_));
EXPECT_EQ(OB_INIT_TWICE, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, &mock_sw_,
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_));
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_));
}
TEST_F(TestLogStateMgr, replay_to_leader_active)
{
EXPECT_EQ(OB_SUCCESS, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, &mock_sw_,
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_));
&mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_));
// set default config meta
ObMemberList default_mlist;
default_mlist.add_server(self_);