Report key events of palf

This commit is contained in:
obdev 2023-05-16 09:41:27 +00:00 committed by ob-robot
parent e1ac52296d
commit 6ff52a2fd8
14 changed files with 726 additions and 35 deletions

View File

@ -17,6 +17,7 @@
#include "lib/net/ob_addr.h"
#include "lib/utility/ob_print_utils.h"
#include "lib/ob_define.h"
#include "lib/string/ob_sql_string.h"
#include "common/ob_region.h"
#include "lib/json/ob_yson.h"
@ -68,6 +69,19 @@ inline bool operator<(const ObMember &lhs, const ObMember &rhs)
return lhs.server_ < rhs.server_;
}
inline int member_to_string(const common::ObMember &member, ObSqlString &member_buf)
{
int ret = OB_SUCCESS;
member_buf.reset();
char ip_port[MAX_IP_PORT_LENGTH];
if (OB_FAIL(member.get_server().ip_port_to_string(ip_port, sizeof(ip_port)))) {
COMMON_LOG(WARN, "convert server to string failed", K(ret), K(member));
} else if (OB_FAIL(member_buf.append_fmt("%.*s:%ld", static_cast<int>(sizeof(ip_port)), ip_port, member.get_timestamp()))) {
COMMON_LOG(WARN, "failed to append ip_port to string", K(ret), K(member));
}
return ret;
}
class ObReplicaMember : public ObMember
{
public:

View File

@ -16,6 +16,7 @@
#include "lib/ob_define.h"
#include "lib/utility/ob_unify_serialize.h"
#include "common/ob_member.h"
namespace oceanbase
{
namespace common
@ -66,6 +67,34 @@ inline bool is_valid_replica_num(const int64_t replica_num)
typedef ObMemberListBase<OB_MAX_MEMBER_NUMBER> ObMemberList;
typedef ObSEArray<ObReplicaMember, OB_MAX_CHILD_MEMBER_NUMBER> ObChildReplicaList;
inline int member_list_to_string(const common::ObMemberList &member_list, ObSqlString &member_list_buf)
{
int ret = OB_SUCCESS;
member_list_buf.reset();
if (0 > member_list.get_member_number()) {
ret = OB_INVALID_ARGUMENT;
COMMON_LOG(WARN, "invalid argument", K(ret), "member count", member_list.get_member_number());
} else {
bool need_comma = false;
char ip_port[MAX_IP_PORT_LENGTH];
for (int64_t i = 0; OB_SUCC(ret) && i < member_list.get_member_number(); i++) {
ObMember member;
member_list.get_member_by_index(i, member);
if (OB_FAIL(member.get_server().ip_port_to_string(ip_port, sizeof(ip_port)))) {
COMMON_LOG(WARN, "convert server to string failed", K(ret), K(member));
} else if (need_comma && OB_FAIL(member_list_buf.append(","))) {
COMMON_LOG(WARN, "failed to append comma to string", K(ret));
} else if (OB_FAIL(member_list_buf.append_fmt("%.*s:%ld", static_cast<int>(sizeof(ip_port)), ip_port, member.get_timestamp()))) {
COMMON_LOG(WARN, "failed to append ip_port to string", K(ret), K(member));
} else {
need_comma = true;
}
}
COMMON_LOG(INFO, "member_list_to_string success", K(member_list), K(member_list_buf));
}
return ret;
}
} // namespace common
} // namespace oceanbase

View File

