Files
oceanbase/mittest/logservice/env/ob_simple_arb_server.cpp

248 lines
8.0 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define private public
#include "logservice/arbserver/ob_arb_srv_network_frame.h"
#include "logservice/arbserver/ob_arb_server_timer.h"
#undef private
#include "ob_simple_arb_server.h"
namespace oceanbase
{
namespace palflite
{
// Looks up the PalfEnvLite stored under `key` in palf_env_lite_map_.
//
// @param [in]  key            key of the wanted PalfEnvLite.
// @param [out] palf_env_lite  entry returned by the map on success; NULL on any failure.
// @return OB_SUCCESS on success; the error of palf_env_lite_map_.get() when the lookup
//         fails; OB_ENTRY_NOT_EXIST when the entry reports has_set_deleted().
//
// NB: make sure the node id of the arb server is 1002.
int PalfEnvLiteMgr::get_palf_env_lite(const palflite::PalfEnvKey &key,
                                      PalfEnvLite *&palf_env_lite)
{
  int ret = OB_SUCCESS;
  palf_env_lite = NULL;
  if (OB_FAIL(palf_env_lite_map_.get(key, palf_env_lite))) {
    CLOG_LOG(WARN, "get from map failed", KR(ret), KPC(this), K(key));
  } else {
    const bool already_deleted = (true == palf_env_lite->has_set_deleted());
    if (already_deleted) {
      ret = OB_ENTRY_NOT_EXIST;
    }
  }
  // On failure, revert whatever the map handed out and clear the out parameter.
  if (OB_SUCCESS != ret && NULL != palf_env_lite) {
    palf_env_lite_map_.revert(palf_env_lite);
    palf_env_lite = NULL;
  }
  return ret;
}
}
namespace unittest
{
// Default-constructs the server in the "not inited" state; the real setup is done by
// simple_init().
ObSimpleArbServer::ObSimpleArbServer()
  : palf_env_mgr_(),
    srv_network_frame_(),
    timer_(),
    allocator_(1),
    is_inited_(false)
{
  // tenant_base_ is not part of the initializer list above, yet the destructor compares
  // it against NULL. Assign it here (in the body, so member declaration order does not
  // matter) so that destroying a server on which simple_init() never ran is well-defined.
  tenant_base_ = NULL;
}
// Tears the server down and releases tenant_base_ when one has been created.
// The components are only destroyed when tenant_base_ is set; the trailing log line is
// always emitted.
ObSimpleArbServer::~ObSimpleArbServer()
{
  const bool has_tenant_base = (NULL != tenant_base_);
  if (has_tenant_base) {
    destroy();
    ob_delete(tenant_base_);
    tenant_base_ = NULL;
  }
  CLOG_LOG_RET(WARN, OB_SUCCESS, "reset tenant_base_");
}
// Initializes the arbitration server.
//
// Steps, in order:
//   1. build the network options from `addr`;
//   2. create the ObTenantBase for `node_id` and install it via ObTenantEnv::set_tenant();
//   3. create the directories "<cluster_name>/port_<port>" and "<...>/clog"
//      (an already existing directory is accepted);
//   4. initialize the logger, the blacklist and the network frame (only when
//      `is_bootstrap`), then the PalfEnvLiteMgr, the mock election map and the timer;
//   5. install the packet filter on the normal rpc queue handler.
//
// @param [in] cluster_name  name used as the root directory of this server's files.
// @param [in] addr          address of this server; its port is also used as rpc port
//                           and as part of the directory name.
// @param [in] node_id       id used for the tenant base / tenant allocator, stored in node_id_.
// @param [in] is_bootstrap  true on first start; simple_restart() passes false so that the
//                           logger, blacklist and network frame are not initialized again.
// @return OB_SUCCESS on success; OB_ALLOCATE_MEMORY_FAILED when the tenant base cannot be
//         allocated; OB_IO_ERROR when a directory cannot be created; otherwise the error
//         code of the first init step that failed. Any failure other than OB_INIT_TWICE
//         triggers destroy().
//
// NOTE(review): simple_restart() re-enters this function, which overwrites tenant_base_
// without releasing the previous object — confirm whether that is intended.
int ObSimpleArbServer::simple_init(const std::string &cluster_name,
                                   const common::ObAddr &addr,
                                   const int64_t node_id,
                                   bool is_bootstrap)
{
  int ret = OB_SUCCESS;
  const std::string logserver_dir = cluster_name + "/port_" + std::to_string(addr.get_port());
  const std::string clog_dir = logserver_dir + "/clog";
  ObBaseLogWriterCfg cfg;
  rpc::frame::ObNetOptions opts;
  arbserver::ArbNetOptions arb_opts;
  opts.rpc_io_cnt_ = 10; // rpc io thread count
  opts.high_prio_rpc_io_cnt_ = 0; // high priority io thread count
  opts.mysql_io_cnt_ = 0; // mysql io thread count
  opts.batch_rpc_io_cnt_ = 0; // batch rpc io thread count
  opts.tcp_user_timeout_ = 10 * 1000 * 1000; // 10s
  arb_opts.opts_ = opts;
  arb_opts.rpc_port_ = static_cast<uint32_t>(addr.get_port());
  arb_opts.self_ = addr;
  arb_opts.negotiation_enable_ = 0;
  arb_opts.mittest_ = true;

  tenant_base_ = OB_NEW(ObTenantBase, "TestBase", node_id);
  if (NULL == tenant_base_) {
    // Do not dereference a failed allocation.
    ret = OB_ALLOCATE_MEMORY_FAILED;
    SERVER_LOG(WARN, "alloc tenant_base_ failed", K(ret), K(node_id));
  } else {
    auto malloc_allocator = ObMallocAllocator::get_instance();
    if (NULL == malloc_allocator->get_tenant_ctx_allocator(node_id, 0)) {
      malloc_allocator->create_and_add_tenant_allocator(node_id);
    }
    tenant_base_->init();
    ObTenantEnv::set_tenant(tenant_base_);

    const std::vector<std::string> dirs{logserver_dir, clog_dir};
    for (const auto &dir : dirs) {
      if (-1 == mkdir(dir.c_str(), 0777)) {
        if (errno == EEXIST) {
          // The directory is left over from a previous run; this is the restart case.
          ret = OB_SUCCESS;
          SERVER_LOG(INFO, "for restart");
        } else {
          SERVER_LOG(WARN, "mkdir failed", K(errno));
          ret = OB_IO_ERROR;
          break;
        }
      }
    }
  }

  cfg.group_commit_max_item_cnt_ = 1;
  cfg.group_commit_min_item_cnt_ = 1;
  if (OB_FAIL(ret)) {
    // An earlier step failed. The logger init below must not run in that case, otherwise
    // its result would overwrite the pending error code.
  } else if (is_bootstrap && OB_FAIL(OB_LOGGER.init(cfg, true))) {
    SERVER_LOG(ERROR, "OB_LOGGER init failed");
  } else if (is_bootstrap && OB_FAIL(blacklist_.init(addr))) {
    CLOG_LOG(WARN, "blacklist_ init failed", K(ret), K(opts));
  } else if (is_bootstrap && OB_FAIL(srv_network_frame_.init(arb_opts, &palf_env_mgr_))) {
    CLOG_LOG(WARN, "ObArbSrvNetWorkFrame init failed", K(ret), K(opts));
  } else if (OB_FAIL(palf_env_mgr_.init(logserver_dir.c_str(), addr,
                                        srv_network_frame_.get_req_transport()))) {
    CLOG_LOG(WARN, "PalfEnvLiteMgr init failed", K(ret), K(addr), K(clog_dir.c_str()));
  } else if (OB_FAIL(mock_election_map_.init())) {
    SERVER_LOG(ERROR, "mock_election_map_ init fail", K(ret));
  } else if (OB_FAIL(timer_.init(lib::TGDefIDs::ArbServerTimer, &palf_env_mgr_))) {
    CLOG_LOG(WARN, "timer init failed", K(ret), K(addr), K(clog_dir.c_str()));
  } else {
    // filter_ is stored in a member and outlives this call, so it must only capture
    // `this`; capturing the local `ret` by reference would leave a dangling reference.
    filter_ = [this](const ObAddr &src) -> bool {
      if (blacklist_.need_filter_packet_by_blacklist(src)) {
        SERVER_LOG(WARN, "need_filter_packet_by_blacklist", K(src));
        return true;
      } else if (blacklist_.need_drop_by_loss_config(src)) {
        SERVER_LOG(WARN, "need_drop_by_loss_config", K(src));
        return true;
      }
      return false;
    };
    srv_network_frame_.normal_rpc_qhandler_.filter_ = &filter_;
    base_dir_ = clog_dir;
    self_ = addr;
    node_id_ = node_id;
    timer_.timer_interval_ = 1000*1000;
    is_inited_ = true;
    CLOG_LOG(INFO, "simple_arb_server init success", K(node_id), KPC(this));
  }
  if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {
    destroy();
  }
  return ret;
}
// Marks the server as not inited and destroys, in this order, the network frame, the
// PalfEnvLiteMgr and the timer. tenant_base_ is not touched here.
void ObSimpleArbServer::destroy()
{
is_inited_ = false;
srv_network_frame_.destroy();
palf_env_mgr_.destroy();
timer_.destroy();
}
int ObSimpleArbServer::simple_start(const bool is_bootstrat)
{
int ret = OB_SUCCESS;
palflite::PalfEnvKey key(cluster_id_, OB_SERVER_TENANT_ID);
if (OB_FAIL(srv_network_frame_.start())) {
CLOG_LOG(WARN, "start ObArbSrvNetWorkFrame failed", K(ret));
} else if (OB_FAIL(palf_env_mgr_.create_palf_env_lite(key))) {
CLOG_LOG(WARN, "PalfEnvLiteMgr create_palf_env_lite failed", K(ret));
} else if (OB_FAIL(timer_.start())) {
CLOG_LOG(WARN, "timer start failed", K(ret));
} else {
CLOG_LOG(INFO, "start ObSimpleArbServer success", KPC(this));
}
return ret;
}
// Closes the server by destroying, in this order, the network frame, the PalfEnvLiteMgr
// and the timer.
//
// @param [in] is_shutdown  not used by this implementation.
// @return always OB_SUCCESS.
//
// NOTE(review): unlike destroy(), is_inited_ is not reset here — confirm this is intended.
int ObSimpleArbServer::simple_close(const bool is_shutdown)
{
CLOG_LOG_RET(WARN, OB_SUCCESS, "arb simple_close");
srv_network_frame_.destroy();
palf_env_mgr_.destroy();
timer_.destroy();
return OB_SUCCESS;
}
// Restarts the server in place: stops the packet deliver, destroys the PalfEnvLiteMgr
// and the timer, re-runs simple_init() in non-bootstrap mode with the stored address
// self_, then starts the deliver and the timer again.
//
// @param [in] cluster_name  forwarded to simple_init().
// @param [in] node_idx      forwarded to simple_init() as the node id.
// @return OB_SUCCESS, or the error code of the first step that failed.
int ObSimpleArbServer::simple_restart(const std::string &cluster_name,
                                      const int64_t node_idx)
{
  int ret = OB_SUCCESS;
  srv_network_frame_.deliver_.stop();
  palf_env_mgr_.destroy();
  timer_.destroy();
  if (OB_FAIL(simple_init(cluster_name, self_, node_idx, false))) {
    CLOG_LOG(WARN, "simple_init failed", K(ret));
  } else {
    auto &frame = srv_network_frame_;
    if (OB_FAIL(frame.deliver_.start(frame.normal_rpc_qhandler_,
                                     frame.server_rpc_qhandler_))) {
      CLOG_LOG(WARN, "start ObArbSrvNetWorkFrame failed", K(ret));
    } else if (OB_FAIL(timer_.start())) {
      CLOG_LOG(WARN, "timer start failed", K(ret));
    }
  }
  return ret;
}
// Creates the PalfEnvLite keyed by (cluster_id_, tenant_id) through palf_env_mgr_.
//
// @param [in] tenant_id  tenant part of the PalfEnvKey.
// @return OB_NOT_INIT when the server is not inited; otherwise the result of
//         PalfEnvLiteMgr::create_palf_env_lite().
int ObSimpleArbServer::create_palf_env_lite(const int64_t tenant_id)
{
  int ret = OB_SUCCESS;
  palflite::PalfEnvKey key(cluster_id_, tenant_id);
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    CLOG_LOG(WARN, "ObSimpleArbServer not inited", K(ret), K(key));
  } else {
    ret = palf_env_mgr_.create_palf_env_lite(key);
    if (OB_SUCCESS != ret) {
      CLOG_LOG(WARN, "create_palf_env_lite failed", K(ret), K(key));
    }
  }
  return ret;
}
// Removes the PalfEnvLite keyed by (cluster_id_, tenant_id) through palf_env_mgr_.
//
// @param [in] tenant_id  tenant part of the PalfEnvKey.
// @return OB_NOT_INIT when the server is not inited; otherwise the result of
//         PalfEnvLiteMgr::remove_palf_env_lite().
int ObSimpleArbServer::remove_palf_env_lite(const int64_t tenant_id)
{
  int ret = OB_SUCCESS;
  palflite::PalfEnvKey key(cluster_id_, tenant_id);
  if (IS_NOT_INIT) {
    ret = OB_NOT_INIT;
    CLOG_LOG(WARN, "ObSimpleArbServer not inited", K(ret), K(tenant_id));
  } else {
    ret = palf_env_mgr_.remove_palf_env_lite(key);
    if (OB_SUCCESS != ret) {
      CLOG_LOG(WARN, "remove_palf_env_lite failed", K(ret), K(key));
    }
  }
  return ret;
}
int ObSimpleArbServer::get_palf_env_lite(const int64_t tenant_id, PalfEnvLiteGuard &guard)
{
int ret = OB_SUCCESS;
palflite::PalfEnvLite *palf_env_impl = NULL;
palflite::PalfEnvKey key(cluster_id_, tenant_id);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(WARN, "ObSimpleArbServer not inited", K(ret), K(tenant_id));
} else if (OB_FAIL(palf_env_mgr_.get_palf_env_lite(key, palf_env_impl))) {
CLOG_LOG(WARN, "get_palf_env_lite failed", K(ret), K(tenant_id));
} else {
guard.palf_env_mgr_ = &palf_env_mgr_;
guard.palf_env_lite_ = palf_env_impl;
}
return ret;
}
}
}