From 1d918d8dbf2f05a1c6544ea1eddcacc4412a4ccc Mon Sep 17 00:00:00 2001 From: BinChenn Date: Sat, 22 Apr 2023 14:34:34 +0000 Subject: [PATCH] reporting events of PALF to history table asynchronously --- src/logservice/CMakeLists.txt | 1 + src/logservice/ob_log_monitor.cpp | 78 ++++++++++++ src/logservice/ob_log_monitor.h | 71 +++++++++++ src/logservice/ob_log_service.cpp | 6 +- src/logservice/ob_log_service.h | 2 + src/logservice/palf/log_config_mgr.cpp | 13 +- src/logservice/palf/log_config_mgr.h | 4 +- src/logservice/palf/log_define.h | 6 + src/logservice/palf/log_sliding_window.cpp | 51 ++------ src/logservice/palf/log_sliding_window.h | 8 +- src/logservice/palf/log_state_mgr.cpp | 39 +++++- src/logservice/palf/log_state_mgr.h | 4 +- src/logservice/palf/palf_callback.h | 29 +++++ src/logservice/palf/palf_callback_wrapper.cpp | 120 ++++++++++++++++++ src/logservice/palf/palf_callback_wrapper.h | 44 +++++++ src/logservice/palf/palf_env.cpp | 3 +- src/logservice/palf/palf_env.h | 1 + src/logservice/palf/palf_env_impl.cpp | 13 +- src/logservice/palf/palf_env_impl.h | 4 +- src/logservice/palf/palf_handle_impl.cpp | 65 ++++++---- src/logservice/palf/palf_handle_impl.h | 4 +- ...ob_server_event_history_table_operator.cpp | 5 +- .../ob_server_event_history_table_operator.h | 2 + .../ob_rs_event_history_table_operator.cpp | 5 +- src/share/ob_event_history_table_operator.cpp | 54 +++++++- src/share/ob_event_history_table_operator.h | 79 +++++++++++- unittest/logservice/test_log_config_mgr.cpp | 5 +- .../logservice/test_log_sliding_window.cpp | 66 ++++------ unittest/logservice/test_log_state_mgr.cpp | 11 +- 29 files changed, 650 insertions(+), 143 deletions(-) create mode 100644 src/logservice/ob_log_monitor.cpp create mode 100644 src/logservice/ob_log_monitor.h diff --git a/src/logservice/CMakeLists.txt b/src/logservice/CMakeLists.txt index 1cacff917..37bb312f9 100644 --- a/src/logservice/CMakeLists.txt +++ b/src/logservice/CMakeLists.txt @@ -49,6 +49,7 @@ ob_set_subtarget(ob_logservice common ob_server_log_block_mgr.cpp ob_log_flashback_service.cpp ob_net_keepalive_adapter.cpp + ob_log_monitor.cpp ) ob_set_subtarget(ob_logservice common_mixed diff --git a/src/logservice/ob_log_monitor.cpp b/src/logservice/ob_log_monitor.cpp new file mode 100644 index 000000000..237afa3f9 --- /dev/null +++ b/src/logservice/ob_log_monitor.cpp @@ -0,0 +1,78 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#include "ob_log_monitor.h" +#include "observer/ob_server_event_history_table_operator.h" // SERVER_EVENT_ADD_WITH_RETRY + +namespace oceanbase +{ +namespace logservice +{ +#define LOG_MONITOR_EVENT_FMT_PREFIX "LOG", type_to_string_(event), "TENANT_ID", mtl_id, "LS_ID", palf_id + +// =========== PALF Event Reporting =========== +int ObLogMonitor::record_role_change_event(const int64_t palf_id, + const common::ObRole &prev_role, + const palf::ObReplicaState &prev_state, + const common::ObRole &curr_role, + const palf::ObReplicaState &curr_state, + const char *extra_info) +{ + int ret = OB_SUCCESS; + int pret = OB_SUCCESS; + const int64_t mtl_id = MTL_ID(); + const EventType event = EventType::ROLE_TRANSITION; + const int64_t MAX_BUF_LEN = 50; + char prev_str[MAX_BUF_LEN] = {'\0'}; + char curr_str[MAX_BUF_LEN] = {'\0'}; + TIMEGUARD_INIT(LOG_MONITOR, 100_ms, 5_s); \ + if (0 >= (pret = snprintf(prev_str, MAX_BUF_LEN, "%s %s", role_to_string(prev_role), + replica_state_to_string(prev_state)))) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, "snprintf failed", KR(ret), K(prev_role), K(prev_state)); + } else if (0 >= (pret = snprintf(curr_str, MAX_BUF_LEN, "%s %s", role_to_string(curr_role), + replica_state_to_string(curr_state)))) { + ret = OB_ERR_UNEXPECTED; + CLOG_LOG(ERROR, "snprintf failed", KR(ret), K(curr_role), K(curr_state)); + } else { + CLICK(); + if (OB_NOT_NULL(extra_info)) { + SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX, + "PREVIOUS", prev_str, + "CURRENT", curr_str, + "", NULL, + "", NULL, + extra_info); + } else { + SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX, + "PREVIOUS", prev_str, + "CURRENT", curr_str); + } + CLICK(); + } + return ret; +} +// =========== PALF Event Reporting =========== + +// =========== PALF Performance Statistic =========== +int ObLogMonitor::add_log_write_stat(const int64_t palf_id, const int64_t log_write_size) +{ + int ret = OB_SUCCESS; + // TODO + return ret; +} +// =========== PALF Performance Statistic =========== + + +#undef LOG_MONITOR_EVENT_FMT_PREFIX +} // end namespace logservice +} // end namespace oceanbase \ No newline at end of file diff --git a/src/logservice/ob_log_monitor.h b/src/logservice/ob_log_monitor.h new file mode 100644 index 000000000..839dc9d90 --- /dev/null +++ b/src/logservice/ob_log_monitor.h @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_LOGSERVICE_OB_LOG_MONITOR_H_ +#define OCEANBASE_LOGSERVICE_OB_LOG_MONITOR_H_ + +#include "palf/palf_callback.h" + +namespace oceanbase +{ +namespace logservice +{ + +class ObLogMonitor : public palf::PalfMonitorCb +{ +public: + ObLogMonitor() { } + virtual ~ObLogMonitor() { } +public: + // =========== PALF Event Reporting =========== + int record_role_change_event(const int64_t palf_id, + const common::ObRole &prev_role, + const palf::ObReplicaState &prev_state, + const common::ObRole &curr_role, + const palf::ObReplicaState &curr_state, + const char *extra_info = NULL) override final; + // =========== PALF Event Reporting =========== +public: + // =========== PALF Performance Statistic =========== + int add_log_write_stat(const int64_t palf_id, const int64_t log_write_size) override final; + // =========== PALF Performance Statistic =========== +private: + enum EventType + { + UNKNOWN = 0, + DEGRADE, + UPGRADE, + ROLE_TRANSITION, + }; + + const char *type_to_string_(const EventType &event) const + { + #define CHECK_LOG_EVENT_TYPE_STR(x) case(EventType::x): return #x + switch (event) + { + CHECK_LOG_EVENT_TYPE_STR(DEGRADE); + CHECK_LOG_EVENT_TYPE_STR(UPGRADE); + case (EventType::ROLE_TRANSITION): + return "ROLE TRANSITION"; + default: + return "UNKNOWN"; + } + #undef CHECK_LOG_EVENT_TYPE_STR + } +private: + DISALLOW_COPY_AND_ASSIGN(ObLogMonitor); +}; + +} // logservice +} // oceanbase + +#endif diff --git a/src/logservice/ob_log_service.cpp b/src/logservice/ob_log_service.cpp index da6f8a1f7..525fce648 100644 --- a/src/logservice/ob_log_service.cpp +++ b/src/logservice/ob_log_service.cpp @@ -59,7 +59,9 @@ ObLogService::ObLogService() : ls_adapter_(), rpc_proxy_(), reporter_(), - restore_service_() + restore_service_(), + flashback_service_(), + monitor_() {} ObLogService::~ObLogService() @@ -225,7 +227,7 @@ int ObLogService::init(const PalfOptions &options, KP(alloc_mgr), KP(transport), KP(ls_service), KP(location_service), KP(reporter), KP(log_block_pool), KP(sql_proxy), KP(net_keepalive_adapter)); } else if (OB_FAIL(PalfEnv::create_palf_env(options, base_dir, self, transport, - alloc_mgr, log_block_pool, palf_env_))) { + alloc_mgr, log_block_pool, &monitor_, palf_env_))) { CLOG_LOG(WARN, "failed to create_palf_env", K(base_dir), K(ret)); } else if (OB_ISNULL(palf_env_)) { ret = OB_ERR_UNEXPECTED; diff --git a/src/logservice/ob_log_service.h b/src/logservice/ob_log_service.h index f7f19e5c3..2ed9aaf78 100644 --- a/src/logservice/ob_log_service.h +++ b/src/logservice/ob_log_service.h @@ -31,6 +31,7 @@ #include "ob_location_adapter.h" #include "ob_log_flashback_service.h" // ObLogFlashbackService #include "ob_log_handler.h" +#include "ob_log_monitor.h" namespace oceanbase { @@ -228,6 +229,7 @@ private: cdc::ObCdcService cdc_service_; ObLogRestoreService restore_service_; ObLogFlashbackService flashback_service_; + ObLogMonitor monitor_; ObSpinLock update_palf_opts_lock_; private: DISALLOW_COPY_AND_ASSIGN(ObLogService); diff --git a/src/logservice/palf/log_config_mgr.cpp b/src/logservice/palf/log_config_mgr.cpp index e139f8f9c..671191fba 100644 --- a/src/logservice/palf/log_config_mgr.cpp +++ b/src/logservice/palf/log_config_mgr.cpp @@ -79,6 +79,7 @@ LogConfigMgr::LogConfigMgr() election_(NULL), mode_mgr_(NULL), reconfirm_(NULL), + plugins_(NULL), is_inited_(false) {} @@ -96,7 +97,8 @@ int LogConfigMgr::init(const int64_t palf_id, LogStateMgr *state_mgr, election::Election* election, LogModeMgr *mode_mgr, - LogReconfirm *reconfirm) + LogReconfirm *reconfirm, + LogPlugins *plugins) { int ret = OB_SUCCESS; if (is_inited_) { @@ -109,10 +111,13 @@ int LogConfigMgr::init(const int64_t palf_id, OB_ISNULL(sw) || OB_ISNULL(state_mgr) || OB_ISNULL(election) || - OB_ISNULL(mode_mgr)) { + OB_ISNULL(mode_mgr) || + OB_ISNULL(reconfirm) || + OB_ISNULL(plugins)) { ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "invalid argument", KR(ret), K(palf_id), K(self), - K(log_ms_meta), K(log_engine), K(sw), K(state_mgr), K(election), K(mode_mgr)); + K(log_ms_meta), KP(log_engine), KP(sw), KP(state_mgr), KP(election), KP(mode_mgr), + KP(reconfirm), KP(plugins)); } else if (OB_FAIL(paxos_member_region_map_.init("LogRegionMap", OB_MAX_MEMBER_NUMBER))) { PALF_LOG(WARN, "paxos_member_region_map_ init failed", K(palf_id)); } else { @@ -126,6 +131,7 @@ int LogConfigMgr::init(const int64_t palf_id, election_ = election; mode_mgr_ = mode_mgr; reconfirm_ = reconfirm; + plugins_ = plugins; if (true == log_ms_meta.curr_.is_valid()) { if (OB_FAIL(append_config_info_(log_ms_meta.curr_))) { PALF_LOG(WARN, "append_config_info_ failed", K(ret), K(palf_id), K(log_ms_meta)); @@ -156,6 +162,7 @@ void LogConfigMgr::destroy() sw_ = NULL; log_engine_ = NULL; reconfirm_ = NULL; + plugins_ = NULL; register_time_us_ = OB_INVALID_TIMESTAMP; parent_.reset(); parent_keepalive_time_us_ = OB_INVALID_TIMESTAMP; diff --git a/src/logservice/palf/log_config_mgr.h b/src/logservice/palf/log_config_mgr.h index 1067333fe..152c25298 100644 --- a/src/logservice/palf/log_config_mgr.h +++ b/src/logservice/palf/log_config_mgr.h @@ -215,7 +215,8 @@ public: LogStateMgr *state_mgr, election::Election *election, LogModeMgr *mode_mgr, - LogReconfirm *reconfirm); + LogReconfirm *reconfirm, + LogPlugins *plugins); virtual void destroy(); // require caller holds WLock in PalfHandleImpl @@ -557,6 +558,7 @@ private: election::Election* election_; LogModeMgr *mode_mgr_; LogReconfirm *reconfirm_; + LogPlugins *plugins_; bool is_inited_; DISALLOW_COPY_AND_ASSIGN(LogConfigMgr); }; diff --git a/src/logservice/palf/log_define.h b/src/logservice/palf/log_define.h index 52272ec87..099a2c2a5 100644 --- a/src/logservice/palf/log_define.h +++ b/src/logservice/palf/log_define.h @@ -39,6 +39,12 @@ namespace palf #define PALF_EVENT(info_string, palf_id, args...) FLOG_INFO("[PALF_EVENT] "info_string, "palf_id", palf_id, args) +#define PALF_EVENT_REPORT_INFO_KV(args...) \ +const int64_t MAX_INFO_LENGTH = 512; \ +char EXTRA_INFOS[MAX_INFO_LENGTH]; \ +int64_t pos = 0; \ +::oceanbase::common::databuff_print_kv(EXTRA_INFOS, MAX_INFO_LENGTH, pos, ##args); \ + typedef int FileDesc; typedef uint64_t block_id_t ; typedef uint64_t offset_t; diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index abb6d6cdb..faeee6168 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -76,6 +76,7 @@ LogSlidingWindow::LogSlidingWindow() mm_(NULL), mode_mgr_(NULL), log_engine_(NULL), + plugins_(NULL), lsn_allocator_(), group_buffer_(), last_submit_info_lock_(common::ObLatchIds::PALF_SW_SUBMIT_INFO_LOCK), @@ -113,8 +114,6 @@ LogSlidingWindow::LogSlidingWindow() submit_log_handling_lease_(), last_fetch_log_renew_leader_ts_us_(OB_INVALID_TIMESTAMP), end_lsn_stat_time_us_(OB_INVALID_TIMESTAMP), - lc_cb_lock_(common::ObLatchIds::PALF_SW_LOC_CB_LOCK), - lc_cb_(NULL), reconfirm_fetch_dest_(), is_truncating_(false), is_rebuilding_(false), @@ -142,10 +141,10 @@ void LogSlidingWindow::destroy() sw_.destroy(); group_buffer_.destroy(); match_lsn_map_.destroy(); - lc_cb_ = NULL; reconfirm_fetch_dest_.reset(); state_mgr_ = NULL; log_engine_ = NULL; + plugins_ = NULL; mm_ = NULL; mode_mgr_ = NULL; } @@ -208,6 +207,7 @@ int LogSlidingWindow::init(const int64_t palf_id, LogEngine *log_engine, palf::PalfFSCbWrapper *palf_fs_cb, common::ObILogAllocator *alloc_mgr, + LogPlugins *plugins, const PalfBaseInfo &palf_base_info, const bool is_normal_replica) { @@ -222,10 +222,11 @@ int LogSlidingWindow::init(const int64_t palf_id, || NULL == mm || NULL == mode_mgr || NULL == log_engine - || NULL == palf_fs_cb) { + || NULL == palf_fs_cb + || NULL == plugins) { ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "invalid argumetns", K(ret), K(palf_id), K(self), K(palf_base_info), - KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb)); + KP(state_mgr), KP(mm), KP(mode_mgr), KP(log_engine), KP(palf_fs_cb), KP(plugins)); } else if (is_normal_replica && OB_FAIL(do_init_mem_(palf_id, palf_base_info, alloc_mgr))) { PALF_LOG(WARN, "do_init_mem_ failed", K(ret), K(palf_id)); } else { @@ -236,6 +237,7 @@ int LogSlidingWindow::init(const int64_t palf_id, mode_mgr_ = mode_mgr; log_engine_ = log_engine; palf_fs_cb_ = palf_fs_cb; + plugins_ = plugins; last_submit_lsn_ = prev_log_info.lsn_; last_submit_end_lsn_ = palf_base_info.curr_lsn_; @@ -1951,12 +1953,10 @@ int LogSlidingWindow::get_fetch_log_dst_(common::ObAddr &fetch_dst) const fetch_dst = parent; } else if (state_mgr_leader.is_valid()) { fetch_dst = state_mgr_leader; - } else if (OB_ISNULL(lc_cb_)) { - PALF_LOG(TRACE, "lc_cb_ is NULL", K(ret), K_(palf_id), K_(self)); } else if (palf_reach_time_interval(PALF_FETCH_LOG_RENEW_LEADER_INTERVAL_US, last_fetch_log_renew_leader_ts_us_) && - OB_FAIL(lc_cb_->nonblock_renew_leader(palf_id_))) { + OB_FAIL(plugins_->nonblock_renew_leader(palf_id_))) { PALF_LOG(WARN, "nonblock_renew_leader failed", KR(ret), K_(palf_id), K_(self)); - } else if (OB_FAIL(lc_cb_->nonblock_get_leader(palf_id_, fetch_dst))) { + } else if (OB_FAIL(plugins_->nonblock_get_leader(palf_id_, fetch_dst))) { if (palf_reach_time_interval(5 * 1000 * 1000, lc_cb_get_warn_time_)) { PALF_LOG(WARN, "nonblock_get_leader failed", KR(ret), K_(palf_id), K_(self)); } @@ -4122,39 +4122,6 @@ void LogSlidingWindow::LogTaskGuard::revert_log_task() { log_id_ = -1; } -int LogSlidingWindow::set_location_cache_cb(PalfLocationCacheCb *lc_cb) -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else if (OB_ISNULL(lc_cb)) { - ret = OB_INVALID_ARGUMENT; - PALF_LOG(WARN, "lc_cb is NULL, can't register", KR(ret), K_(palf_id), K_(self)); - } else { - ObSpinLockGuard guard(lc_cb_lock_); - if (OB_NOT_NULL(lc_cb_)) { - ret = OB_NOT_SUPPORTED; - PALF_LOG(WARN, "lc_cb_ is not NULL, can't register", KR(ret), K_(palf_id), K_(self)); - } else { - lc_cb_ = lc_cb; - PALF_LOG(INFO, "set_location_cache_cb success", K_(palf_id), K_(self), KP_(lc_cb)); - } - } - return ret; -} - -int LogSlidingWindow::reset_location_cache_cb() -{ - int ret = OB_SUCCESS; - if (IS_NOT_INIT) { - ret = OB_NOT_INIT; - } else { - ObSpinLockGuard guard(lc_cb_lock_); - lc_cb_ = NULL; - } - return ret; -} - int LogSlidingWindow::get_min_scn_from_buf_(const LogGroupEntryHeader &header, const char *buf, const int64_t buf_len, diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index bc89bb25a..c5663c083 100644 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -159,6 +159,7 @@ public: LogEngine *log_engine, palf::PalfFSCbWrapper *palf_fs_cb, common::ObILogAllocator *alloc_mgr, + LogPlugins *plugins, const PalfBaseInfo &palf_base_info, const bool is_normal_replica); virtual int sliding_cb(const int64_t sn, const FixedSlidingWindowSlot *data); @@ -248,9 +249,6 @@ public: virtual int get_ack_info_array(LogMemberAckInfoList &ack_info_array) const; virtual int pre_check_before_degrade_upgrade(const LogMemberAckInfoList &servers, bool is_degrade); - // location cache will be removed TODO by yunlong - virtual int set_location_cache_cb(PalfLocationCacheCb *lc_cb); - virtual int reset_location_cache_cb(); virtual int advance_reuse_lsn(const LSN &flush_log_end_lsn); virtual int try_send_committed_info(const common::ObAddr &server, const LSN &log_lsn, @@ -436,6 +434,7 @@ private: LogConfigMgr *mm_; LogModeMgr *mode_mgr_; LogEngine *log_engine_; + LogPlugins *plugins_; palf::PalfFSCbWrapper *palf_fs_cb_; LSNAllocator lsn_allocator_; LogGroupBuffer group_buffer_; @@ -509,9 +508,6 @@ private: // last_renew_leader_ts in fetch_log mutable int64_t last_fetch_log_renew_leader_ts_us_; int64_t end_lsn_stat_time_us_; - // location ptr, will be removed. TODO by yunlong - mutable common::ObSpinLock lc_cb_lock_; - PalfLocationCacheCb *lc_cb_; // fetch log dest server for reconfirm common::ObAddr reconfirm_fetch_dest_; // whether sw is executing trucnation diff --git a/src/logservice/palf/log_state_mgr.cpp b/src/logservice/palf/log_state_mgr.cpp index ee98d8281..ddba2511d 100644 --- a/src/logservice/palf/log_state_mgr.cpp +++ b/src/logservice/palf/log_state_mgr.cpp @@ -41,6 +41,7 @@ LogStateMgr::LogStateMgr() mode_mgr_(NULL), palf_role_change_cb_(NULL), election_(NULL), + plugins_(NULL), role_state_val_(0), leader_(), leader_epoch_(OB_INVALID_TIMESTAMP), @@ -73,7 +74,8 @@ int LogStateMgr::init(const int64_t palf_id, LogEngine *log_engine, LogConfigMgr *mm, LogModeMgr *mode_mgr, - palf::PalfRoleChangeCbWrapper *palf_role_change_cb) + palf::PalfRoleChangeCbWrapper *palf_role_change_cb, + LogPlugins *plugins) { int ret = OB_SUCCESS; if (is_inited_) { @@ -81,14 +83,16 @@ int LogStateMgr::init(const int64_t palf_id, PALF_LOG(ERROR, "init twice", K(ret), K_(palf_id)); } else if (false == is_valid_palf_id(palf_id) || NULL == sw || NULL == reconfirm || NULL == log_engine || NULL == election || NULL == mm || NULL == mode_mgr || NULL == palf_role_change_cb || - !self.is_valid() || !replica_property_meta.is_valid()) { + NULL == plugins || !self.is_valid() || !replica_property_meta.is_valid()) { ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "invalid arguments", K(ret), K(palf_id), KP(sw), KP(reconfirm), K(log_prepare_meta), - KP(election), KP(log_engine), KP(mm), KP(mode_mgr), KP(palf_role_change_cb), K(replica_property_meta), K(self)); + KP(election), KP(log_engine), KP(mm), KP(mode_mgr), KP(palf_role_change_cb), KP(plugins), + K(replica_property_meta), K(self)); } else { prepare_meta_= log_prepare_meta; palf_id_ = palf_id; election_ = election; + plugins_ = plugins; sw_ = sw; reconfirm_ = reconfirm; log_engine_ = log_engine; @@ -497,6 +501,8 @@ void LogStateMgr::destroy() log_engine_ = NULL; mm_ = NULL; mode_mgr_ = NULL; + election_ = NULL; + plugins_ = NULL; scan_disk_log_finished_ = false; update_role_and_state_(FOLLOWER, INIT); leader_.reset(); @@ -539,6 +545,8 @@ int LogStateMgr::init_to_follower_active_() int ret = OB_SUCCESS; reset_status_(); update_role_and_state_(FOLLOWER, ACTIVE); + plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::INIT, + FOLLOWER, ObReplicaState::ACTIVE); PALF_EVENT("init_to_follower_active", palf_id_, K(ret), K_(self)); return ret; } @@ -562,7 +570,11 @@ int LogStateMgr::pending_to_follower_active_() if (OB_FAIL(to_follower_active_())) { PALF_LOG(ERROR, "to_follower_active_ failed", K(ret), K_(palf_id)); } - PALF_EVENT("follower_pending_to_follower_active", palf_id_, K(ret), K_(self), K_(leader)); + PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(pending_end_lsn)); + plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING, + FOLLOWER, ObReplicaState::ACTIVE, EXTRA_INFOS); + PALF_EVENT("follower_pending_to_follower_active", palf_id_, K(ret), K_(self), K_(leader), + K_(pending_end_lsn)); return ret; } @@ -590,6 +602,9 @@ int LogStateMgr::to_reconfirm_(const int64_t new_leader_epoch) int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch) { int ret = to_reconfirm_(new_leader_epoch); + PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch)); + plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::ACTIVE, + LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS); PALF_EVENT("follower_active_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch), K(leader_), K_(pending_end_lsn)); return ret; @@ -598,6 +613,9 @@ int LogStateMgr::follower_active_to_reconfirm_(const int64_t new_leader_epoch) int LogStateMgr::follower_pending_to_reconfirm_(const int64_t new_leader_epoch) { int ret = to_reconfirm_(new_leader_epoch); + PALF_EVENT_REPORT_INFO_KV(K_(leader), K(new_leader_epoch), K_(pending_end_lsn)); + plugins_->record_role_change_event(palf_id_, FOLLOWER, ObReplicaState::PENDING, + LEADER, ObReplicaState::RECONFIRM, EXTRA_INFOS); PALF_EVENT("follower_pending_to_reconfirm", palf_id_, K(ret), K_(self), K(new_leader_epoch), K(leader_), K_(pending_end_lsn)); return ret; @@ -605,14 +623,17 @@ int LogStateMgr::follower_pending_to_reconfirm_(const int64_t new_leader_epoch) int LogStateMgr::reconfirm_to_follower_pending_() { - PALF_EVENT("reconfirm_to_follower_pending", palf_id_, K_(self), K_(leader), "is_allow_vote", - is_allow_vote(), K(lbt())); int ret = OB_SUCCESS; if (OB_FAIL(to_follower_pending_())) { PALF_LOG(WARN, "to_follower_pending_ failed, try again", K(ret), K_(palf_id)); } else { reset_status_(); update_role_and_state_(FOLLOWER, PENDING); + PALF_EVENT_REPORT_INFO_KV(K_(leader), K_(allow_vote)); + plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM, + FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS); + PALF_EVENT("reconfirm_to_follower_pending", palf_id_, K_(self), K_(leader), "is_allow_vote", + is_allow_vote(), K(lbt())); } return ret; } @@ -642,6 +663,9 @@ int LogStateMgr::reconfirm_to_leader_active_() } const int64_t reconfirm_to_active_cost = ObTimeUtility::current_time() - reconfirm_start_time_us_; PALF_EVENT("reconfirm_to_leader_active end", palf_id_, K(ret), K_(self), K(reconfirm_to_active_cost), K_(role), K_(state)); + PALF_EVENT_REPORT_INFO_KV(K(reconfirm_stage_cost), K(reconfirm_to_active_cost)); + plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::RECONFIRM, + LEADER, ObReplicaState::ACTIVE, EXTRA_INFOS); } return ret; @@ -666,6 +690,9 @@ int LogStateMgr::leader_active_to_follower_pending_() update_role_and_state_(FOLLOWER, PENDING); } + PALF_EVENT_REPORT_INFO_KV(K_(pending_end_lsn)); + plugins_->record_role_change_event(palf_id_, LEADER, ObReplicaState::ACTIVE, + FOLLOWER, ObReplicaState::PENDING, EXTRA_INFOS); PALF_EVENT("leader_active_to_follower_pending_", palf_id_, K(ret), K_(self), K_(role), K_(state), K_(pending_end_lsn)); return ret; } diff --git a/src/logservice/palf/log_state_mgr.h b/src/logservice/palf/log_state_mgr.h index b2bafe614..dbc958cdf 100644 --- a/src/logservice/palf/log_state_mgr.h +++ b/src/logservice/palf/log_state_mgr.h @@ -56,7 +56,8 @@ public: LogEngine *log_engine, LogConfigMgr *mm, LogModeMgr *mode_mgr_, - palf::PalfRoleChangeCbWrapper *palf_role_change_cb); + palf::PalfRoleChangeCbWrapper *palf_role_change_cb, + LogPlugins *plugins); virtual bool is_state_changed(); virtual int switch_state(); virtual int handle_prepare_request(const common::ObAddr &new_leader, @@ -168,6 +169,7 @@ private: LogModeMgr *mode_mgr_; palf::PalfRoleChangeCbWrapper *palf_role_change_cb_; election::Election* election_; + LogPlugins *plugins_; union { int64_t role_state_val_; diff --git a/src/logservice/palf/palf_callback.h b/src/logservice/palf/palf_callback.h index 1812cbfc4..3b72d2719 100644 --- a/src/logservice/palf/palf_callback.h +++ b/src/logservice/palf/palf_callback.h @@ -13,6 +13,7 @@ #ifndef OCEANBASE_LOGSERVICE_PALF_CALLBACK_ #define OCEANBASE_LOGSERVICE_PALF_CALLBACK_ #include +#include "common/ob_role.h" #include "lib/utility/ob_macro_utils.h" #include "lib/list/ob_dlink_node.h" #include "lib/utility/ob_print_utils.h" @@ -54,6 +55,34 @@ public: virtual int nonblock_renew_leader(const int64_t id) = 0; }; +class PalfMonitorCb +{ +public: + // record events + virtual int record_role_change_event(const int64_t palf_id, + const common::ObRole &prev_role, + const palf::ObReplicaState &prev_state, + const common::ObRole &curr_role, + const palf::ObReplicaState &curr_state, + const char *extra_info = "") = 0; + // performance statistic + virtual int add_log_write_stat(const int64_t palf_id, const int64_t log_write_size) = 0; +}; + +class PalfLiteMonitorCb +{ +public: + // @desc: record creating or deleting events + // add/remove cluster: valid cluster_id, invalid tenant_id, invalid ls_id, + // add/remove tenant: valid cluster_id, valid tenant_id, invalid ls_id, + // add/remove ls: valid cluster_id, valid tenant_id, valid ls_id, + virtual int record_create_or_delete_event(const int64_t cluster_id, + const uint64_t tenant_id, + const int64_t ls_id, + const bool is_create, + const char *extra_info) = 0; +}; + } // end namespace palf } // end namespace oceanbase #endif diff --git a/src/logservice/palf/palf_callback_wrapper.cpp b/src/logservice/palf/palf_callback_wrapper.cpp index e94181a0e..c5b4cda8a 100644 --- a/src/logservice/palf/palf_callback_wrapper.cpp +++ b/src/logservice/palf/palf_callback_wrapper.cpp @@ -168,5 +168,125 @@ int PalfRebuildCbWrapper::on_rebuild(const int64_t id, const LSN &lsn) return ret; } + +LogPlugins::LogPlugins() + : loc_lock_(), + loc_cb_(NULL), + palf_monitor_lock_(), + palf_monitor_(NULL), + palflite_monitor_lock_(), + palflite_monitor_(NULL) { } + +LogPlugins::~LogPlugins() +{ + destroy(); +} + +void LogPlugins::destroy() +{ + { + common::RWLock::WLockGuard guard(loc_lock_); + loc_cb_ = NULL; + } + { + common::RWLock::WLockGuard guard(palf_monitor_lock_); + palf_monitor_ = NULL; + } + { + common::RWLock::WLockGuard guard(palflite_monitor_lock_); + palflite_monitor_ = NULL; + } +} + +template<> +int LogPlugins::add_plugin(PalfLocationCacheCb *plugin) +{ + int ret = OB_SUCCESS; + common::RWLock::WLockGuard guard(loc_lock_); + if (OB_ISNULL(plugin)) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "Palf plugin is NULL", KP(plugin)); + } else if (OB_NOT_NULL(loc_cb_)) { + ret = OB_OP_NOT_ALLOW; + PALF_LOG(INFO, "Palf plugin is not NULL", KP(plugin), KP_(loc_cb)); + } else { + loc_cb_ = plugin; + PALF_LOG(INFO, "add_plugin success", KP(plugin)); + } + return ret; +} + +template<> +int LogPlugins::del_plugin(PalfLocationCacheCb *plugin) +{ + int ret = OB_SUCCESS; + common::RWLock::WLockGuard guard(loc_lock_); + if (OB_NOT_NULL(loc_cb_)) { + PALF_LOG(INFO, "del_plugin success", KP_(loc_cb)); + loc_cb_ = NULL; + } + return ret; +} + +template<> +int LogPlugins::add_plugin(PalfMonitorCb *plugin) +{ + int ret = OB_SUCCESS; + common::RWLock::WLockGuard guard(palf_monitor_lock_); + if (OB_ISNULL(plugin)) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "Palf plugin is NULL", KP(plugin)); + } else if (OB_NOT_NULL(palf_monitor_)) { + ret = OB_OP_NOT_ALLOW; + PALF_LOG(INFO, "Palf plugin is not NULL", KP(plugin), KP_(palf_monitor)); + } else { + palf_monitor_ = plugin; + PALF_LOG(INFO, "add_plugin success", KP(plugin)); + } + return ret; +} + +template<> +int LogPlugins::del_plugin(PalfMonitorCb *plugin) +{ + int ret = OB_SUCCESS; + common::RWLock::WLockGuard guard(palf_monitor_lock_); + if (OB_NOT_NULL(palf_monitor_)) { + PALF_LOG(INFO, "del_plugin success", KP_(palf_monitor)); + palf_monitor_ = NULL; + } + return ret; +} + +template<> +int LogPlugins::add_plugin(PalfLiteMonitorCb *plugin) +{ + int ret = OB_SUCCESS; + common::RWLock::WLockGuard guard(palf_monitor_lock_); + if (OB_ISNULL(plugin)) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "Palf plugin is NULL", KP(plugin)); + } else if (OB_NOT_NULL(palflite_monitor_)) { + ret = OB_OP_NOT_ALLOW; + PALF_LOG(INFO, "Palf plugin is not NULL", KP(plugin), KP_(loc_cb)); + } else { + palflite_monitor_ = plugin; + PALF_LOG(INFO, "add_plugin success", KP(plugin)); + } + return ret; +} + +template<> +int LogPlugins::del_plugin(PalfLiteMonitorCb *plugin) +{ + int ret = OB_SUCCESS; + common::RWLock::WLockGuard guard(palf_monitor_lock_); + if (OB_NOT_NULL(palflite_monitor_)) { + PALF_LOG(INFO, "del_plugin success", KP_(palflite_monitor)); + palflite_monitor_ = NULL; + } + return ret; +} + }; // end namespace palf }; // end namespace oceanbase diff --git a/src/logservice/palf/palf_callback_wrapper.h b/src/logservice/palf/palf_callback_wrapper.h index 261c4472c..01f334c0b 100644 --- a/src/logservice/palf/palf_callback_wrapper.h +++ b/src/logservice/palf/palf_callback_wrapper.h @@ -16,7 +16,9 @@ #include "lib/list/ob_dlist.h" #include "lib/ob_errno.h" #include "lib/lock/ob_spin_lock.h" +#include "lib/lock/ob_tc_rwlock.h" #include "palf_callback.h" +#include "share/ob_delegate.h" namespace oceanbase { namespace palf @@ -83,6 +85,48 @@ private: ObDList list_; ObSpinLock lock_; }; + +#define PALF_PLUGINS_DELEGATE_PTR(delegate_obj, lock_, func_name) \ + template \ + int func_name(Args &&...args) { \ + int ret = OB_SUCCESS; \ + common::RWLock::RLockGuard guard(lock_); \ + if (OB_UNLIKELY(OB_ISNULL(delegate_obj))) { \ + ret = OB_NOT_INIT; \ + } else { \ + ret = delegate_obj->func_name(std::forward(args)...); \ + } \ + return ret; \ + } + +class LogPlugins +{ +public: + LogPlugins(); + ~LogPlugins(); + void destroy(); + template + int add_plugin(T *plugin); + template + int del_plugin(T *plugin); + + PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, get_leader); + PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, nonblock_get_leader); + PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, nonblock_renew_leader); + + PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_role_change_event); + PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, add_log_write_stat); + + PALF_PLUGINS_DELEGATE_PTR(palflite_monitor_, palflite_monitor_lock_, record_create_or_delete_event); + TO_STRING_KV(KP_(loc_cb), KP_(palf_monitor), KP_(palflite_monitor)); +private: + common::RWLock loc_lock_; + PalfLocationCacheCb *loc_cb_; + common::RWLock palf_monitor_lock_; + PalfMonitorCb *palf_monitor_; + common::RWLock palflite_monitor_lock_; + PalfLiteMonitorCb *palflite_monitor_; +}; } // end namespace palf } // end namespace oceanbase #endif diff --git a/src/logservice/palf/palf_env.cpp b/src/logservice/palf/palf_env.cpp index 4099ebb21..92e8453b9 100644 --- a/src/logservice/palf/palf_env.cpp +++ b/src/logservice/palf/palf_env.cpp @@ -45,6 +45,7 @@ int PalfEnv::create_palf_env( rpc::frame::ObReqTransport *transport, common::ObILogAllocator *log_alloc_mgr, ILogBlockPool *log_block_pool, + PalfMonitorCb *monitor, PalfEnv *&palf_env) { int ret = OB_SUCCESS; @@ -55,7 +56,7 @@ int PalfEnv::create_palf_env( CLOG_LOG(WARN, "delete_tmp_file_or_directory_at failed", K(ret), K(base_dir)); } else if (OB_FAIL(palf_env->palf_env_impl_.init(options, base_dir, self, obrpc::ObRpcNetHandler::CLUSTER_ID, MTL_ID(), transport, - log_alloc_mgr, log_block_pool))) { + log_alloc_mgr, log_block_pool, monitor))) { PALF_LOG(WARN, "PalfEnvImpl init failed", K(ret), K(base_dir)); } else if (OB_FAIL(palf_env->start_())) { PALF_LOG(WARN, "start palf env failed", K(ret), K(base_dir)); diff --git a/src/logservice/palf/palf_env.h b/src/logservice/palf/palf_env.h index 4c3d8cc69..493fa977e 100644 --- a/src/logservice/palf/palf_env.h +++ b/src/logservice/palf/palf_env.h @@ -55,6 +55,7 @@ public: rpc::frame::ObReqTransport *transport, common::ObILogAllocator *alloc_mgr, ILogBlockPool *log_block_pool, + PalfMonitorCb *monitor, PalfEnv *&palf_env); // static interface // destroy the palf env, and set "palf_env" to NULL. diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index 72e78ec12..c027ee4c3 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -140,6 +140,7 @@ PalfEnvImpl::PalfEnvImpl() : palf_meta_lock_(common::ObLatchIds::PALF_ENV_LOCK), log_io_worker_(), block_gc_timer_task_(), log_updater_(), + monitor_(NULL), disk_options_wrapper_(), check_disk_print_log_interval_(OB_INVALID_TIMESTAMP), self_(), @@ -166,7 +167,8 @@ int PalfEnvImpl::init( const int64_t tenant_id, rpc::frame::ObReqTransport *transport, common::ObILogAllocator *log_alloc_mgr, - ILogBlockPool *log_block_pool) + ILogBlockPool *log_block_pool, + PalfMonitorCb *monitor) { int ret = OB_SUCCESS; int pret = 0; @@ -180,10 +182,10 @@ int PalfEnvImpl::init( ret = OB_INIT_TWICE; PALF_LOG(ERROR, "PalfEnvImpl is inited twiced", K(ret)); } else if (OB_ISNULL(base_dir) || !self.is_valid() || NULL == transport - || OB_ISNULL(log_alloc_mgr) || OB_ISNULL(log_block_pool)) { + || OB_ISNULL(log_alloc_mgr) || OB_ISNULL(log_block_pool) || OB_ISNULL(monitor)) { ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "invalid arguments", K(ret), KP(transport), K(base_dir), K(self), KP(transport), - KP(log_alloc_mgr), KP(log_block_pool)); + KP(log_alloc_mgr), KP(log_block_pool), KP(monitor)); } else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) { PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret)); } else if (OB_FAIL(log_rpc_.init(self, cluster_id, tenant_id, transport))) { @@ -221,6 +223,7 @@ int PalfEnvImpl::init( } else { log_alloc_mgr_ = log_alloc_mgr; log_block_pool_ = log_block_pool; + monitor_ = monitor; self_ = self; tenant_id_ = tenant_id; is_inited_ = true; @@ -300,6 +303,7 @@ void PalfEnvImpl::destroy() log_updater_.destroy(); log_rpc_.destroy(); log_alloc_mgr_ = NULL; + monitor_ = NULL; self_.reset(); log_dir_[0] = '\0'; tmp_log_dir_[0] = '\0'; @@ -369,6 +373,8 @@ int PalfEnvImpl::create_palf_handle_impl_(const int64_t palf_id, // NB: always insert value into hash map finally. } else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, palf_handle_impl))) { PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id)); + } else if (OB_FAIL(palf_handle_impl->set_monitor_cb(monitor_))) { + PALF_LOG(WARN, "set_monitor_cb failed", K(ret), K(palf_id), KP_(monitor)); } else { palf_handle_impl->set_scan_disk_log_finished(); ipalf_handle_impl = palf_handle_impl; @@ -933,6 +939,7 @@ int PalfEnvImpl::reload_palf_handle_impl_(const int64_t palf_id) } else if (OB_FAIL(palf_handle_impl_map_.insert_and_get(hash_map_key, tmp_palf_handle_impl))) { PALF_LOG(WARN, "palf_handle_impl_map_ insert_and_get failed", K(ret), K(palf_id), K(tmp_palf_handle_impl)); } else { + (void) tmp_palf_handle_impl->set_monitor_cb(monitor_); (void) tmp_palf_handle_impl->set_scan_disk_log_finished(); palf_handle_impl_map_.revert(tmp_palf_handle_impl); int64_t cost_ts = ObTimeUtility::current_time() - start_ts; diff --git a/src/logservice/palf/palf_env_impl.h b/src/logservice/palf/palf_env_impl.h index e45089601..295dc193c 100644 --- a/src/logservice/palf/palf_env_impl.h +++ b/src/logservice/palf/palf_env_impl.h @@ -182,7 +182,8 @@ public: const int64_t tenant_id, rpc::frame::ObReqTransport *transport, common::ObILogAllocator *alloc_mgr, - ILogBlockPool *log_block_pool); + ILogBlockPool *log_block_pool, + PalfMonitorCb *monitor); // start函数包含两层含义: // @@ -315,6 +316,7 @@ private: LogIOWorker log_io_worker_; BlockGCTimerTask block_gc_timer_task_; LogUpdater log_updater_; + PalfMonitorCb *monitor_; PalfDiskOptionsWrapper disk_options_wrapper_; int64_t check_disk_print_log_interval_; diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index c8d4cd528..0e292e6e9 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -51,7 +51,7 @@ PalfHandleImpl::PalfHandleImpl() fs_cb_wrapper_(), role_change_cb_wrpper_(), rebuild_cb_wrapper_(), - lc_cb_(NULL), + plugins_(), last_locate_scn_(), last_send_mode_meta_time_us_(OB_INVALID_TIMESTAMP), last_locate_block_(LOG_INVALID_BLOCK_ID), @@ -221,7 +221,7 @@ void PalfHandleImpl::destroy() is_inited_ = false; diskspace_enough_ = true; cached_is_in_sync_ = false; - lc_cb_ = NULL; + plugins_.destroy(); self_.reset(); palf_id_ = INVALID_PALF_ID; fetch_log_engine_ = NULL; @@ -2017,17 +2017,10 @@ int PalfHandleImpl::set_location_cache_cb(PalfLocationCacheCb *lc_cb) } else if (OB_ISNULL(lc_cb)) { ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "lc_cb is NULL, can't register", KR(ret), KPC(this)); + } else if (OB_FAIL(plugins_.add_plugin(lc_cb))) { + PALF_LOG(WARN, "add_plugin failed", KR(ret), KPC(this), KP(lc_cb), K_(plugins)); } else { - WLockGuard guard(lock_); - if (OB_NOT_NULL(lc_cb_)) { - ret = OB_NOT_SUPPORTED; - PALF_LOG(WARN, "lc_cb_ is not NULL, can't register", KR(ret), KPC(this)); - } else if (OB_FAIL(sw_.set_location_cache_cb(lc_cb))) { - PALF_LOG(WARN, "sw_.set_location_cache_cb failed", KR(ret), KPC(this)); - } else { - lc_cb_ = lc_cb; - PALF_LOG(INFO, "set_location_cache_cb success", KPC(this), KP_(lc_cb)); - } + PALF_LOG(INFO, "set_location_cache_cb success", KPC(this), K_(plugins), KP(lc_cb)); } return ret; } @@ -2067,13 +2060,40 @@ int PalfHandleImpl::reset_election_priority() int PalfHandleImpl::reset_location_cache_cb() { int ret = OB_SUCCESS; + PalfLocationCacheCb *loc_cb = NULL; if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else if (OB_FAIL(sw_.reset_location_cache_cb())) { - PALF_LOG(WARN, "sw_.reset_location_cache_cb failed", KR(ret), KPC(this)); + } else if (OB_FAIL(plugins_.del_plugin(loc_cb))) { + PALF_LOG(WARN, "del_plugin failed", KR(ret), KPC(this), K_(plugins)); + } + return ret; +} + +int PalfHandleImpl::set_monitor_cb(PalfMonitorCb *monitor_cb) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(WARN, "not initted", KR(ret), KPC(this)); + } else if (OB_ISNULL(monitor_cb)) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(WARN, "lc_cb is NULL, can't register", KR(ret), KPC(this)); + } else if (OB_FAIL(plugins_.add_plugin(monitor_cb))) { + PALF_LOG(WARN, "add_plugin failed", KR(ret), KPC(this), KP(monitor_cb), K_(plugins)); } else { - WLockGuard guard(lock_); - lc_cb_ = NULL; + PALF_LOG(INFO, "set_monitor_cb success", KPC(this), K_(plugins), KP(monitor_cb)); + } + return ret; +} + +int PalfHandleImpl::reset_monitor_cb() +{ + int ret = OB_SUCCESS; + PalfMonitorCb *monitor_cb = NULL; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (OB_FAIL(plugins_.del_plugin(monitor_cb))) { + PALF_LOG(WARN, "del_plugin failed", KR(ret), KPC(this), K_(plugins)); } return ret; } @@ -2398,7 +2418,7 @@ int PalfHandleImpl::do_init_mem_( ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id)); } else if (OB_FAIL(sw_.init(palf_id, self, &state_mgr_, &config_mgr_, &mode_mgr_, - &log_engine_, &fs_cb_wrapper_, alloc_mgr, palf_base_info, is_normal_replica))) { + &log_engine_, &fs_cb_wrapper_, alloc_mgr, &plugins_, palf_base_info, is_normal_replica))) { PALF_LOG(WARN, "sw_ init failed", K(ret), K(palf_id)); } else if (OB_FAIL(election_.init_and_start(palf_id, election_timer, @@ -2412,10 +2432,10 @@ int PalfHandleImpl::do_init_mem_( }))) { PALF_LOG(WARN, "election_ init failed", K(ret), K(palf_id)); } else if (OB_FAIL(state_mgr_.init(palf_id, self, log_meta.get_log_prepare_meta(), log_meta.get_log_replica_property_meta(), - &election_, &sw_, &reconfirm_, &log_engine_, &config_mgr_, &mode_mgr_, &role_change_cb_wrpper_))) { + &election_, &sw_, &reconfirm_, &log_engine_, &config_mgr_, &mode_mgr_, &role_change_cb_wrpper_, &plugins_))) { PALF_LOG(WARN, "state_mgr_ init failed", K(ret), K(palf_id)); } else if (OB_FAIL(config_mgr_.init(palf_id, self, log_meta.get_log_config_meta(), &log_engine_, - &sw_, &state_mgr_, &election_, &mode_mgr_, &reconfirm_))) { + &sw_, &state_mgr_, &election_, &mode_mgr_, &reconfirm_, &plugins_))) { PALF_LOG(WARN, "config_mgr_ init failed", K(ret), K(palf_id)); } else if (is_normal_replica && OB_FAIL(reconfirm_.init(palf_id, self, &sw_, &state_mgr_, &config_mgr_, &mode_mgr_, &log_engine_))) { PALF_LOG(WARN, "reconfirm_ init failed", K(ret), K(palf_id)); @@ -2426,7 +2446,6 @@ int PalfHandleImpl::do_init_mem_( fetch_log_engine_ = fetch_log_engine; allocator_ = alloc_mgr; self_ = self; - lc_cb_ = NULL; has_set_deleted_ = false; palf_env_impl_ = palf_env_impl; is_inited_ = true; @@ -4274,9 +4293,7 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn, LSN &end_lsn) end_lsn.reset(); // use lc_cb_ in here without rlock is safe, because we don't reset lc_cb_ // until this PalfHandleImpl is destoryed. - if (OB_ISNULL(lc_cb_)) { - CLOG_LOG(TRACE, "lc_cb_ is NULL, skip", K(ret), K_(self), K_(palf_id)); - } else if (OB_FAIL(lc_cb_->nonblock_get_leader(palf_id_, leader))) { + if (OB_FAIL(plugins_.nonblock_get_leader(palf_id_, leader))) { CLOG_LOG(WARN, "get_leader failed", K(ret), K_(self), K_(palf_id)); need_renew_leader = true; } else if (false == leader.is_valid()) { @@ -4289,7 +4306,7 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn, LSN &end_lsn) end_lsn = resp.end_lsn_; } if (need_renew_leader && palf_reach_time_interval(500 * 1000, last_renew_loc_time_us_)) { - (void) lc_cb_->nonblock_renew_leader(palf_id_); + (void) plugins_.nonblock_renew_leader(palf_id_); } return ret; } diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index 69aaf5248..e6db41476 100644 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -780,6 +780,8 @@ public: int unregister_rebuild_cb(palf::PalfRebuildCbNode *rebuild_cb) override final; int set_location_cache_cb(PalfLocationCacheCb *lc_cb) override final; int reset_location_cache_cb() override final; + int set_monitor_cb(PalfMonitorCb *monitor_cb); + int reset_monitor_cb(); int set_election_priority(election::ElectionPriority *priority) override final; int reset_election_priority() override final; // ==================== Callback end ======================== @@ -1107,7 +1109,7 @@ private: palf::PalfFSCbWrapper fs_cb_wrapper_; palf::PalfRoleChangeCbWrapper role_change_cb_wrpper_; palf::PalfRebuildCbWrapper rebuild_cb_wrapper_; - PalfLocationCacheCb *lc_cb_; + LogPlugins plugins_; // ======optimization for locate_by_scn_coarsely========= mutable SpinLock last_locate_lock_; share::SCN last_locate_scn_; diff --git a/src/observer/ob_server_event_history_table_operator.cpp b/src/observer/ob_server_event_history_table_operator.cpp index e81c316cf..d4d6ffa47 100644 --- a/src/observer/ob_server_event_history_table_operator.cpp +++ b/src/observer/ob_server_event_history_table_operator.cpp @@ -22,10 +22,11 @@ using namespace share; int ObAllServerEventHistoryTableOperator::init(common::ObMySQLProxy &proxy, const common::ObAddr &self_addr) { int ret = OB_SUCCESS; + const bool is_rs_event = false; + const bool is_server_event = true; + set_addr(self_addr, is_rs_event, is_server_event); if (OB_FAIL(ObEventHistoryTableOperator::init(proxy))) { } else { - const bool is_rs_event = false; - set_addr(self_addr, is_rs_event); set_event_table(share::OB_ALL_SERVER_EVENT_HISTORY_TNAME); } return ret; diff --git a/src/observer/ob_server_event_history_table_operator.h b/src/observer/ob_server_event_history_table_operator.h index 579be63a0..a44b6d0dc 100644 --- a/src/observer/ob_server_event_history_table_operator.h +++ b/src/observer/ob_server_event_history_table_operator.h @@ -41,5 +41,7 @@ private: SERVER_EVENT_INSTANCE.add_event(args) #define SERVER_EVENT_SYNC_ADD(args...) \ SERVER_EVENT_INSTANCE.sync_add_event(args) +#define SERVER_EVENT_ADD_WITH_RETRY(args...) \ + SERVER_EVENT_INSTANCE.add_event_with_retry(args) #endif /* _OB_SERVER_EVENT_HISTORY_TABLE_OPERATOR_H */ diff --git a/src/rootserver/ob_rs_event_history_table_operator.cpp b/src/rootserver/ob_rs_event_history_table_operator.cpp index b7aeb9cd7..a47803ff9 100644 --- a/src/rootserver/ob_rs_event_history_table_operator.cpp +++ b/src/rootserver/ob_rs_event_history_table_operator.cpp @@ -23,10 +23,11 @@ int ObRsEventHistoryTableOperator::init(common::ObMySQLProxy &proxy, const common::ObAddr &self_addr) { int ret = OB_SUCCESS; + const bool is_rs_event = true; + const bool is_server_event = false; + set_addr(self_addr, is_rs_event, is_server_event); if (OB_FAIL(ObEventHistoryTableOperator::init(proxy))) { } else { - const bool is_rs_event = true; - set_addr(self_addr, is_rs_event); set_event_table(share::OB_ALL_ROOTSERVICE_EVENT_HISTORY_TNAME); } return ret; diff --git a/src/share/ob_event_history_table_operator.cpp b/src/share/ob_event_history_table_operator.cpp index 7c558ad53..947f48d36 100644 --- a/src/share/ob_event_history_table_operator.cpp +++ b/src/share/ob_event_history_table_operator.cpp @@ -166,7 +166,9 @@ int ObEventHistoryTableOperator::ObEventTableUpdateTask::process() ObEventHistoryTableOperator::ObEventHistoryTableOperator() : inited_(false), stopped_(false), last_event_ts_(0), lock_(ObLatchIds::RS_EVENT_TS_LOCK), proxy_(NULL), event_queue_(), - event_table_name_(NULL), self_addr_(), is_rootservice_event_history_(false) + event_table_name_(NULL), self_addr_(), is_rootservice_event_history_(false), + is_server_event_history_(false), + timer_() { } @@ -182,11 +184,15 @@ int ObEventHistoryTableOperator::init(common::ObMySQLProxy &proxy) LOG_WARN("init twice", K(ret)); } else { const int64_t thread_count = 1; + const int64_t queue_size_square_of_2 = 14; if (OB_FAIL(event_queue_.init(thread_count, "EvtHisUpdTask", TASK_QUEUE_SIZE, TASK_MAP_SIZE, TOTAL_LIMIT, HOLD_LIMIT, PAGE_SIZE))) { LOG_WARN("task_queue_ init failed", K(thread_count), LITERAL_K(TASK_QUEUE_SIZE), LITERAL_K(TASK_MAP_SIZE), LITERAL_K(TOTAL_LIMIT), LITERAL_K(HOLD_LIMIT), LITERAL_K(PAGE_SIZE), K(ret)); + } else if (is_server_event_history_ && + OB_FAIL(timer_.init_and_start(thread_count, 5_s, "EventTimer", queue_size_square_of_2))) { + LOG_WARN("int global event report timer failed", KR(ret)); } else { event_queue_.set_label(ObModIds::OB_RS_EVENT_QUEUE); proxy_ = &proxy; @@ -201,6 +207,7 @@ void ObEventHistoryTableOperator::destroy() { stopped_ = true; event_queue_.destroy(); + timer_.stop_and_wait(); // allocator should destroy after event_queue_ destroy inited_ = false; } @@ -325,5 +332,50 @@ int ObEventHistoryTableOperator::process_task(const ObString &sql, const bool is return ret; } +int ObEventHistoryTableOperator::add_event_to_timer_(const common::ObSqlString &sql) +{ + int ret = OB_SUCCESS; + int retry_times = 0; + ObAddr self_addr = self_addr_; + common::ObMySQLProxy *proxy = proxy_; + ObUniqueGuard uniq_holder; + if (OB_FAIL(ob_make_unique(uniq_holder))) { + SHARE_LOG(WARN, "fail to make unique guard"); + } else if (OB_FAIL(uniq_holder->assign(sql.string()))) { + SHARE_LOG(WARN, "fail to create unique ownership of string"); + } else if (OB_FAIL(timer_.schedule_task_ignore_handle_repeat_and_immediately(15_s, [retry_times, self_addr, uniq_holder, proxy]() mutable -> bool { + int ret = OB_SUCCESS; + bool stop_flag = false; + char ip[64] = {0}; + int64_t affected_rows = 0; + const char *sql = to_cstring(uniq_holder->get_ob_string()); + if (OB_ISNULL(proxy)) { + SHARE_LOG(WARN, "proxy_ is NULL", KP(proxy)); + } else if (!self_addr.ip_to_string(ip, sizeof(ip))) { + ret = OB_ERR_UNEXPECTED; + SHARE_LOG(WARN, "ip to string failed"); + } else if (OB_FAIL(proxy->write(sql, affected_rows))) { + SHARE_LOG(WARN, "sync execute sql failed", K(sql), K(ret)); + } else if (!is_single_row(affected_rows)) { + ret = OB_ERR_UNEXPECTED; + SHARE_LOG(WARN, "affected_rows expected to be one", K(affected_rows), K(ret)); + } else { + ObTaskController::get().allow_next_syslog(); + SHARE_LOG(INFO, "event table sync add event success", K(ret), K(sql)); + } + if (++retry_times > MAX_RETRY_COUNT || OB_SUCC(ret)) { + // this may happened cause inner sql may not work for a long time, + // but i have tried my very best, so let it miss. + if (retry_times > MAX_RETRY_COUNT) { + SHARE_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "fail to schedule report event task cause retry too much times"); + } + stop_flag = true; + } + return stop_flag; + }))) { + SHARE_LOG(ERROR, "fail to schedule report event task"); + } + return ret; +} }//end namespace rootserver }//end namespace oceanbase diff --git a/src/share/ob_event_history_table_operator.h b/src/share/ob_event_history_table_operator.h index 36e3d004e..c2bc54c7c 100644 --- a/src/share/ob_event_history_table_operator.h +++ b/src/share/ob_event_history_table_operator.h @@ -15,13 +15,16 @@ #include "lib/oblog/ob_log.h" #include "lib/time/ob_time_utility.h" +#include "lib/guard/ob_unique_guard.h" #include "lib/string/ob_sql_string.h" +#include "lib/string/ob_string_holder.h" #include "lib/queue/ob_dedup_queue.h" #include "lib/thread/ob_work_queue.h" #include "lib/lock/ob_mutex.h" #include "lib/mysqlclient/ob_mysql_proxy.h" #include "share/inner_table/ob_inner_table_schema.h" #include "share/ob_dml_sql_splicer.h" +#include "share/ob_occam_timer.h" #include "share/ob_task_define.h" #include @@ -127,6 +130,10 @@ public: // if number of others is not 13, should be even, every odd of them are name, every even of them are value template int sync_add_event(const char *module, const char *event, Rest &&...others); + // number of others should not less than 0, or more than 13 + // if number of others is not 13, should be even, every odd of them are name, every even of them are value + template + int add_event_with_retry(const char *module, const char *event, Rest &&...others); virtual int async_delete() = 0; protected: @@ -139,7 +146,15 @@ protected: // recursive end if there is an extra_info template int sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Value &&extro_info); - void set_addr(const common::ObAddr self_addr, bool is_rs_ev) { self_addr_ = self_addr; is_rootservice_event_history_ = is_rs_ev; } + int add_event_to_timer_(const common::ObSqlString &sql); + void set_addr(const common::ObAddr self_addr, + bool is_rs_ev, + bool is_server_ev) + { + self_addr_ = self_addr; + is_rootservice_event_history_ = is_rs_ev; + is_server_event_history_ = is_server_ev; + } const common::ObAddr &get_addr() const { return self_addr_; } void set_event_table(const char* tname) { event_table_name_ = tname; } const char *get_event_table() const { return event_table_name_; } @@ -156,6 +171,7 @@ protected: static const int64_t RS_EVENT_HISTORY_DELETE_TIME = 7L * 24L * 3600L * 1000L * 1000L; // 7DAY static const int64_t SERVER_EVENT_HISTORY_DELETE_TIME = 2L * 24L * 3600L * 1000L * 1000L; // 2DAY static const int64_t UNIT_LOAD_HISTORY_DELETE_TIME = 7L * 24L * 3600L * 1000L * 1000L; // 2DAY + static const int64_t MAX_RETRY_COUNT = 12; virtual int process_task(const common::ObString &sql, const bool is_delete); private: @@ -168,6 +184,9 @@ private: const char* event_table_name_; common::ObAddr self_addr_; bool is_rootservice_event_history_; + bool is_server_event_history_; + // timer for add_event_with_retry + common::ObOccamTimer timer_; protected: ObEventHistoryTableOperator(); private: @@ -458,6 +477,62 @@ int ObEventHistoryTableOperator::sync_add_event(const char *module, const char * return ret; } +template +int ObEventHistoryTableOperator::add_event_with_retry(const char *module, const char *event, Rest &&...others) +{ + static_assert(sizeof...(others) >= 0 && sizeof...(others) <= 13 && + (sizeof...(others) == 13 || (sizeof...(others) % 2 == 0)), + "max support 6 pair of name-value args and 1 extra info, if number of others is not 13, should be even"); + int ret = common::OB_SUCCESS; + int64_t event_ts = 0; + common::ObSqlString sql; + share::ObDMLSqlSplicer dml; + char ip_buf[common::MAX_IP_ADDR_LENGTH]; + TIMEGUARD_INIT(EVENT_HISTORY, 100_ms, 5_s); \ + if (!inited_) { + ret = common::OB_NOT_INIT; + SHARE_LOG(WARN, "not init", K(ret)); + } else if (NULL == module || NULL == event) { + ret = common::OB_INVALID_ARGUMENT; + SHARE_LOG(WARN, "neither module or event can be NULL", KP(module), KP(event), K(ret)); + } else if (stopped_) { + ret = OB_CANCELED; + SHARE_LOG(WARN, "observer is stopped, cancel add_event", K(sql), K(ret)); + } else if (CLICK_FAIL(gen_event_ts(event_ts))) { + SHARE_LOG(WARN, "gen_event_ts failed", K(ret)); + } else if (CLICK_FAIL(dml.add_gmt_create(event_ts)) || + CLICK_FAIL(dml.add_column("module", module)) || + CLICK_FAIL(dml.add_column("event", event))) { + SHARE_LOG(WARN, "add column failed", K(ret)); + } else if (CLICK_FAIL(sync_add_event_helper_<0>(dml, std::forward(others)...))) {// recursive call + } else if (common::OB_SUCCESS == ret && self_addr_.is_valid()) { + if (is_rootservice_event_history_) { + //Add rs_svr_ip, rs_svr_port to the __all_rootservice_event_history table + (void)self_addr_.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); + if (CLICK_FAIL(dml.add_column("rs_svr_ip", ip_buf)) + || CLICK_FAIL(dml.add_column("rs_svr_port", self_addr_.get_port()))) { + SHARE_LOG(WARN, "add column failed", K(ret)); + } + } else { + // __all_server_event_history has svr_ip and svr_port, but __all_rootservice_event_history hasn't. + (void)self_addr_.ip_to_string(ip_buf, common::MAX_IP_ADDR_LENGTH); + if (CLICK_FAIL(dml.add_column("svr_ip", ip_buf)) + || CLICK_FAIL(dml.add_column("svr_port", self_addr_.get_port()))) { + SHARE_LOG(WARN, "add column failed", K(ret)); + } + } + } + if (OB_SUCC(ret)) { + if (CLICK_FAIL(dml.splice_insert_sql(event_table_name_, sql))) { + SHARE_LOG(WARN, "splice_insert_sql failed", K(ret)); + } else if (CLICK_FAIL(add_event_to_timer_(sql))) { + SHARE_LOG(WARN, "add_event_to_timer_ failed", K(ret), K(sql)); + } else { + SHARE_LOG(INFO, "add_event_with_retry success", K(ret), K(sql)); + } + } + return ret; +} // recursive begin template int ObEventHistoryTableOperator::sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Name &&name, Value &&value, Rest &&...others) @@ -484,7 +559,7 @@ int ObEventHistoryTableOperator::sync_add_event_helper_(share::ObDMLSqlSplicer & template int ObEventHistoryTableOperator::sync_add_event_helper_(share::ObDMLSqlSplicer &dml, Value &&extra_info) { - static_assert(Floor == 7, "if there is an extra_info column, it must be 13th args in this row, no more, no less"); + static_assert(Floor == 6, "if there is an extra_info column, it must be 13th args in this row, no more, no less"); int ret = OB_SUCCESS; if (OB_FAIL(dml.add_column(names[Floor], extra_info))) { SHARE_LOG(WARN, "add column failed", K(ret), K(Floor)); diff --git a/unittest/logservice/test_log_config_mgr.cpp b/unittest/logservice/test_log_config_mgr.cpp index aeb0474c9..40d2b73aa 100644 --- a/unittest/logservice/test_log_config_mgr.cpp +++ b/unittest/logservice/test_log_config_mgr.cpp @@ -57,6 +57,7 @@ public: mock_log_engine_ = OB_NEW(MockLogEngine, "TestLog"); mock_mode_mgr_ = OB_NEW(MockLogModeMgr, "TestLog"); mock_reconfirm_ = OB_NEW(MockLogReconfirm, "TestLog"); + mock_plugins_ = OB_NEW(LogPlugins, "TestLog"); } ~TestLogConfigMgr() { @@ -66,6 +67,7 @@ public: OB_DELETE(MockLogEngine, "TestLog", mock_log_engine_); OB_DELETE(MockLogModeMgr, "TestLog", mock_mode_mgr_); OB_DELETE(MockLogReconfirm, "TestLog", mock_reconfirm_); + OB_DELETE(LogPlugins, "TestLog", mock_plugins_); } void init_test_log_config_env(const common::ObAddr &self, const LogConfigInfo &config_info, @@ -88,7 +90,7 @@ public: LogConfigMeta config_meta; EXPECT_EQ(OB_SUCCESS, config_meta.generate(init_pid, config_info, config_info, 1, LSN(0), 1)); EXPECT_EQ(OB_SUCCESS, cm.init(1, self, config_meta, mock_log_engine_, mock_sw_, mock_state_mgr_, mock_election_, - mock_mode_mgr_, mock_reconfirm_)); + mock_mode_mgr_, mock_reconfirm_, mock_plugins_)); LogMemberRegionMap region_map; EXPECT_EQ(OB_SUCCESS, region_map.init("localmap", OB_MAX_MEMBER_NUMBER)); for (int i = 0; i < cm.alive_paxos_memberlist_.get_member_number(); ++i) { @@ -105,6 +107,7 @@ public: palf::MockLogEngine *mock_log_engine_; palf::MockLogModeMgr *mock_mode_mgr_; palf::MockLogReconfirm *mock_reconfirm_; + palf::LogPlugins *mock_plugins_; }; TEST_F(TestLogConfigMgr, test_remove_child_is_not_learner) diff --git a/unittest/logservice/test_log_sliding_window.cpp b/unittest/logservice/test_log_sliding_window.cpp index e898bc464..499271063 100644 --- a/unittest/logservice/test_log_sliding_window.cpp +++ b/unittest/logservice/test_log_sliding_window.cpp @@ -80,6 +80,7 @@ public: MockLogEngine mock_log_engine_; MockPalfFSCbWrapper palf_fs_cb_; ObTenantMutilAllocator *alloc_mgr_; + LogPlugins *plugins_; MockPublicLogSlidingWindow log_sw_; char *data_buf_; }; @@ -102,6 +103,7 @@ void TestLogSlidingWindow::SetUp() OB_ASSERT(FALSE); } alloc_mgr_ = new (buf) common::ObTenantMutilAllocator(tenant_id); + plugins_ = new LogPlugins(); data_buf_ = (char*)ob_malloc(64 * 1024 * 1024, attr); // init MTL ObTenantBase tbase(tenant_id); @@ -148,19 +150,19 @@ TEST_F(TestLogSlidingWindow, test_init) gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, NULL, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, base_info, true)); + &mock_mm_, &mock_mode_mgr_, NULL, &palf_fs_cb_, NULL, plugins_, base_info, true)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + NULL, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, base_info, true)); + &mock_mm_, NULL, NULL, &palf_fs_cb_, NULL, plugins_, base_info, true)); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); // init twice EXPECT_EQ(OB_INIT_TWICE, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); } TEST_F(TestLogSlidingWindow, test_private_func_batch_01) @@ -175,7 +177,7 @@ TEST_F(TestLogSlidingWindow, test_private_func_batch_01) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); log_id = 10 + PALF_SLIDING_WINDOW_SIZE; EXPECT_EQ(false, log_sw_.can_receive_larger_log_(log_id)); EXPECT_EQ(false, log_sw_.leader_can_submit_larger_log_(log_id)); @@ -194,7 +196,7 @@ TEST_F(TestLogSlidingWindow, test_to_follower_pending) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); char *buf = data_buf_; int64_t buf_len = 1 * 1024 * 1024; share::SCN ref_scn; @@ -220,8 +222,10 @@ TEST_F(TestLogSlidingWindow, test_fetch_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); // init succ + MockLocCb cb; + EXPECT_EQ(OB_SUCCESS, plugins_->add_plugin(static_cast(&cb))); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); prev_lsn.val_ = 1; EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log(fetch_log_type, prev_lsn, fetch_start_lsn, fetch_start_log_id)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.try_fetch_log_for_reconfirm(dest, fetch_end_lsn, is_fetched)); @@ -244,7 +248,7 @@ TEST_F(TestLogSlidingWindow, test_report_log_task_trace) gen_default_palf_base_info_(base_info); // init succ EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); EXPECT_EQ(OB_SUCCESS, log_sw_.report_log_task_trace(1)); char *buf = data_buf_; int64_t buf_len = 2 * 1024 * 1024; @@ -260,26 +264,10 @@ TEST_F(TestLogSlidingWindow, test_report_log_task_trace) TEST_F(TestLogSlidingWindow, test_set_location_cache_cb) { MockLocCb cb; - EXPECT_EQ(OB_NOT_INIT, log_sw_.set_location_cache_cb(NULL)); - PalfBaseInfo base_info; - gen_default_palf_base_info_(base_info); - // init succ - EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); - EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.set_location_cache_cb(NULL)); - EXPECT_EQ(OB_SUCCESS, log_sw_.set_location_cache_cb(&cb)); - EXPECT_EQ(OB_NOT_SUPPORTED, log_sw_.set_location_cache_cb(&cb)); -} - -TEST_F(TestLogSlidingWindow, test_reset_location_cache_cb) -{ - EXPECT_EQ(OB_NOT_INIT, log_sw_.reset_location_cache_cb()); - PalfBaseInfo base_info; - gen_default_palf_base_info_(base_info); - // init succ - EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); - EXPECT_EQ(OB_SUCCESS, log_sw_.reset_location_cache_cb()); + MockLocCb *null_cb = NULL; + EXPECT_EQ(OB_INVALID_ARGUMENT, plugins_->add_plugin(static_cast(null_cb))); + EXPECT_EQ(OB_SUCCESS, plugins_->add_plugin(static_cast(&cb))); + EXPECT_EQ(OB_OP_NOT_ALLOW, plugins_->add_plugin(static_cast(&cb))); } TEST_F(TestLogSlidingWindow, test_submit_log) @@ -295,7 +283,7 @@ TEST_F(TestLogSlidingWindow, test_submit_log) share::SCN scn; EXPECT_EQ(OB_NOT_INIT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn)); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(NULL, buf_len, ref_scn, lsn, scn)); buf_len = 0; EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_log(buf, buf_len, ref_scn, lsn, scn)); @@ -327,7 +315,7 @@ TEST_F(TestLogSlidingWindow, test_submit_group_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); mock_state_mgr_.mock_proposal_id_ = 100; LSN lsn(10); EXPECT_EQ(OB_INVALID_ARGUMENT, log_sw_.submit_group_log(lsn, NULL, 1024)); @@ -386,7 +374,7 @@ TEST_F(TestLogSlidingWindow, test_receive_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); char *buf = data_buf_; int64_t buf_len = 2 * 1024 * 1024; @@ -549,7 +537,7 @@ TEST_F(TestLogSlidingWindow, test_after_flush_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); int64_t curr_proposal_id = 10; // set default config meta @@ -612,7 +600,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; @@ -720,7 +708,7 @@ TEST_F(TestLogSlidingWindow, test_ack_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; log_sw_.self_ = self_; @@ -778,7 +766,7 @@ TEST_F(TestLogSlidingWindow, test_truncate_for_rebuild) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; @@ -889,7 +877,7 @@ TEST_F(TestLogSlidingWindow, test_append_disk_log) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; // generate new group entry @@ -1011,7 +999,7 @@ TEST_F(TestLogSlidingWindow, test_group_entry_truncate) PalfBaseInfo base_info; gen_default_palf_base_info_(base_info); EXPECT_EQ(OB_SUCCESS, log_sw_.init(palf_id_, self_, &mock_state_mgr_, - &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, base_info, true)); + &mock_mm_, &mock_mode_mgr_, &mock_log_engine_, &palf_fs_cb_, alloc_mgr_, plugins_, base_info, true)); int64_t curr_proposal_id = 10; mock_state_mgr_.mock_proposal_id_ = curr_proposal_id; // generate new group entry diff --git a/unittest/logservice/test_log_state_mgr.cpp b/unittest/logservice/test_log_state_mgr.cpp index 3a7d03bc5..ee197b8a1 100644 --- a/unittest/logservice/test_log_state_mgr.cpp +++ b/unittest/logservice/test_log_state_mgr.cpp @@ -53,6 +53,7 @@ public: MockLogConfigMgr mock_config_mgr_; MockLogModeMgr mock_mode_mgr_; MockPalfRoleChangeCbWrapper mock_role_change_cb_; + LogPlugins plugins_; LogStateMgr state_mgr_; }; @@ -74,19 +75,19 @@ void TestLogStateMgr::TearDown() TEST_F(TestLogStateMgr, test_init) { EXPECT_EQ(OB_INVALID_ARGUMENT, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, NULL, &mock_sw_, - &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_)); + &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_)); EXPECT_EQ(OB_INVALID_ARGUMENT, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, NULL, - &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_)); + &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_)); EXPECT_EQ(OB_SUCCESS, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, &mock_sw_, - &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_)); + &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_)); EXPECT_EQ(OB_INIT_TWICE, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, &mock_sw_, - &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_)); + &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_)); } TEST_F(TestLogStateMgr, replay_to_leader_active) { EXPECT_EQ(OB_SUCCESS, state_mgr_.init(palf_id_, self_, log_prepare_meta_, log_replica_property_meta_, &mock_election_, &mock_sw_, - &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_)); + &mock_reconfirm_, &mock_log_engine_, &mock_config_mgr_, &mock_mode_mgr_, &mock_role_change_cb_, &plugins_)); // set default config meta ObMemberList default_mlist; default_mlist.add_server(self_);