@ -71,6 +71,7 @@ public:
ObILogAllocator *alloc_mgr = log_engine_->alloc_mgr_;
LogRpc *log_rpc = log_engine_->log_net_service_.log_rpc_;
LogIOWorker *log_io_worker = log_engine_->log_io_worker_;
LogPlugins *plugins = log_engine_->plugins_;
LogEngine log_engine;
ILogBlockPool *log_block_pool = log_engine_->log_storage_.block_mgr_.log_block_pool_;
if (OB_FAIL(log_engine.load(leader_.palf_handle_impl_->palf_id_,
@ -80,6 +81,7 @@ public:
&(leader_.palf_handle_impl_->hot_cache_),
log_rpc,
log_io_worker,
plugins,
entry_header,
palf_epoch_,
is_integrity,

View File

@ -20,6 +20,235 @@ namespace logservice
#define LOG_MONITOR_EVENT_FMT_PREFIX "LOG", type_to_string_(event), "TENANT_ID", mtl_id, "LS_ID", palf_id
// =========== PALF Event Reporting ===========
int ObLogMonitor::record_set_initial_member_list_event(const int64_t palf_id,
const int64_t replica_num,
const char *member_list,
const char *extra_info)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::SET_INITIAL_MEMBER_LIST;
if (OB_NOT_NULL(extra_info)) {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"MEMBER_LIST", member_list,
"REPLICA_NUM", replica_num,
"", NULL,
"", NULL,
extra_info);
} else {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"MEMBER_LIST", member_list,
"REPLICA_NUM", replica_num);
}
return ret;
}
int ObLogMonitor::record_election_leader_change_event(const int64_t palf_id, const common::ObAddr &dest_addr)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::ELECTION_LEADER_CHANGE;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX, "LEADER", dest_addr);
return ret;
}
int ObLogMonitor::record_reconfiguration_event(const char *sub_event,
const int64_t palf_id,
const palf::LogConfigVersion& config_version,
const int64_t prev_replica_num,
const int64_t curr_replica_num,
const char* extra_info)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::RECONFIGURATION;
const int64_t MAX_BUF_LEN = 50;
char event_str[MAX_BUF_LEN] = {'\0'};
if (0 >= snprintf(event_str, MAX_BUF_LEN, "%s:%s", type_to_string_(event), sub_event)) {
ret = OB_ERR_UNEXPECTED;
CLOG_LOG(ERROR, "snprintf failed", KR(ret), K(event), K(sub_event));
} else if (OB_NOT_NULL(extra_info)) {
SERVER_EVENT_ADD_WITH_RETRY("LOG", event_str,
"TENANT_ID", mtl_id,
"LS_ID", palf_id,
"CONFIG_VERSION", config_version,
"PREV_REPLICA_NUM", prev_replica_num,
"CURR_REPLICA_NUM", curr_replica_num,
"", NULL,
extra_info);
} else {
SERVER_EVENT_ADD_WITH_RETRY("LOG", event_str,
"TENANT_ID", mtl_id,
"LS_ID", palf_id,
"CONFIG_VERSION", config_version,
"PREV_REPLICA_NUM", prev_replica_num,
"CURR_REPLICA_NUM", curr_replica_num);
}
return ret;
}
int ObLogMonitor::record_replica_type_change_event(const int64_t palf_id,
const palf::LogConfigVersion& config_version,
const char *prev_replica_type,
const char *curr_replica_type,
const char *extra_info)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::REPLICA_TYPE_TRANSITION;
if (OB_NOT_NULL(extra_info)) {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"CONFIG_VERSION", config_version,
"PREV_REPLICA_TYPE", prev_replica_type,
"CURR_REPLICA_TYPE", curr_replica_type,
"", NULL,
extra_info);
} else {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"CONFIG_VERSION", config_version,
"PREV_REPLICA_TYPE", prev_replica_type,
"CURR_REPLICA_TYPE", curr_replica_type);
}
return ret;
}
int ObLogMonitor::record_access_mode_change_event(const int64_t palf_id,
const int64_t prev_mode_version,
const int64_t curr_mode_verion,
const palf::AccessMode& prev_access_mode,
const palf::AccessMode& curr_access_mode,
const char *extra_info)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::ACCESS_MODE_TRANSITION;
const int64_t MAX_BUF_LEN = 32;
char prev_access_mode_str[MAX_BUF_LEN] = {'\0'};
char curr_access_mode_str[MAX_BUF_LEN] = {'\0'};
if (OB_FAIL(palf::access_mode_to_string(prev_access_mode, prev_access_mode_str, sizeof(prev_access_mode_str)))) {
PALF_LOG(WARN, "access_mode_to_string failed", K(prev_access_mode));
} else if (OB_FAIL(palf::access_mode_to_string(curr_access_mode, curr_access_mode_str, sizeof(curr_access_mode_str)))) {
PALF_LOG(WARN, "access_mode_to_string failed", K(prev_access_mode));
} else if (OB_NOT_NULL(extra_info)) {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"PREV_MODE_VERSION", prev_mode_version,
"CURR_MODE_VERSION", curr_mode_verion,
"PREV_ACCESS_MODE", prev_access_mode_str,
"CURR_ACCESS_MODE", curr_access_mode_str,
extra_info);
} else {
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"PREV_MODE_VERSION", prev_mode_version,
"CURR_MODE_VERSION", curr_mode_verion,
"PREV_ACCESS_MODE", prev_access_mode_str,
"CURR_ACCESS_MODE", curr_access_mode_str);
}
return ret;
}
int ObLogMonitor::record_set_base_lsn_event(const int64_t palf_id, const palf::LSN &new_base_lsn)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::SET_BASE_LSN;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"NEW_BASE_LSN", new_base_lsn);
return ret;
}
int ObLogMonitor::record_enable_sync_event(const int64_t palf_id)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::ENABLE_SYNC;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX);
return ret;
}
int ObLogMonitor::record_disable_sync_event(const int64_t palf_id)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::DISABLE_SYNC;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX);
return ret;
}
int ObLogMonitor::record_enable_vote_event(const int64_t palf_id)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::ENABLE_VOTE;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX);
return ret;
}
int ObLogMonitor::record_disable_vote_event(const int64_t palf_id)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::DISABLE_VOTE;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX);
return ret;
}
int ObLogMonitor::record_advance_base_info_event(const int64_t palf_id, const palf::PalfBaseInfo &palf_base_info)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::ADVANCE_BASE_INFO;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"PALF_BASE_INFO", palf_base_info);
return ret;
}
int ObLogMonitor::record_rebuild_event(const int64_t palf_id,
const common::ObAddr &server,
const palf::LSN &base_lsn)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::REBUILD;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"SOURCE_SERVER", server,
"BASE_LSN", base_lsn);
return ret;
}
int ObLogMonitor::record_flashback_event(const int64_t palf_id,
const int64_t mode_version,
const share::SCN &flashback_scn,
const share::SCN &curr_end_scn,
const share::SCN &curr_max_scn)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::FLASHBACK;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"MODE_VERSION", mode_version,
"FLASHBACK_SCN", flashback_scn,
"CURR_END_SCN", curr_end_scn,
"CURR_MAX_SCN", curr_max_scn);
return ret;
}
int ObLogMonitor::record_truncate_event(const int64_t palf_id,
const palf::LSN &lsn,
const int64_t min_block_id,
const int64_t max_block_id,
const int64_t truncate_end_block_id)
{
int ret = OB_SUCCESS;
const int64_t mtl_id = MTL_ID();
const EventType event = EventType::TRUNCATE;
SERVER_EVENT_ADD_WITH_RETRY(LOG_MONITOR_EVENT_FMT_PREFIX,
"LSN", lsn,
"MIN_BLOCK_ID", min_block_id,
"MAX_BLOCK_ID", max_block_id,
"TRUNCATE_END_BLOCK_ID", truncate_end_block_id);
return ret;
}
int ObLogMonitor::record_role_change_event(const int64_t palf_id,
const common::ObRole &prev_role,
const palf::ObReplicaState &prev_state,

View File

@ -27,6 +27,47 @@ public:
virtual ~ObLogMonitor() { }
public:
// =========== PALF Event Reporting ===========
int record_set_initial_member_list_event(const int64_t palf_id,
const int64_t replica_num,
const char *member_list = NULL,
const char *extra_info = NULL) override final;
int record_election_leader_change_event(const int64_t palf_id, const common::ObAddr &dest_addr) override final;
int record_reconfiguration_event(const char *sub_event,
const int64_t palf_id,
const palf::LogConfigVersion& config_version,
const int64_t prev_replica_num,
const int64_t curr_replica_num,
const char *extra_info = NULL) override final;
int record_replica_type_change_event(const int64_t palf_id,
const palf::LogConfigVersion& config_version,
const char *prev_replica_type,
const char *curr_replica_type,
const char *extra_info = NULL) override final;
int record_access_mode_change_event(const int64_t palf_id,
const int64_t prev_mode_version,
const int64_t curr_mode_verion,
const palf::AccessMode& prev_access_mode,
const palf::AccessMode& curr_access_mode,
const char *extra_info = NULL) override final;
int record_set_base_lsn_event(const int64_t palf_id, const palf::LSN &new_base_lsn) override final;
int record_enable_sync_event(const int64_t palf_id) override final;
int record_disable_sync_event(const int64_t palf_id) override final;
int record_enable_vote_event(const int64_t palf_id) override final;
int record_disable_vote_event(const int64_t palf_id) override final;
int record_advance_base_info_event(const int64_t palf_id, const palf::PalfBaseInfo &palf_base_info) override final;
int record_rebuild_event(const int64_t palf_id,
const common::ObAddr &server,
const palf::LSN &base_lsn) override final;
int record_flashback_event(const int64_t palf_id,
const int64_t mode_version,
const share::SCN &flashback_scn,
const share::SCN &curr_end_scn,
const share::SCN &curr_max_scn) override final;
int record_truncate_event(const int64_t palf_id,
const palf::LSN &lsn,
const int64_t min_block_id,
const int64_t max_block_id,
const int64_t truncate_end_block_id) override final;
int record_role_change_event(const int64_t palf_id,
const common::ObRole &prev_role,
const palf::ObReplicaState &prev_state,
@ -44,7 +85,21 @@ private:
UNKNOWN = 0,
DEGRADE,
UPGRADE,
SET_INITIAL_MEMBER_LIST,
ELECTION_LEADER_CHANGE,
ROLE_TRANSITION,
RECONFIGURATION,
REPLICA_TYPE_TRANSITION,
ACCESS_MODE_TRANSITION,
SET_BASE_LSN,
ENABLE_SYNC,
DISABLE_SYNC,
ENABLE_VOTE,
DISABLE_VOTE,
ADVANCE_BASE_INFO,
REBUILD,
FLASHBACK,
TRUNCATE
};
const char *type_to_string_(const EventType &event) const
@ -54,8 +109,33 @@ private:
{
CHECK_LOG_EVENT_TYPE_STR(DEGRADE);
CHECK_LOG_EVENT_TYPE_STR(UPGRADE);
case (EventType::SET_INITIAL_MEMBER_LIST):
return "SET INITIAL MEMBER LIST";
case (EventType::ELECTION_LEADER_CHANGE):
return "ELECTION LEADER CHANGE";
case (EventType::ROLE_TRANSITION):
return "ROLE TRANSITION";
case (EventType::RECONFIGURATION):
return "RECONFIGURATION";
case (EventType::REPLICA_TYPE_TRANSITION):
return "REPLICA TYPE TRANSITION";
case (EventType::ACCESS_MODE_TRANSITION):
return "ACCESS MODE TRANSITION";
case (EventType::SET_BASE_LSN):
return "SET BASE LSN";
case (EventType::ENABLE_SYNC):
return "ENABLE SYNC";
case (EventType::DISABLE_SYNC):
return "DISABLE SYNC";
case (EventType::ENABLE_VOTE):
return "ENABLE VOTE";
case (EventType::DISABLE_VOTE):
return "DISABLE VOTE";
case (EventType::ADVANCE_BASE_INFO):
return "ADVANCE BASE INFO";
CHECK_LOG_EVENT_TYPE_STR(REBUILD);
CHECK_LOG_EVENT_TYPE_STR(FLASHBACK);
CHECK_LOG_EVENT_TYPE_STR(TRUNCATE);
default:
return "UNKNOWN";
}

View File

@ -70,6 +70,31 @@ enum LogConfigChangeType
FORCE_SINGLE_MEMBER,
};
inline const char *LogConfigChangeType2Str(const LogConfigChangeType state)
{
#define CHECK_LOG_CONFIG_TYPE_STR(x) case(LogConfigChangeType::x): return #x
switch(state)
{
CHECK_LOG_CONFIG_TYPE_STR(CHANGE_REPLICA_NUM);
CHECK_LOG_CONFIG_TYPE_STR(ADD_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(ADD_ARB_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_ARB_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(ADD_MEMBER_AND_NUM);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_MEMBER_AND_NUM);
CHECK_LOG_CONFIG_TYPE_STR(ADD_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(SWITCH_LEARNER_TO_ACCEPTOR);
CHECK_LOG_CONFIG_TYPE_STR(SWITCH_ACCEPTOR_TO_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(DEGRADE_ACCEPTOR_TO_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(UPGRADE_LEARNER_TO_ACCEPTOR);
CHECK_LOG_CONFIG_TYPE_STR(STARTWORKING);
default:
return "Invalid";
}
#undef CHECK_LOG_CONFIG_TYPE_STR
}
typedef common::ObArrayHashMap<common::ObAddr, common::ObRegion> LogMemberRegionMap;
inline bool is_add_log_sync_member_list(const LogConfigChangeType type)
@ -167,32 +192,8 @@ public:
}
bool is_valid() const;
void reset();
const char *Type2Str(const LogConfigChangeType state) const
{
#define CHECK_LOG_CONFIG_TYPE_STR(x) case(LogConfigChangeType::x): return #x
switch(state)
{
CHECK_LOG_CONFIG_TYPE_STR(CHANGE_REPLICA_NUM);
CHECK_LOG_CONFIG_TYPE_STR(ADD_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(ADD_ARB_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_ARB_MEMBER);
CHECK_LOG_CONFIG_TYPE_STR(ADD_MEMBER_AND_NUM);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_MEMBER_AND_NUM);
CHECK_LOG_CONFIG_TYPE_STR(ADD_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(REMOVE_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(SWITCH_LEARNER_TO_ACCEPTOR);
CHECK_LOG_CONFIG_TYPE_STR(SWITCH_ACCEPTOR_TO_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(DEGRADE_ACCEPTOR_TO_LEARNER);
CHECK_LOG_CONFIG_TYPE_STR(UPGRADE_LEARNER_TO_ACCEPTOR);
CHECK_LOG_CONFIG_TYPE_STR(STARTWORKING);
default:
return "Invalid";
}
#undef CHECK_LOG_CONFIG_TYPE_STR
}
TO_STRING_KV(K_(server), K_(curr_member_list), K_(curr_replica_num), K_(new_replica_num),
K_(config_version), K_(ref_scn), "type", Type2Str(type_));
K_(config_version), K_(ref_scn), "type", LogConfigChangeType2Str(type_));
common::ObMember server_;
common::ObMemberList curr_member_list_;
int64_t curr_replica_num_;

View File

@ -50,6 +50,7 @@ LogEngine::LogEngine() :
log_net_service_(),
alloc_mgr_(NULL),
log_io_worker_(NULL),
plugins_(NULL),
palf_id_(INVALID_PALF_ID),
palf_epoch_(-1),
last_purge_throttling_ts_(OB_INVALID_TIMESTAMP),
@ -88,6 +89,7 @@ int LogEngine::init(const int64_t palf_id,
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogPlugins *plugins,
const int64_t palf_epoch,
const int64_t log_storage_block_size,
const int64_t log_meta_storage_block_ize)
@ -104,7 +106,7 @@ int LogEngine::init(const int64_t palf_id,
ret = OB_INIT_TWICE;
PALF_LOG(ERROR, "LogEngine has inited!!!", K(ret), K(palf_id));
} else if (false == is_valid_palf_id(palf_id) || OB_ISNULL(base_dir) || OB_ISNULL(alloc_mgr)
|| OB_ISNULL(log_rpc) || OB_ISNULL(log_io_worker)) {
|| OB_ISNULL(log_rpc) || OB_ISNULL(log_io_worker) || OB_ISNULL(plugins)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR,
"Invalid argument!!!",
@ -114,7 +116,8 @@ int LogEngine::init(const int64_t palf_id,
K(log_meta),
K(hot_cache),
K(alloc_mgr),
K(log_io_worker));
K(log_io_worker),
K(plugins));
// NB: Nowday, LSN is strongly dependent on physical block,
} else if (OB_FAIL(log_meta_storage_.init(base_dir,
"meta",
@ -125,6 +128,7 @@ int LogEngine::init(const int64_t palf_id,
LOG_DIO_ALIGNED_BUF_SIZE_META,
log_meta_storage_update_manifest_cb,
log_block_pool,
plugins,
NULL /*set hot_cache to NULL for meta storage*/))) {
PALF_LOG(ERROR, "LogMetaStorage init failed", K(ret), K(palf_id), K(base_dir));
} else if(0 != log_storage_block_size
@ -137,6 +141,7 @@ int LogEngine::init(const int64_t palf_id,
LOG_DIO_ALIGNED_BUF_SIZE_REDO,
log_storage_update_manifest_cb,
log_block_pool,
plugins,
hot_cache))) {
PALF_LOG(ERROR, "LogStorage init failed!!!", K(ret), K(palf_id), K(base_dir), K(log_meta));
} else if (OB_FAIL(log_net_service_.init(palf_id, log_rpc))) {
@ -148,6 +153,7 @@ int LogEngine::init(const int64_t palf_id,
log_meta_ = log_meta;
alloc_mgr_ = alloc_mgr;
log_io_worker_ = log_io_worker;
plugins_ = plugins;
palf_epoch_ = palf_epoch;
base_lsn_for_block_gc_ = log_meta.get_log_snapshot_meta().base_lsn_;
is_inited_ = true;
@ -185,6 +191,7 @@ int LogEngine::load(const int64_t palf_id,
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogPlugins *plugins,
LogGroupEntryHeader &entry_header,
const int64_t palf_epoch,
bool &is_integrity,
@ -232,6 +239,7 @@ int LogEngine::load(const int64_t palf_id,
LOG_DIO_ALIGNED_BUF_SIZE_META,
log_meta_storage_update_manifest_cb,
log_block_pool,
plugins,
NULL, /*set hot_cache to NULL for meta storage*/
unused_meta_entry_header,
last_meta_entry_start_lsn))) {
@ -244,7 +252,7 @@ int LogEngine::load(const int64_t palf_id,
log_meta_.get_log_snapshot_meta().base_lsn_, palf_id,
log_storage_block_size, LOG_DIO_ALIGN_SIZE,
LOG_DIO_ALIGNED_BUF_SIZE_REDO,
log_storage_update_manifest_cb, log_block_pool,
log_storage_update_manifest_cb, log_block_pool, plugins,
hot_cache, entry_header, last_group_entry_header_lsn)))) {
PALF_LOG(ERROR, "LogStorage load failed", K(ret), K(palf_id), K(base_dir));
} else if (FALSE_IT(guard.click("load log_storage"))

View File

@ -103,6 +103,7 @@ public:
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogPlugins *plugins,
const int64_t palf_epoch,
const int64_t log_storage_block_size,
const int64_t log_meta_storage_block_size);
@ -115,6 +116,7 @@ public:
LogHotCache *hot_cache,
LogRpc *log_rpc,
LogIOWorker *log_io_worker,
LogPlugins *plugins,
LogGroupEntryHeader &entry_header,
const int64_t palf_epoch,
bool &is_integrity,
@ -481,6 +483,7 @@ private:
LogNetService log_net_service_;
common::ObILogAllocator *alloc_mgr_;
LogIOWorker *log_io_worker_;
LogPlugins *plugins_;
// Except for LogNetService, this field is just only used for debug
int64_t palf_id_;
// palf_epoch_ is used for identifying an uniq palf instance.

View File

@ -36,6 +36,7 @@ LogStorage::LogStorage() :
tail_info_lock_(common::ObLatchIds::PALF_LOG_ENGINE_LOCK),
delete_block_lock_(common::ObLatchIds::PALF_LOG_ENGINE_LOCK),
update_manifest_cb_(),
plugins_(NULL),
hot_cache_(NULL),
is_inited_(false)
{}
@ -49,7 +50,7 @@ int LogStorage::init(const char *base_dir, const char *sub_dir, const LSN &base_
const int64_t palf_id, const int64_t logical_block_size,
const int64_t align_size, const int64_t align_buf_size,
const UpdateManifestCallback &update_manifest_cb,
ILogBlockPool *log_block_pool,
ILogBlockPool *log_block_pool, LogPlugins *plugins,
LogHotCache *hot_cache)
{
int ret = OB_SUCCESS;
@ -64,6 +65,7 @@ int LogStorage::init(const char *base_dir, const char *sub_dir, const LSN &base_
align_buf_size,
update_manifest_cb,
log_block_pool,
plugins,
hot_cache))) {
PALF_LOG(WARN, "LogStorage do_init_ failed", K(ret), K(base_dir), K(sub_dir), K(palf_id));
} else {
@ -390,9 +392,10 @@ int LogStorage::truncate_prefix_blocks(const LSN &lsn)
reset_log_tail_for_last_block_(lsn, false);
block_mgr_.reset(lsn_2_block(lsn, logical_block_size_));
}
PALF_EVENT("LogStorage truncate_prefix_blocks finihsed", palf_id_, K(ret), KPC(this),
PALF_EVENT("truncate_prefix_blocks success", palf_id_, K(ret), KPC(this),
K(lsn), K(block_id), K(min_block_id), K(max_block_id),
K(truncate_end_block_id));
plugins_->record_truncate_event(palf_id_, lsn, min_block_id, max_block_id, truncate_end_block_id);
return ret;
}
@ -593,6 +596,7 @@ int LogStorage::do_init_(const char *base_dir,
const int64_t align_buf_size,
const UpdateManifestCallback &update_manifest_cb,
ILogBlockPool *log_block_pool,
LogPlugins *plugins,
LogHotCache *hot_cache)
{
int ret = OB_SUCCESS;
@ -620,6 +624,7 @@ int LogStorage::do_init_(const char *base_dir,
palf_id_ = palf_id;
logical_block_size_ = logical_block_size;
update_manifest_cb_ = update_manifest_cb;
plugins_ = plugins;
hot_cache_ = hot_cache;
is_inited_ = true;
}

View File

@ -22,6 +22,7 @@
#include "log_writer_utils.h" // LogWriteBuf
#include "lsn.h" // LSN
#include "palf_iterator.h" // PalfIteraor
#include "palf_callback_wrapper.h"
namespace oceanbase
{
@ -52,6 +53,7 @@ public:
const int64_t align_buf_size,
const UpdateManifestCallback &update_manifest_cb,
ILogBlockPool *log_block_pool,
LogPlugins *plugins,
LogHotCache *hot_cache);
template <class EntryHeaderType>
@ -64,6 +66,7 @@ public:
const int64_t align_buf_size,
const UpdateManifestCallback &update_manifest_cb,
ILogBlockPool *log_block_pool,
LogPlugins *plugins,
LogHotCache *hot_cache,
EntryHeaderType &entry_header,
LSN &lsn);
@ -130,6 +133,7 @@ private:
const int64_t align_buf_size,
const UpdateManifestCallback &update_manifest_cb,
ILogBlockPool *log_block_pool,
LogPlugins *plugins,
LogHotCache *hot_cache);
// @ret val:
// OB_SUCCESS
@ -190,6 +194,7 @@ private:
mutable ObSpinLock tail_info_lock_;
mutable ObSpinLock delete_block_lock_;
UpdateManifestCallback update_manifest_cb_;
LogPlugins *plugins_;
char block_header_serialize_buf_[MAX_INFO_BLOCK_SIZE];
LogHotCache *hot_cache_;
bool is_inited_;
@ -211,6 +216,7 @@ int LogStorage::load(const char *base_dir,
const int64_t align_buf_size,
const UpdateManifestCallback &update_manifest_cb,
ILogBlockPool *log_block_pool,
LogPlugins *plugins,
LogHotCache *hot_cache,
EntryHeaderType &entry_header,
LSN &lsn)
@ -231,6 +237,7 @@ int LogStorage::load(const char *base_dir,
align_buf_size,
update_manifest_cb,
log_block_pool,
plugins,
hot_cache))) {
PALF_LOG(WARN, "LogStorage do_init_ failed", K(ret), K(base_dir), K(sub_dir), K(palf_id));
// NB: if there is no valid data on disk, no need to load last block

View File

@ -17,6 +17,7 @@
#include "lib/utility/ob_macro_utils.h"
#include "lib/list/ob_dlink_node.h"
#include "lib/utility/ob_print_utils.h"
#include "log_meta_info.h"
#include "lsn.h"
namespace oceanbase
{
@ -59,12 +60,54 @@ class PalfMonitorCb
{
public:
// record events
virtual int record_set_initial_member_list_event(const int64_t palf_id,
const int64_t replica_num,
const char *member_list = NULL,
const char *extra_info = NULL) = 0;
virtual int record_election_leader_change_event(const int64_t palf_id, const common::ObAddr &dest_addr) = 0;
virtual int record_reconfiguration_event(const char *sub_event,
const int64_t palf_id,
const LogConfigVersion& config_version,
const int64_t prev_replica_num,
const int64_t curr_replica_num,
const char *extra_info = NULL) = 0;
virtual int record_replica_type_change_event(const int64_t palf_id,
const LogConfigVersion& config_version,
const char *prev_replica_type,
const char *curr_replica_type,
const char *extra_info = NULL) = 0;
virtual int record_access_mode_change_event(const int64_t palf_id,
const int64_t prev_mode_version,
const int64_t curr_mode_verion,
const AccessMode& prev_access_mode,
const AccessMode& curr_access_mode,
const char *extra_info = NULL) = 0;
virtual int record_set_base_lsn_event(const int64_t palf_id, const LSN &new_base_lsn) = 0;
virtual int record_enable_sync_event(const int64_t palf_id) = 0;
virtual int record_disable_sync_event(const int64_t palf_id) = 0;
virtual int record_enable_vote_event(const int64_t palf_id) = 0;
virtual int record_disable_vote_event(const int64_t palf_id) = 0;
virtual int record_advance_base_info_event(const int64_t palf_id, const PalfBaseInfo &palf_base_info) = 0;
virtual int record_rebuild_event(const int64_t palf_id,
const common::ObAddr &server,
const LSN &base_lsn) = 0;
virtual int record_flashback_event(const int64_t palf_id,
const int64_t mode_version,
const share::SCN &flashback_scn,
const share::SCN &curr_end_scn,
const share::SCN &curr_max_scn) = 0;
virtual int record_truncate_event(const int64_t palf_id,
const LSN &lsn,
const int64_t min_block_id,
const int64_t max_block_id,
const int64_t truncate_end_block_id) = 0;
virtual int record_role_change_event(const int64_t palf_id,
const common::ObRole &prev_role,
const palf::ObReplicaState &prev_state,
const common::ObRole &curr_role,
const palf::ObReplicaState &curr_state,
const char *extra_info = "") = 0;
const char *extra_info = NULL) = 0;
// performance statistic
virtual int add_log_write_stat(const int64_t palf_id, const int64_t log_write_size) = 0;
};

View File

@ -114,6 +114,20 @@ public:
PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, nonblock_get_leader);
PALF_PLUGINS_DELEGATE_PTR(loc_cb_, loc_lock_, nonblock_renew_leader);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_set_initial_member_list_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_election_leader_change_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_reconfiguration_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_replica_type_change_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_access_mode_change_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_set_base_lsn_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_enable_sync_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_disable_sync_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_enable_vote_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_disable_vote_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_advance_base_info_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_rebuild_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_flashback_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_truncate_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, record_role_change_event);
PALF_PLUGINS_DELEGATE_PTR(palf_monitor_, palf_monitor_lock_, add_log_write_stat);

