542 lines
20 KiB
C++
542 lines
20 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#pragma once
|
|
#include "lib/hash/ob_hashset.h"
|
|
#include "lib/hash/ob_hashset.h"
|
|
#include "lib/hash/ob_linear_hash_map.h"
|
|
#include "lib/ob_errno.h"
|
|
#include "lib/thread/ob_simple_thread_pool.h"
|
|
#include "lib/thread/thread_mgr_interface.h"
|
|
#include "lib/signal/ob_signal_worker.h"
|
|
#include "lib/signal/ob_signal_struct.h"
|
|
#include "lib/function/ob_function.h" // ObFunction
|
|
#include "observer/ob_signal_handle.h"
|
|
#include "lib/utility/ob_defer.h"
|
|
#include "rpc/frame/ob_req_deliver.h"
|
|
#include "rpc/ob_request.h"
|
|
#include "rpc/frame/ob_net_easy.h"
|
|
#include "rpc/obrpc/ob_rpc_handler.h"
|
|
#include "rpc/frame/ob_req_transport.h"
|
|
#include "logservice/logrpc/ob_log_rpc_processor.h"
|
|
#include "logservice/logrpc/ob_log_request_handler.h"
|
|
#include "logservice/palf/log_rpc_macros.h"
|
|
#include "logservice/palf/log_rpc_processor.h"
|
|
#include "logservice/palf/palf_env.h"
|
|
#include "logservice/ob_arbitration_service.h"
|
|
#include "mock_election.h"
|
|
#include "mock_ob_locality_manager.h"
|
|
#include "mock_ob_meta_reporter.h"
|
|
#include "lib/net/ob_addr.h"
|
|
#include "share/ob_rpc_struct.h"
|
|
#include "storage/blocksstable/ob_block_sstable_struct.h"
|
|
#include "storage/tx_storage/ob_ls_service.h"
|
|
#include "share/allocator/ob_tenant_mutil_allocator.h"
|
|
#include "storage/tx_storage/ob_ls_map.h"
|
|
#include "storage/tx_storage/ob_ls_service.h"
|
|
#include "logservice/palf/palf_handle_impl.h"
|
|
#include "logservice/ob_log_service.h"
|
|
#include "logservice/ob_server_log_block_mgr.h"
|
|
#include "share/ob_local_device.h"
|
|
#include "share/ob_occam_timer.h"
|
|
#include "share/resource_manager/ob_cgroup_ctrl.h"
|
|
#include "logservice/ob_net_keepalive_adapter.h"
|
|
#include "logservice/leader_coordinator/ob_failure_detector.h"
|
|
#include "logservice/arbserver/arb_tg_helper.h"
|
|
#include <memory>
|
|
#include <map>
|
|
#include "share/ob_tenant_mem_limit_getter.h"
|
|
|
|
namespace oceanbase
|
|
{
|
|
namespace unittest
|
|
{
|
|
class ObLogDeliver;
|
|
}
|
|
|
|
namespace unittest
|
|
{
|
|
using namespace oceanbase;
|
|
using namespace oceanbase::rpc;
|
|
using namespace oceanbase::rpc::frame;
|
|
using namespace oceanbase::obrpc;
|
|
using namespace oceanbase::common;
|
|
using namespace oceanbase::palf;
|
|
using namespace oceanbase::share;
|
|
|
|
class MockNetKeepAliveAdapter : public logservice::IObNetKeepAliveAdapter
|
|
{
|
|
public:
|
|
MockNetKeepAliveAdapter() : log_deliver_(NULL) {}
|
|
~MockNetKeepAliveAdapter() { log_deliver_ = NULL; }
|
|
int init(unittest::ObLogDeliver *log_deliver);
|
|
bool in_black_or_stopped(const common::ObAddr &server) override final;
|
|
bool is_server_stopped(const common::ObAddr &server) override final;
|
|
bool in_black(const common::ObAddr &server) override final;
|
|
int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) override final;
|
|
private:
|
|
unittest::ObLogDeliver *log_deliver_;
|
|
};
|
|
uint32_t get_local_addr(const char *dev_name);
|
|
std::string get_local_ip();
|
|
|
|
struct LossConfig
|
|
{
|
|
ObAddr src_;
|
|
int loss_rate_;
|
|
LossConfig()
|
|
: src_(), loss_rate_(0)
|
|
{}
|
|
LossConfig(const ObAddr &src, const int loss_rate)
|
|
: src_(src), loss_rate_(loss_rate)
|
|
{}
|
|
TO_STRING_KV(K_(src), K_(loss_rate));
|
|
};
|
|
|
|
class ObMittestBlacklist {
|
|
public:
|
|
int init(const common::ObAddr &self);
|
|
void block_net(const ObAddr &src);
|
|
void unblock_net(const ObAddr &src);
|
|
void block_pcode(const ObRpcPacketCode &pcode);
|
|
void unblock_pcode(const ObRpcPacketCode &pcode);
|
|
bool need_filter_packet_by_blacklist(const ObAddr &address);
|
|
bool need_filter_packet_by_pcode_blacklist(const ObRpcPacketCode &pcode);
|
|
void set_need_drop_packet(const bool need_drop_packet) { need_drop_packet_ = need_drop_packet; }
|
|
void set_rpc_loss(const ObAddr &src, const int loss_rate);
|
|
void reset_rpc_loss(const ObAddr &src);
|
|
bool need_drop_by_loss_config(const ObAddr &addr);
|
|
void get_loss_config(const ObAddr &src, bool &exist, LossConfig &loss_config);
|
|
TO_STRING_KV(K_(blacklist), K_(rpc_loss_config));
|
|
protected:
|
|
hash::ObHashSet<ObAddr> blacklist_;
|
|
hash::ObHashSet<int64_t> pcode_blacklist_;
|
|
bool need_drop_packet_;
|
|
common::ObSEArray<LossConfig, 4> rpc_loss_config_;
|
|
common::ObAddr self_;
|
|
};
|
|
|
|
class ObSimpleLogServer;
|
|
class ObLooper : public share::ObThreadPool {
|
|
public:
|
|
static constexpr int64_t INTERVAL_US = 1000*1000;
|
|
ObLooper();
|
|
virtual ~ObLooper();
|
|
public:
|
|
int init(ObSimpleLogServer *log_server);
|
|
void destroy();
|
|
void run1();
|
|
private:
|
|
void log_loop_();
|
|
private:
|
|
ObSimpleLogServer *log_server_;
|
|
int64_t run_interval_;
|
|
bool is_inited_;
|
|
private:
|
|
DISALLOW_COPY_AND_ASSIGN(ObLooper);
|
|
};
|
|
|
|
class MockLogGetCkptReqP: public
|
|
obrpc::ObRpcProcessor<obrpc::ObLogServiceRpcProxy::ObRpc<obrpc::OB_LOG_GET_LS_CKPT>>
|
|
{
|
|
public:
|
|
MockLogGetCkptReqP(): ckpt_functor_(NULL), filter_(NULL) {}
|
|
virtual ~MockLogGetCkptReqP() { ckpt_functor_ = NULL; filter_ = NULL; }
|
|
int process()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const logservice::LogGetCkptReq &req = arg_;
|
|
const common::ObAddr server = req.src_; \
|
|
const share::ObLSID &ls_id = req.ls_id_;
|
|
logservice::LogGetCkptResp &resp = result_;
|
|
if (OB_ISNULL(ckpt_functor_)) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
CLOG_LOG(ERROR, "ckpt_functor_ is NULL", K(ret), K(req), K(resp));
|
|
} else if (OB_UNLIKELY(NULL != filter_ && true == (*filter_)(server))) { \
|
|
CLOG_LOG(INFO, "need filter this packet", K(req)); \
|
|
} else if (OB_FAIL((*ckpt_functor_)(ls_id, resp.ckpt_scn_, resp.ckpt_lsn_))) {
|
|
CLOG_LOG(ERROR, "ckpt_functor_ failed", K(ret), K(req), K(resp));
|
|
} else {
|
|
CLOG_LOG(INFO, "ckpt_functor_ success", K(ret), K(req), K(resp));
|
|
}
|
|
return ret;
|
|
}
|
|
void set_ckpt_functor(void *functor)
|
|
{
|
|
ckpt_functor_ = reinterpret_cast<ObFunction<int(const share::ObLSID &ls_id, share::SCN &scn, palf::LSN &lsn)> *>(functor);
|
|
}
|
|
void set_filter(void *filter)
|
|
{
|
|
filter_ = reinterpret_cast<ObFunction<bool(const ObAddr &src)> *>(filter);
|
|
}
|
|
private:
|
|
ObFunction<int(const share::ObLSID &ls_id, share::SCN &scn, palf::LSN &lsn)> *ckpt_functor_;
|
|
ObFunction<bool(const ObAddr &src)> *filter_;
|
|
};
|
|
|
|
class ObLogDeliver : public rpc::frame::ObReqDeliver, public lib::TGTaskHandler, public ObMittestBlacklist
|
|
{
|
|
public:
|
|
ObLogDeliver()
|
|
: rpc::frame::ObReqDeliver(),
|
|
log_server_(NULL),
|
|
tg_id_(0),
|
|
is_stopped_(true) {}
|
|
~ObLogDeliver() { destroy(true); }
|
|
int init() override final {return OB_SUCCESS;}
|
|
int init(const common::ObAddr &self,
|
|
const bool is_bootstrap,
|
|
ObSimpleLogServer *log_server);
|
|
void destroy(const bool is_shutdown);
|
|
int deliver(rpc::ObRequest &req);
|
|
int start();
|
|
void stop();
|
|
int wait();
|
|
void handle(void *task);
|
|
|
|
private:
|
|
void init_all_propocessor_();
|
|
typedef std::function<int(ObReqProcessor *&)> Func;
|
|
|
|
Func funcs_[MAX_PCODE];
|
|
template <typename PROCESSOR>
|
|
void register_rpc_propocessor_(int pcode)
|
|
{
|
|
auto func = [](ObReqProcessor *&ptr) -> int {
|
|
int ret = OB_SUCCESS;
|
|
if (NULL == (ptr = OB_NEW(PROCESSOR, "SimpleLogSvr"))) {
|
|
SERVER_LOG(WARN, "allocate memory failed");
|
|
} else if (OB_FAIL(ptr->init())) {
|
|
} else {
|
|
}
|
|
return ret;
|
|
};
|
|
funcs_[pcode] = func;
|
|
SERVER_LOG(INFO, "register_rpc_propocessor_ success", K(pcode));
|
|
}
|
|
int handle_req_(rpc::ObRequest &req);
|
|
private:
|
|
mutable common::RWLock lock_;
|
|
bool is_inited_;
|
|
ObSimpleLogServer *log_server_;
|
|
int tg_id_;
|
|
bool is_stopped_;
|
|
int64_t node_id_;
|
|
};
|
|
|
|
class MockElectionAlloc
|
|
{
|
|
public:
|
|
typedef common::LinkHashNode<palf::LSKey> Node;
|
|
static MockElection *alloc_value() { return NULL; }
|
|
static void free_value(MockElection *mock_election)
|
|
{
|
|
ob_free(mock_election);
|
|
mock_election = NULL;
|
|
}
|
|
static Node *alloc_node(MockElection *val)
|
|
{
|
|
UNUSED(val);
|
|
ObMemAttr attr(1, ObNewModIds::OB_ELECTION);
|
|
Node* node = (Node*)ob_malloc(sizeof(Node), attr);
|
|
new(node) Node();
|
|
return node;
|
|
}
|
|
static void free_node(Node *node)
|
|
{
|
|
node->~Node();
|
|
ob_free(node);
|
|
node = NULL;
|
|
}
|
|
};
|
|
|
|
typedef common::ObLinkHashMap<palf::LSKey, MockElection, MockElectionAlloc> MockElectionMap;
|
|
typedef common::ObLinearHashMap<share::ObLSID, std::pair<SCN, LSN>> MockCkptMap;
|
|
class ObISimpleLogServer
|
|
{
|
|
public:
|
|
static const uint64_t DEFAULT_TENANT_ID = 1002;
|
|
ObISimpleLogServer() {}
|
|
virtual ~ObISimpleLogServer() {}
|
|
virtual bool is_valid() const = 0;
|
|
virtual IPalfEnvImpl *get_palf_env() = 0;
|
|
virtual void revert_palf_env(IPalfEnvImpl *palf_env) = 0;
|
|
virtual const std::string& get_clog_dir() const = 0;
|
|
virtual common::ObAddr get_addr() const = 0;
|
|
virtual ObTenantBase *get_tenant_base() const = 0;
|
|
virtual logservice::ObLogFlashbackService *get_flashback_service() = 0;
|
|
virtual void set_need_drop_packet(const bool need_drop_packet) = 0;
|
|
virtual void block_net(const ObAddr &src) = 0;
|
|
virtual void unblock_net(const ObAddr &src) = 0;
|
|
virtual void block_pcode(const ObRpcPacketCode &pcode) = 0;
|
|
virtual void unblock_pcode(const ObRpcPacketCode &pcode) = 0;
|
|
virtual void set_rpc_loss(const ObAddr &src, const int loss_rate) = 0;
|
|
virtual void reset_rpc_loss(const ObAddr &src) = 0;
|
|
virtual int simple_init(const std::string &cluster_name,
|
|
const common::ObAddr &addr,
|
|
const int64_t node_id,
|
|
ObTenantIOManager *tio_manager,
|
|
LogMemberRegionMap *region_map,
|
|
const bool is_bootstrap) = 0;
|
|
virtual int simple_start(const bool is_bootstrap) = 0;
|
|
virtual int simple_close(const bool is_shutdown) = 0;
|
|
virtual int simple_restart(const std::string &cluster_name,
|
|
const int64_t node_idx,
|
|
ObTenantIOManager *tio_manager) = 0;
|
|
virtual ILogBlockPool *get_block_pool() = 0;
|
|
virtual ObILogAllocator *get_allocator() = 0;
|
|
virtual int update_disk_opts(const PalfDiskOptions &opts) = 0;
|
|
virtual int get_disk_opts(PalfDiskOptions &opts) = 0;
|
|
virtual int get_palf_env(PalfEnv *&palf_env) = 0;
|
|
virtual bool is_arb_server() const {return false;};
|
|
virtual int64_t get_node_id() = 0;
|
|
virtual int create_mock_election(const int64_t palf_id, MockElection *&mock_election) = 0;
|
|
virtual int remove_mock_election(const int64_t palf_id) = 0;
|
|
virtual int set_leader(const int64_t palf_id, const common::ObAddr &leader, const int64_t new_epoch = 0) = 0;
|
|
virtual int update_server_log_disk(const int64_t log_disk_size) = 0;
|
|
virtual int create_ls(const int64_t palf_id,
|
|
const AccessMode &access_mode,
|
|
const PalfBaseInfo &base_info,
|
|
IPalfHandleImpl *&palf_handle_impl) = 0;
|
|
virtual int remove_ls(const int64_t palf_id) = 0;
|
|
virtual MockObLocalityManager *get_locality_manager() = 0;
|
|
DECLARE_PURE_VIRTUAL_TO_STRING;
|
|
};
|
|
|
|
class ObLogMittestTenantBase : public arbserver::ArbTGHelper {
|
|
public:
|
|
ObLogMittestTenantBase(int64_t cluster_id, uint64_t tenant_id, int64_t node_id)
|
|
: ArbTGHelper(cluster_id, tenant_id, node_id) {}
|
|
virtual int pre_run() override final
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_FAIL(arbserver::ArbTGHelper::pre_run())) {
|
|
CLOG_LOG(ERROR, "ArbTGHelper pre_run failed");
|
|
} else if (OB_FAIL(ObTenantBase::pre_run())) {
|
|
CLOG_LOG(ERROR, "ObTenantBase pre_run failed");
|
|
}
|
|
return ret;
|
|
}
|
|
virtual int end_run() override final
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_FAIL(ObTenantBase::end_run())) {
|
|
CLOG_LOG(ERROR, "ObTenantBase pre_run failed");
|
|
} else if (OB_FAIL(arbserver::ArbTGHelper::end_run())) {
|
|
CLOG_LOG(ERROR, "ArbTGHelper pre_run failed");
|
|
}
|
|
return ret;
|
|
}
|
|
};
|
|
|
|
class ObSimpleLogServer : public ObISimpleLogServer
|
|
{
|
|
public:
|
|
ObSimpleLogServer()
|
|
: cluster_id_(1),
|
|
tenant_id_(ObISimpleLogServer::DEFAULT_TENANT_ID), // the tenant in mittest must be 1002, otherwise, arb server can not start service.
|
|
handler_(deliver_),
|
|
transport_(NULL),
|
|
batch_rpc_transport_(NULL),
|
|
high_prio_rpc_transport_(NULL)
|
|
{
|
|
}
|
|
~ObSimpleLogServer()
|
|
{
|
|
if (OB_NOT_NULL(allocator_)) {
|
|
ob_delete(allocator_);
|
|
}
|
|
if (OB_NOT_NULL(io_device_)) {
|
|
ob_delete(io_device_);
|
|
}
|
|
if (OB_NOT_NULL(timer_service_)) {
|
|
ob_delete(timer_service_);
|
|
}
|
|
}
|
|
int simple_init(const std::string &cluster_name,
|
|
const common::ObAddr &addr,
|
|
const int64_t node_id,
|
|
ObTenantIOManager *tio_manager,
|
|
LogMemberRegionMap *region_map,
|
|
const bool is_bootstrap) override final;
|
|
int simple_start(const bool is_bootstrap) override final;
|
|
int simple_close(const bool is_shutdown) override final;
|
|
int simple_restart(const std::string &cluster_name,
|
|
const int64_t node_idx,
|
|
ObTenantIOManager *tio_manager) override final;
|
|
public:
|
|
int64_t get_node_id() {return node_id_;}
|
|
ILogBlockPool *get_block_pool() override final
|
|
{return &log_block_pool_;};
|
|
ObILogAllocator *get_allocator() override final
|
|
{ return allocator_; }
|
|
virtual int update_disk_opts(const PalfDiskOptions &opts) override final;
|
|
virtual int get_disk_opts(PalfDiskOptions &opts) override final
|
|
{
|
|
ObSpinLockGuard guard(log_disk_lock_);
|
|
opts = disk_opts_;
|
|
return OB_SUCCESS;
|
|
}
|
|
virtual int try_resize();
|
|
virtual int get_palf_env(PalfEnv *&palf_env)
|
|
{ palf_env = palf_env_; return OB_SUCCESS;}
|
|
virtual void revert_palf_env(IPalfEnvImpl *palf_env) { UNUSED(palf_env); }
|
|
bool is_valid() const override final {return NULL != palf_env_;}
|
|
IPalfEnvImpl *get_palf_env() override final
|
|
{ return palf_env_->get_palf_env_impl(); }
|
|
const std::string& get_clog_dir() const override final
|
|
{ return clog_dir_; }
|
|
common::ObAddr get_addr() const override final
|
|
{ return addr_; }
|
|
ObTenantBase *get_tenant_base() const override final
|
|
{ return tenant_base_; }
|
|
logservice::ObLogFlashbackService *get_flashback_service() override final
|
|
{ return log_service_.get_flashback_service(); }
|
|
// Nowdat, not support drop packet from specificed address
|
|
void set_need_drop_packet(const bool need_drop_packet) override final
|
|
{ deliver_.set_need_drop_packet(need_drop_packet);}
|
|
void block_net(const ObAddr &src) override final
|
|
{ deliver_.block_net(src); }
|
|
void unblock_net(const ObAddr &src) override final
|
|
{ deliver_.unblock_net(src); }
|
|
void block_pcode(const ObRpcPacketCode &pcode) override final
|
|
{ deliver_.block_pcode(pcode); }
|
|
void unblock_pcode(const ObRpcPacketCode &pcode) override final
|
|
{ deliver_.unblock_pcode(pcode); }
|
|
void set_rpc_loss(const ObAddr &src, const int loss_rate) override final
|
|
{ deliver_.set_rpc_loss(src, loss_rate); }
|
|
void reset_rpc_loss(const ObAddr &src) override final
|
|
{ deliver_.reset_rpc_loss(src); }
|
|
int create_mock_election(const int64_t palf_id, MockElection *&mock_election) override final
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
mock_election = NULL;
|
|
void *buf = NULL;
|
|
ObMemAttr attr(1, ObNewModIds::OB_ELECTION);
|
|
if (OB_ISNULL(buf = ob_malloc(sizeof(MockElection), attr))) {
|
|
ret = OB_ERR_UNEXPECTED;
|
|
SERVER_LOG(ERROR, "ob_malloc failed", K(palf_id));
|
|
} else if (FALSE_IT(mock_election = new (buf) MockElection)) {
|
|
} else if (OB_FAIL(mock_election->init(palf_id, addr_))) {
|
|
SERVER_LOG(WARN, "mock_election->init failed", K(palf_id), K_(addr));
|
|
} else if (OB_FAIL(mock_election_map_.insert_and_get(palf::LSKey(palf_id), mock_election))) {
|
|
SERVER_LOG(WARN, "create_mock_election failed", K(palf_id));
|
|
} else {
|
|
SERVER_LOG(INFO, "create_mock_election success", K(palf_id), K_(addr), KP(mock_election));
|
|
}
|
|
if (OB_FAIL(ret) && NULL != mock_election) {
|
|
ob_free(mock_election);
|
|
mock_election = NULL;
|
|
}
|
|
return ret;
|
|
}
|
|
int remove_mock_election(const int64_t palf_id) override final
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_FAIL(mock_election_map_.del(palf::LSKey(palf_id))) && OB_ENTRY_NOT_EXIST != ret) {
|
|
SERVER_LOG(WARN, "del failed", K(palf_id));
|
|
} else {
|
|
ret = OB_SUCCESS;
|
|
SERVER_LOG(INFO, "remove_mock_election success", K(palf_id), K_(addr));
|
|
}
|
|
return ret;
|
|
}
|
|
int set_leader(const int64_t palf_id, const common::ObAddr &leader, const int64_t new_epoch = 0)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
MockElection *mock_election= NULL;
|
|
if (OB_FAIL(mock_election_map_.get(palf::LSKey(palf_id), mock_election))) {
|
|
SERVER_LOG(WARN, "get failed", K(palf_id));
|
|
} else if (OB_FAIL(mock_election->set_leader(leader, new_epoch))) {
|
|
SERVER_LOG(WARN, "set_leader failed", K(palf_id), KP(mock_election), K(leader), K(new_epoch));
|
|
}
|
|
if (OB_NOT_NULL(mock_election)) {
|
|
mock_election_map_.revert(mock_election);
|
|
}
|
|
return ret;
|
|
}
|
|
int update_server_log_disk(const int64_t log_disk_size);
|
|
int set_mock_ls_ckpt(const int64_t palf_id, const SCN scn, const LSN lsn);
|
|
int get_mock_ls_ckpt(const int64_t palf_id, share::SCN &scn, LSN &lsn) const;
|
|
int create_ls(const int64_t palf_id,
|
|
const AccessMode &access_mode,
|
|
const PalfBaseInfo &base_info,
|
|
IPalfHandleImpl *&palf_handle_impl) override final;
|
|
int remove_ls(const int64_t palf_id) override final;
|
|
MockObLocalityManager *get_locality_manager() { return &mock_locality_manager_; }
|
|
TO_STRING_KV(K_(node_id), K_(addr), KP(palf_env_));
|
|
|
|
protected:
|
|
int init_io_(const std::string &cluster_name);
|
|
int init_network_(const common::ObAddr &addr, const bool is_bootstrap);
|
|
int init_log_service_(const bool is_bootstrap);
|
|
int init_ls_service_(const bool is_bootstrap);
|
|
int init_memory_dump_timer_();
|
|
int update_tenant_log_disk_size_(const uint64_t tenant_id,
|
|
const int64_t old_log_disk_size,
|
|
const int64_t new_log_disk_size,
|
|
int64_t &allowed_log_disk_size);
|
|
int update_disk_opts_no_lock_(const PalfDiskOptions &opts);
|
|
int add_ls_to_ls_map_(const int64_t palf_id);
|
|
int init_log_kv_cache_();
|
|
|
|
private:
|
|
int64_t cluster_id_;
|
|
int64_t tenant_id_;
|
|
int64_t node_id_;
|
|
common::ObAddr addr_;
|
|
rpc::frame::ObNetEasy net_;
|
|
obrpc::ObRpcHandler handler_;
|
|
ObLogDeliver deliver_;
|
|
ObRandom rand_;
|
|
PalfEnv *palf_env_;
|
|
ObTenantBase *tenant_base_;
|
|
ObMalloc malloc_mgr_;
|
|
ObLocalDevice *io_device_;
|
|
static const int64_t MAX_IOD_OPT_CNT = 5;
|
|
ObIODOpt iod_opt_array_[MAX_IOD_OPT_CNT];
|
|
ObIODOpts iod_opts_;
|
|
std::string clog_dir_;
|
|
ObOccamTimer timer_;
|
|
ObOccamTimerTaskRAIIHandle timer_handle_;
|
|
logservice::ObLogService log_service_;
|
|
ObTenantMutilAllocator *allocator_;
|
|
rpc::frame::ObReqTransport *transport_;
|
|
rpc::frame::ObReqTransport *batch_rpc_transport_;
|
|
rpc::frame::ObReqTransport *high_prio_rpc_transport_;
|
|
ObLSService *ls_service_;
|
|
ObLocationService location_service_;
|
|
MockMetaReporter reporter_;
|
|
logservice::ObServerLogBlockMgr log_block_pool_;
|
|
common::ObMySQLProxy sql_proxy_;
|
|
MockNetKeepAliveAdapter *net_keepalive_;
|
|
ObSrvRpcProxy srv_proxy_;
|
|
logservice::coordinator::ObFailureDetector detector_;
|
|
MockElectionMap mock_election_map_;
|
|
// ObTenantUnit以及__all_unit_configs
|
|
ObSpinLock log_disk_lock_;
|
|
// 本地已生效日志盘规格
|
|
palf::PalfDiskOptions disk_opts_;
|
|
// 内部表中记录日志盘规格
|
|
palf::PalfDiskOptions inner_table_disk_opts_;
|
|
ObLooper looper_;
|
|
MockCkptMap ckpt_map_;
|
|
MockObLocalityManager mock_locality_manager_;
|
|
obrpc::ObBatchRpc batch_rpc_;
|
|
int batch_rpc_tg_id_;
|
|
omt::ObSharedTimer shared_timer_;
|
|
ObTimerService *timer_service_ = nullptr; // for timer
|
|
};
|
|
|
|
} // end unittest
|
|
} // oceanbase
|