View File

@ -141,7 +141,7 @@ int PalfHandleImpl::init(const int64_t palf_id,
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "error unexpected", K(ret), K(palf_id));
} else if (OB_FAIL(log_engine_.init(palf_id, log_dir, log_meta, alloc_mgr, log_block_pool, &hot_cache_, \
log_rpc, log_io_worker, palf_epoch, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
log_rpc, log_io_worker, &plugins_, palf_epoch, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
PALF_LOG(WARN, "LogEngine init failed", K(ret), K(palf_id), K(log_dir), K(alloc_mgr),
K(log_rpc), K(log_io_worker));
} else if (OB_FAIL(do_init_mem_(palf_id, palf_base_info, log_meta, log_dir, self, fetch_log_engine,
@ -194,7 +194,7 @@ int PalfHandleImpl::load(const int64_t palf_id,
PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id), K(log_dir), K(alloc_mgr),
K(log_rpc), K(log_io_worker));
} else if (OB_FAIL(log_engine_.load(palf_id, log_dir, alloc_mgr, log_block_pool, &hot_cache_, log_rpc,
log_io_worker, entry_header, palf_epoch, is_integrity, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
log_io_worker, &plugins_, entry_header, palf_epoch, is_integrity, PALF_BLOCK_SIZE, PALF_META_BLOCK_SIZE))) {
PALF_LOG(WARN, "LogEngine load failed", K(ret), K(palf_id));
// NB: when 'entry_header' is invalid, means that there is no data on disk, and set max_committed_end_lsn
// to 'base_lsn_', we will generate default PalfBaseInfo or get it from LogSnapshotMeta(rebuild).
@ -281,6 +281,7 @@ int PalfHandleImpl::set_initial_member_list(
} else {
PALF_EVENT("set_initial_member_list success", palf_id_, K(ret), K(member_list),
K(learner_list), K(paxos_replica_num));
report_set_initial_member_list_(paxos_replica_num, member_list);
}
}
return ret;
@ -488,6 +489,7 @@ int PalfHandleImpl::change_leader_to(const common::ObAddr &dest_addr)
PALF_LOG(WARN, "election can not change leader", K(ret), KPC(this), K(dest_addr));
} else {
PALF_EVENT("election leader will be changed", palf_id_, K(ret), KPC(this), K(dest_addr));
plugins_.record_election_leader_change_event(palf_id_, dest_addr);
}
return ret;
}
@ -614,10 +616,13 @@ int PalfHandleImpl::force_set_as_single_replica()
const int64_t new_replica_num = 1;
LogConfigChangeArgs args(member, new_replica_num, FORCE_SINGLE_MEMBER);
const int64_t timeout_us = 10 * 1000 * 1000L;
int64_t prev_replica_num;
(void) config_mgr_.get_replica_num(prev_replica_num);
if (OB_FAIL(one_stage_config_change_(args, timeout_us))) {
PALF_LOG(WARN, "one_stage_config_change_ failed", KR(ret), KPC(this), K(member), K(new_replica_num));
} else {
PALF_EVENT("force_set_as_single_replica success", palf_id_, KR(ret), KPC(this), K(member), K(new_replica_num));
report_force_set_as_single_replica_(prev_replica_num, new_replica_num, member);
}
}
return ret;
@ -648,6 +653,7 @@ int PalfHandleImpl::change_replica_num(
} else {
PALF_EVENT("change_replica_num success", palf_id_, KR(ret), KPC(this), K(member_list),
K(curr_replica_num), K(new_replica_num));
report_change_replica_num_(curr_replica_num, new_replica_num, member_list);
}
}
return ret;
@ -659,6 +665,7 @@ int PalfHandleImpl::add_member(
const int64_t timeout_us)
{
int ret = OB_SUCCESS;
int64_t prev_replica_num;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "PalfHandleImpl not init", KR(ret), KPC(this));
@ -667,12 +674,15 @@ int PalfHandleImpl::add_member(
timeout_us <= 0) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid argument", KR(ret), KPC(this), K(member), K(new_replica_num), K(timeout_us));
} else if (OB_FAIL(config_mgr_.get_replica_num(prev_replica_num))) {
PALF_LOG(WARN, "get prev_replica_num failed", KR(ret), KPC(this));
} else {
LogConfigChangeArgs args(member, new_replica_num, ADD_MEMBER);
if (OB_FAIL(one_stage_config_change_(args, timeout_us))) {
PALF_LOG(WARN, "add_member failed", KR(ret), KPC(this), K(member), K(new_replica_num));
} else {
PALF_EVENT("add_member success", palf_id_, KR(ret), KPC(this), K(member), K(new_replica_num));
report_add_member_(prev_replica_num, new_replica_num, member);
}
}
return ret;
@ -684,6 +694,7 @@ int PalfHandleImpl::remove_member(
const int64_t timeout_us)
{
int ret = OB_SUCCESS;
int64_t prev_replica_num;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
PALF_LOG(WARN, "PalfHandleImpl not init", KR(ret), KPC(this));
@ -692,12 +703,15 @@ int PalfHandleImpl::remove_member(
timeout_us <= 0) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(WARN, "invalid argument", KR(ret), KPC(this), K(member), K(new_replica_num), K(timeout_us));
} else if (OB_FAIL(config_mgr_.get_replica_num(prev_replica_num))) {
PALF_LOG(WARN, "get prev_replica_num failed", KR(ret), KPC(this));
} else {
LogConfigChangeArgs args(member, new_replica_num, REMOVE_MEMBER);
if (OB_FAIL(one_stage_config_change_(args, timeout_us))) {
PALF_LOG(WARN, "remove_member failed", KR(ret), KPC(this), K(member), K(new_replica_num));
} else {
PALF_EVENT("remove_member success", palf_id_, KR(ret), KPC(this), K(member), K(new_replica_num));
report_remove_member_(prev_replica_num, new_replica_num, member);
}
}
return ret;
@ -739,6 +753,7 @@ int PalfHandleImpl::replace_member(
K(removed_member), K(timeout_us), K(old_member_list), K(old_replica_num),
K(curr_member_list), K(curr_replica_num),
"leader replace_member cost time(us)", common::ObTimeUtility::current_time() - begin_time_us);
report_replace_member_(added_member, removed_member, curr_member_list);
}
}
return ret;
@ -757,6 +772,7 @@ int PalfHandleImpl::add_learner(const common::ObMember &added_learner, const int
PALF_LOG(WARN, "add_learner failed", KR(ret), KPC(this), K(args), K(timeout_us));
} else {
PALF_EVENT("add_learner success", palf_id_, K(ret), KPC(this), K(args), K(timeout_us));
report_add_learner_(added_learner);
}
}
return ret;
@ -775,6 +791,7 @@ int PalfHandleImpl::remove_learner(const common::ObMember &removed_learner, cons
PALF_LOG(WARN, "remove_learner failed", KR(ret), KPC(this), K(args), K(timeout_us));
} else {
PALF_EVENT("remove_learner success", palf_id_, K(ret), KPC(this), K(args), K(timeout_us));
report_remove_learner_(removed_learner);
}
}
return ret;
@ -795,6 +812,7 @@ int PalfHandleImpl::switch_learner_to_acceptor(const common::ObMember &learner,
PALF_LOG(WARN, "switch_learner_to_acceptor failed", KR(ret), KPC(this), K(args), K(timeout_us));
} else {
PALF_EVENT("switch_learner_to_acceptor success", palf_id_, K(ret), KPC(this), K(args), K(timeout_us));
report_switch_learner_to_acceptor_(learner);
}
}
return ret;
@ -815,6 +833,7 @@ int PalfHandleImpl::switch_acceptor_to_learner(const common::ObMember &member,
PALF_LOG(WARN, "switch_acceptor_to_learner failed", KR(ret), KPC(this), K(args), K(timeout_us));
} else {
PALF_EVENT("switch_acceptor_to_learner success", palf_id_, K(ret), KPC(this), K(args), K(timeout_us));
report_switch_acceptor_to_learner_(member);
}
}
return ret;
@ -828,6 +847,7 @@ int PalfHandleImpl::change_access_mode(const int64_t proposal_id,
{
int ret = OB_SUCCESS;
const int64_t curr_time_us = common::ObTimeUtility::current_time();
AccessMode prev_access_mode;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (INVALID_PROPOSAL_ID == proposal_id ||
@ -850,6 +870,9 @@ int PalfHandleImpl::change_access_mode(const int64_t proposal_id,
ret = OB_EAGAIN;
PALF_LOG(WARN, "reconfiguration is running, try again", K(ret), K_(palf_id),
K_(self), K(proposal_id), K(access_mode), K(ref_scn));
} else if (OB_FAIL(mode_mgr_.get_access_mode(prev_access_mode))) {
PALF_LOG(WARN, "get old change_access mode failed", K(ret), K_(palf_id),
K_(self), K(proposal_id),K(access_mode), K(ref_scn));
} else {
PALF_EVENT("start change_access_mode", palf_id_, K(ret), KPC(this),
K(proposal_id), K(access_mode), K(ref_scn), K_(sw));
@ -885,6 +908,12 @@ int PalfHandleImpl::change_access_mode(const int64_t proposal_id,
if (OB_SUCCESS != (tmp_ret = role_change_cb_wrpper_.on_role_change(palf_id_))) {
PALF_LOG(WARN, "on_role_change failed", K(tmp_ret), K_(palf_id), K_(self));
}
PALF_EVENT("change_access_mode success", palf_id_, K(ret), KPC(this),
K(proposal_id), K(access_mode), K(ref_scn), K(time_guard), K_(sw));
int64_t curr_mode_version;
mode_mgr_.get_mode_version(curr_mode_version);
PALF_REPORT_INFO_KV(K(proposal_id), K(ref_scn));
plugins_.record_access_mode_change_event(palf_id_, mode_version, curr_mode_version, prev_access_mode, access_mode, EXTRA_INFOS);
}
if (OB_EAGAIN == ret) {
ob_usleep(1000);
@ -1212,6 +1241,7 @@ int PalfHandleImpl::set_base_lsn(
} else {
PALF_EVENT("set_base_lsn success", palf_id_, K(ret), K_(palf_id), K(self_), K(lsn),
K(log_snapshot_meta), K(new_base_lsn), K(flush_meta_cb_ctx));
plugins_.record_set_base_lsn_event(palf_id_, new_base_lsn);
}
}
return ret;
@ -1220,7 +1250,6 @@ int PalfHandleImpl::set_base_lsn(
bool PalfHandleImpl::is_sync_enabled() const
{
bool bool_ret = false;
RLockGuard guard(lock_);
if (IS_NOT_INIT) {
} else {
bool_ret = state_mgr_.is_sync_enabled();
@ -1232,12 +1261,16 @@ int PalfHandleImpl::enable_sync()
{
int ret = OB_SUCCESS;
WLockGuard guard(lock_);
bool is_sync_enabled = state_mgr_.is_sync_enabled();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (is_sync_enabled) {
PALF_LOG(INFO, "sync has been enabled", KPC(this));
} else if (OB_FAIL(state_mgr_.enable_sync())) {
PALF_LOG(WARN, "enable_sync failed", K(ret), KPC(this));
} else {
PALF_EVENT("enable_sync success", palf_id_, K(ret), KPC(this));
plugins_.record_enable_sync_event(palf_id_);
}
return ret;
}
@ -1246,12 +1279,16 @@ int PalfHandleImpl::disable_sync()
{
int ret = OB_SUCCESS;
WLockGuard guard(lock_);
bool is_sync_enabled = state_mgr_.is_sync_enabled();
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (!is_sync_enabled) {
PALF_LOG(INFO, "sync has been disabled", KPC(this));
} else if (OB_FAIL(state_mgr_.disable_sync())) {
PALF_LOG(WARN, "disable_sync failed", K(ret), KPC(this));
} else {
PALF_EVENT("disable_sync success", palf_id_, K(ret), KPC(this));
plugins_.record_disable_sync_event(palf_id_);
}
return ret;
}
@ -1302,6 +1339,8 @@ int PalfHandleImpl::disable_vote(const bool need_check_log_missing)
}
} else {
PALF_EVENT("disable_vote success", palf_id_, KPC(this), K(need_check_log_missing));
PALF_REPORT_INFO_KV(K(need_check_log_missing));
plugins_.record_disable_vote_event(palf_id_);
}
}
}
@ -1327,6 +1366,7 @@ int PalfHandleImpl::enable_vote()
}
} else {
PALF_EVENT("enable_vote success", palf_id_, KPC(this));
plugins_.record_enable_vote_event(palf_id_);
}
return ret;
}
@ -1444,8 +1484,8 @@ int PalfHandleImpl::advance_base_info(const PalfBaseInfo &palf_base_info, const
PALF_LOG(INFO, "sw_ truncate_for_rebuild success", K(ret), KPC(this), K(palf_base_info));
}
}
PALF_EVENT("advance_base_info finished", palf_id_, KPC(this), K(palf_base_info), K(time_guard));
plugins_.record_advance_base_info_event(palf_id_, palf_base_info);
return ret;
}
@ -2939,6 +2979,7 @@ int PalfHandleImpl::handle_notify_rebuild_req(const common::ObAddr &server,
PALF_LOG(WARN, "on_rebuild failed", K(ret), K(server), K(base_lsn));
} else {
PALF_EVENT("on_rebuild success", palf_id_, K(ret), K_(self), K(server), K(base_lsn));
plugins_.record_rebuild_event(palf_id_, server, base_lsn);
}
// Whether on_rebuild returns OB_SUCCESS or not, set value for rebuild_base_lsn_
SpinLockGuard rebuild_guard(last_rebuild_meta_info_lock_);
@ -3888,6 +3929,7 @@ int PalfHandleImpl::flashback(const int64_t mode_version,
FLOG_INFO("[END FLASHBACK PALF_DUMP]", K(ret), K_(palf_id), K_(self), "[SlidingWindow]", sw_,
"[StateMgr]", state_mgr_, "[ConfigMgr]", config_mgr_, "[ModeMgr]", mode_mgr_,
"[LogEngine]", log_engine_, "[Reconfirm]", reconfirm_);
plugins_.record_flashback_event(palf_id_, mode_version, flashback_scn, curr_end_scn, curr_max_scn);
break;
} else {
usleep(100*1000);
@ -4347,6 +4389,205 @@ int PalfHandleImpl::get_leader_max_scn_(SCN &max_scn, LSN &end_lsn)
return ret;
}
void PalfHandleImpl::report_set_initial_member_list_(const int64_t paxos_replica_num, const common::ObMemberList &member_list)
{
ObSqlString member_list_buf;
(void) member_list_to_string(member_list, member_list_buf);
plugins_.record_set_initial_member_list_event(palf_id_, paxos_replica_num, member_list_buf.ptr());
}
void PalfHandleImpl::report_set_initial_member_list_with_arb_(const int64_t paxos_replica_num, const common::ObMemberList &member_list, const common::ObMember &arb_member)
{
PALF_REPORT_INFO_KV(K(arb_member));
ObSqlString member_list_buf;
(void) member_list_to_string(member_list, member_list_buf);
plugins_.record_set_initial_member_list_event(palf_id_, paxos_replica_num, member_list_buf.ptr(), EXTRA_INFOS);
}
void PalfHandleImpl::report_force_set_as_single_replica_(const int64_t prev_replica_num, const int64_t curr_replica_num, const ObMember &member)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
ObSqlString member_buf;
member_to_string(member, member_buf);
PALF_REPORT_INFO_KV("member", member_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::FORCE_SINGLE_MEMBER),
palf_id_, config_version, prev_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_change_replica_num_(const int64_t prev_replica_num, const int64_t curr_replica_num, const common::ObMemberList &member_list)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
ObSqlString member_list_buf;
(void) member_list_to_string(member_list, member_list_buf);
PALF_REPORT_INFO_KV("member_list", member_list_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::CHANGE_REPLICA_NUM),
palf_id_, config_version, prev_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_add_member_(const int64_t prev_replica_num, const int64_t curr_replica_num, const common::ObMember &added_member)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
ObMemberList curr_member_list;
int64_t replica_num;
config_mgr_.get_curr_member_list(curr_member_list, replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
ObSqlString member_buf;
member_to_string(added_member, member_buf);
PALF_REPORT_INFO_KV(
"added_member", member_buf,
"member_list", member_list_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::ADD_MEMBER),
palf_id_, config_version, prev_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_remove_member_(const int64_t prev_replica_num, const int64_t curr_replica_num, const common::ObMember &removed_member)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
ObMemberList curr_member_list;
int64_t replica_num;
config_mgr_.get_curr_member_list(curr_member_list, replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
ObSqlString member_buf;
member_to_string(removed_member, member_buf);
PALF_REPORT_INFO_KV(
"removed_member", member_buf,
"member_list", member_list_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::REMOVE_MEMBER),
palf_id_, config_version, prev_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_replace_member_(const common::ObMember &added_member, const common::ObMember &removed_member, const common::ObMemberList &member_list)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
ObSqlString member_list_buf;
(void) member_list_to_string(member_list, member_list_buf);
ObSqlString added_member_buf;
ObSqlString removed_member_buf;
member_to_string(added_member, added_member_buf);
member_to_string(removed_member, removed_member_buf);
PALF_REPORT_INFO_KV(
"added_member", added_member_buf,
"removed_member", removed_member_buf,
"member_list", member_list_buf);
int64_t curr_replica_num;
(void) config_mgr_.get_replica_num(curr_replica_num);
plugins_.record_reconfiguration_event("REPLACE_MEMBER", palf_id_, config_version, curr_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_add_learner_(const common::ObMember &added_learner)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
common::ObMemberList curr_member_list;
int64_t curr_replica_num;
(void) config_mgr_.get_curr_member_list(curr_member_list, curr_replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
ObSqlString member_buf;
member_to_string(added_learner, member_buf);
PALF_REPORT_INFO_KV(
"added_learner", member_buf,
"member_list", member_list_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::ADD_LEARNER),
palf_id_, config_version, curr_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_remove_learner_(const common::ObMember &removed_learner)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
common::ObMemberList curr_member_list;
int64_t curr_replica_num;
(void) config_mgr_.get_curr_member_list(curr_member_list, curr_replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
ObSqlString member_buf;
member_to_string(removed_learner, member_buf);
PALF_REPORT_INFO_KV(
"removed_learner", member_buf,
"member_list", member_list_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::REMOVE_LEARNER),
palf_id_, config_version, curr_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_add_arb_member_(const common::ObMember &added_arb_member)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
common::ObMemberList curr_member_list;
int64_t curr_replica_num;
(void) config_mgr_.get_curr_member_list(curr_member_list, curr_replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
char member_buf_[MAX_SINGLE_MEMBER_LENGTH] = {'\0'};
ObSqlString member_buf;
member_to_string(added_arb_member, member_buf);
PALF_REPORT_INFO_KV(
"added_arb_member", member_buf,
"member_list", member_list_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::ADD_ARB_MEMBER),
palf_id_, config_version, curr_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_remove_arb_member_(const common::ObMember &removed_arb_member)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
common::ObMemberList curr_member_list;
int64_t curr_replica_num;
(void) config_mgr_.get_curr_member_list(curr_member_list, curr_replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
ObSqlString member_buf;
member_to_string(removed_arb_member, member_buf);
PALF_REPORT_INFO_KV(
"removed_arb_member", member_buf,
"member_list", member_list_buf);
plugins_.record_reconfiguration_event(LogConfigChangeType2Str(LogConfigChangeType::REMOVE_ARB_MEMBER),
palf_id_, config_version, curr_replica_num, curr_replica_num, EXTRA_INFOS);
}
void PalfHandleImpl::report_switch_learner_to_acceptor_(const common::ObMember &learner)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
ObMemberList curr_member_list;
int64_t curr_replica_num;
(void) config_mgr_.get_curr_member_list(curr_member_list, curr_replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
ObSqlString member_buf;
member_to_string(learner, member_buf);
PALF_REPORT_INFO_KV(
"member", member_buf,
"curr_member_list", member_list_buf,
"curr_replica_num", curr_replica_num);
char replica_readonly_name_[common::MAX_REPLICA_TYPE_LENGTH];
char replica_full_name_[common::MAX_REPLICA_TYPE_LENGTH];
replica_type_to_string(ObReplicaType::REPLICA_TYPE_READONLY, replica_readonly_name_, sizeof(replica_readonly_name_));
replica_type_to_string(ObReplicaType::REPLICA_TYPE_FULL, replica_full_name_, sizeof(replica_full_name_));
plugins_.record_replica_type_change_event(palf_id_, config_version, replica_readonly_name_, replica_full_name_, EXTRA_INFOS);
}
void PalfHandleImpl::report_switch_acceptor_to_learner_(const common::ObMember &acceptor)
{
LogConfigVersion config_version;
(void) config_mgr_.get_config_version(config_version);
ObMemberList curr_member_list;
int64_t curr_replica_num;
(void) config_mgr_.get_curr_member_list(curr_member_list, curr_replica_num);
ObSqlString member_list_buf;
(void) member_list_to_string(curr_member_list, member_list_buf);
ObSqlString member_buf;
member_to_string(acceptor, member_buf);
PALF_REPORT_INFO_KV(
"member", member_buf,
"curr_member_list", member_list_buf,
"curr_replica_num", curr_replica_num);
char replica_readonly_name_[common::MAX_REPLICA_TYPE_LENGTH];
char replica_full_name_[common::MAX_REPLICA_TYPE_LENGTH];
replica_type_to_string(ObReplicaType::REPLICA_TYPE_READONLY, replica_readonly_name_, sizeof(replica_readonly_name_));
replica_type_to_string(ObReplicaType::REPLICA_TYPE_FULL, replica_full_name_, sizeof(replica_full_name_));
plugins_.record_replica_type_change_event(palf_id_, config_version, replica_full_name_, replica_readonly_name_, EXTRA_INFOS);
}
PalfStat::PalfStat()
: self_(),
palf_id_(INVALID_PALF_ID),

View File

@ -1072,6 +1072,21 @@ private:
int get_leader_max_scn_(SCN &max_scn, LSN &end_lsn);
void gen_rebuild_meta_info_(RebuildMetaInfo &rebuild_meta) const;
void get_last_rebuild_meta_info_(RebuildMetaInfo &rebuild_meta_info) const;
// ======================= report event begin =======================================
void report_set_initial_member_list_(const int64_t paxos_replica_num, const common::ObMemberList &member_list);
void report_set_initial_member_list_with_arb_(const int64_t paxos_replica_num, const common::ObMemberList &member_list, const common::ObMember &arb_member);
void report_force_set_as_single_replica_(const int64_t prev_replica_num, const int64_t curr_replica_num, const ObMember &member);
void report_change_replica_num_(const int64_t prev_replica_num, const int64_t curr_replica_num, const common::ObMemberList &member_list);
void report_add_member_(const int64_t prev_replica_num, const int64_t curr_replica_num, const common::ObMember &added_member);
void report_remove_member_(const int64_t prev_replica_num, const int64_t curr_replica_num, const common::ObMember &removed_member);
void report_replace_member_(const common::ObMember &added_member, const common::ObMember &removed_member, const common::ObMemberList &member_list);
void report_add_learner_(const common::ObMember &added_learner);
void report_remove_learner_(const common::ObMember &removed_learner);
void report_add_arb_member_(const common::ObMember &added_arb_member);
void report_remove_arb_member_(const common::ObMember &removed_arb_member);
void report_switch_learner_to_acceptor_(const common::ObMember &learner);
void report_switch_acceptor_to_learner_(const common::ObMember &acceptor);
// ======================= report event end =======================================
private:
class ElectionMsgSender : public election::ElectionMsgSender
{