/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #pragma once #include "lib/hash/ob_hashset.h" #include "lib/hash/ob_hashset.h" #include "lib/hash/ob_linear_hash_map.h" #include "lib/ob_errno.h" #include "lib/thread/ob_simple_thread_pool.h" #include "lib/thread/thread_mgr_interface.h" #include "lib/signal/ob_signal_worker.h" #include "lib/signal/ob_signal_struct.h" #include "lib/function/ob_function.h" // ObFunction #include "observer/ob_signal_handle.h" #include "lib/utility/ob_defer.h" #include "rpc/frame/ob_req_deliver.h" #include "rpc/ob_request.h" #include "rpc/frame/ob_net_easy.h" #include "rpc/obrpc/ob_rpc_handler.h" #include "rpc/frame/ob_req_transport.h" #include "logservice/logrpc/ob_log_rpc_processor.h" #include "logservice/logrpc/ob_log_request_handler.h" #include "logservice/palf/log_rpc_macros.h" #include "logservice/palf/log_rpc_processor.h" #include "logservice/palf/palf_env.h" #include "logservice/ob_arbitration_service.h" #include "mock_election.h" #include "mock_ob_locality_manager.h" #include "mock_ob_meta_reporter.h" #include "lib/net/ob_addr.h" #include "share/ob_rpc_struct.h" #include "storage/blocksstable/ob_block_sstable_struct.h" #include "storage/tx_storage/ob_ls_service.h" #include "share/allocator/ob_tenant_mutil_allocator.h" #include "storage/tx_storage/ob_ls_map.h" #include "storage/tx_storage/ob_ls_service.h" #include "logservice/palf/palf_handle_impl.h" #include "logservice/ob_log_service.h" #include "logservice/ob_server_log_block_mgr.h" #include "share/ob_local_device.h" #include "share/ob_occam_timer.h" #include "share/resource_manager/ob_cgroup_ctrl.h" #include "logservice/ob_net_keepalive_adapter.h" #include "logservice/leader_coordinator/ob_failure_detector.h" #include "logservice/arbserver/arb_tg_helper.h" #include #include #include "share/ob_tenant_mem_limit_getter.h" namespace oceanbase { namespace unittest { class ObLogDeliver; } namespace unittest { using namespace oceanbase; using namespace oceanbase::rpc; using namespace oceanbase::rpc::frame; using namespace oceanbase::obrpc; using namespace oceanbase::common; using namespace oceanbase::palf; using namespace oceanbase::share; class MockNetKeepAliveAdapter : public logservice::IObNetKeepAliveAdapter { public: MockNetKeepAliveAdapter() : log_deliver_(NULL) {} ~MockNetKeepAliveAdapter() { log_deliver_ = NULL; } int init(unittest::ObLogDeliver *log_deliver); bool in_black_or_stopped(const common::ObAddr &server) override final; bool is_server_stopped(const common::ObAddr &server) override final; bool in_black(const common::ObAddr &server) override final; int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) override final; private: unittest::ObLogDeliver *log_deliver_; }; uint32_t get_local_addr(const char *dev_name); std::string get_local_ip(); struct LossConfig { ObAddr src_; int loss_rate_; LossConfig() : src_(), loss_rate_(0) {} LossConfig(const ObAddr &src, const int loss_rate) : src_(src), loss_rate_(loss_rate) {} TO_STRING_KV(K_(src), K_(loss_rate)); }; class ObMittestBlacklist { public: int init(const common::ObAddr &self); void block_net(const ObAddr &src); void unblock_net(const ObAddr &src); void block_pcode(const ObRpcPacketCode &pcode); void unblock_pcode(const ObRpcPacketCode &pcode); bool need_filter_packet_by_blacklist(const ObAddr &address); bool need_filter_packet_by_pcode_blacklist(const ObRpcPacketCode &pcode); void set_need_drop_packet(const bool need_drop_packet) { need_drop_packet_ = need_drop_packet; } void set_rpc_loss(const ObAddr &src, const int loss_rate); void reset_rpc_loss(const ObAddr &src); bool need_drop_by_loss_config(const ObAddr &addr); void get_loss_config(const ObAddr &src, bool &exist, LossConfig &loss_config); TO_STRING_KV(K_(blacklist), K_(rpc_loss_config)); protected: hash::ObHashSet blacklist_; hash::ObHashSet pcode_blacklist_; bool need_drop_packet_; common::ObSEArray rpc_loss_config_; common::ObAddr self_; }; class ObSimpleLogServer; class ObLooper : public share::ObThreadPool { public: static constexpr int64_t INTERVAL_US = 1000*1000; ObLooper(); virtual ~ObLooper(); public: int init(ObSimpleLogServer *log_server); void destroy(); void run1(); private: void log_loop_(); private: ObSimpleLogServer *log_server_; int64_t run_interval_; bool is_inited_; private: DISALLOW_COPY_AND_ASSIGN(ObLooper); }; class MockLogGetCkptReqP: public obrpc::ObRpcProcessor> { public: MockLogGetCkptReqP(): ckpt_functor_(NULL), filter_(NULL) {} virtual ~MockLogGetCkptReqP() { ckpt_functor_ = NULL; filter_ = NULL; } int process() { int ret = OB_SUCCESS; const logservice::LogGetCkptReq &req = arg_; const common::ObAddr server = req.src_; \ const share::ObLSID &ls_id = req.ls_id_; logservice::LogGetCkptResp &resp = result_; if (OB_ISNULL(ckpt_functor_)) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(ERROR, "ckpt_functor_ is NULL", K(ret), K(req), K(resp)); } else if (OB_UNLIKELY(NULL != filter_ && true == (*filter_)(server))) { \ CLOG_LOG(INFO, "need filter this packet", K(req)); \ } else if (OB_FAIL((*ckpt_functor_)(ls_id, resp.ckpt_scn_, resp.ckpt_lsn_))) { CLOG_LOG(ERROR, "ckpt_functor_ failed", K(ret), K(req), K(resp)); } else { CLOG_LOG(INFO, "ckpt_functor_ success", K(ret), K(req), K(resp)); } return ret; } void set_ckpt_functor(void *functor) { ckpt_functor_ = reinterpret_cast *>(functor); } void set_filter(void *filter) { filter_ = reinterpret_cast *>(filter); } private: ObFunction *ckpt_functor_; ObFunction *filter_; }; class ObLogDeliver : public rpc::frame::ObReqDeliver, public lib::TGTaskHandler, public ObMittestBlacklist { public: ObLogDeliver() : rpc::frame::ObReqDeliver(), log_server_(NULL), tg_id_(0), is_stopped_(true) {} ~ObLogDeliver() { destroy(true); } int init() override final {return OB_SUCCESS;} int init(const common::ObAddr &self, const bool is_bootstrap, ObSimpleLogServer *log_server); void destroy(const bool is_shutdown); int deliver(rpc::ObRequest &req); int start(); void stop(); int wait(); void handle(void *task); private: void init_all_propocessor_(); typedef std::function Func; Func funcs_[MAX_PCODE]; template void register_rpc_propocessor_(int pcode) { auto func = [](ObReqProcessor *&ptr) -> int { int ret = OB_SUCCESS; if (NULL == (ptr = OB_NEW(PROCESSOR, "SimpleLogSvr"))) { SERVER_LOG(WARN, "allocate memory failed"); } else if (OB_FAIL(ptr->init())) { } else { } return ret; }; funcs_[pcode] = func; SERVER_LOG(INFO, "register_rpc_propocessor_ success", K(pcode)); } int handle_req_(rpc::ObRequest &req); private: mutable common::RWLock lock_; bool is_inited_; ObSimpleLogServer *log_server_; int tg_id_; bool is_stopped_; int64_t node_id_; }; class MockElectionAlloc { public: typedef common::LinkHashNode Node; static MockElection *alloc_value() { return NULL; } static void free_value(MockElection *mock_election) { ob_free(mock_election); mock_election = NULL; } static Node *alloc_node(MockElection *val) { UNUSED(val); ObMemAttr attr(1, ObNewModIds::OB_ELECTION); Node* node = (Node*)ob_malloc(sizeof(Node), attr); new(node) Node(); return node; } static void free_node(Node *node) { node->~Node(); ob_free(node); node = NULL; } }; typedef common::ObLinkHashMap MockElectionMap; typedef common::ObLinearHashMap> MockCkptMap; class ObISimpleLogServer { public: static const uint64_t DEFAULT_TENANT_ID = 1002; ObISimpleLogServer() {} virtual ~ObISimpleLogServer() {} virtual bool is_valid() const = 0; virtual IPalfEnvImpl *get_palf_env() = 0; virtual void revert_palf_env(IPalfEnvImpl *palf_env) = 0; virtual const std::string& get_clog_dir() const = 0; virtual common::ObAddr get_addr() const = 0; virtual ObTenantBase *get_tenant_base() const = 0; virtual logservice::ObLogFlashbackService *get_flashback_service() = 0; virtual void set_need_drop_packet(const bool need_drop_packet) = 0; virtual void block_net(const ObAddr &src) = 0; virtual void unblock_net(const ObAddr &src) = 0; virtual void block_pcode(const ObRpcPacketCode &pcode) = 0; virtual void unblock_pcode(const ObRpcPacketCode &pcode) = 0; virtual void set_rpc_loss(const ObAddr &src, const int loss_rate) = 0; virtual void reset_rpc_loss(const ObAddr &src) = 0; virtual int simple_init(const std::string &cluster_name, const common::ObAddr &addr, const int64_t node_id, ObTenantIOManager *tio_manager, LogMemberRegionMap *region_map, const bool is_bootstrap) = 0; virtual int simple_start(const bool is_bootstrap) = 0; virtual int simple_close(const bool is_shutdown) = 0; virtual int simple_restart(const std::string &cluster_name, const int64_t node_idx, ObTenantIOManager *tio_manager) = 0; virtual ILogBlockPool *get_block_pool() = 0; virtual ObILogAllocator *get_allocator() = 0; virtual int update_disk_opts(const PalfDiskOptions &opts) = 0; virtual int get_disk_opts(PalfDiskOptions &opts) = 0; virtual int get_palf_env(PalfEnv *&palf_env) = 0; virtual bool is_arb_server() const {return false;}; virtual int64_t get_node_id() = 0; virtual int create_mock_election(const int64_t palf_id, MockElection *&mock_election) = 0; virtual int remove_mock_election(const int64_t palf_id) = 0; virtual int set_leader(const int64_t palf_id, const common::ObAddr &leader, const int64_t new_epoch = 0) = 0; virtual int update_server_log_disk(const int64_t log_disk_size) = 0; virtual int create_ls(const int64_t palf_id, const AccessMode &access_mode, const PalfBaseInfo &base_info, IPalfHandleImpl *&palf_handle_impl) = 0; virtual int remove_ls(const int64_t palf_id) = 0; virtual MockObLocalityManager *get_locality_manager() = 0; DECLARE_PURE_VIRTUAL_TO_STRING; }; class ObLogMittestTenantBase : public arbserver::ArbTGHelper { public: ObLogMittestTenantBase(int64_t cluster_id, uint64_t tenant_id, int64_t node_id) : ArbTGHelper(cluster_id, tenant_id, node_id) {} virtual int pre_run() override final { int ret = OB_SUCCESS; if (OB_FAIL(arbserver::ArbTGHelper::pre_run())) { CLOG_LOG(ERROR, "ArbTGHelper pre_run failed"); } else if (OB_FAIL(ObTenantBase::pre_run())) { CLOG_LOG(ERROR, "ObTenantBase pre_run failed"); } return ret; } virtual int end_run() override final { int ret = OB_SUCCESS; if (OB_FAIL(ObTenantBase::end_run())) { CLOG_LOG(ERROR, "ObTenantBase pre_run failed"); } else if (OB_FAIL(arbserver::ArbTGHelper::end_run())) { CLOG_LOG(ERROR, "ArbTGHelper pre_run failed"); } return ret; } }; class ObSimpleLogServer : public ObISimpleLogServer { public: ObSimpleLogServer() : cluster_id_(1), tenant_id_(ObISimpleLogServer::DEFAULT_TENANT_ID), // the tenant in mittest must be 1002, otherwise, arb server can not start service. handler_(deliver_), transport_(NULL), batch_rpc_transport_(NULL), high_prio_rpc_transport_(NULL) { } ~ObSimpleLogServer() { if (OB_NOT_NULL(allocator_)) { ob_delete(allocator_); } if (OB_NOT_NULL(io_device_)) { ob_delete(io_device_); } if (OB_NOT_NULL(timer_service_)) { ob_delete(timer_service_); } } int simple_init(const std::string &cluster_name, const common::ObAddr &addr, const int64_t node_id, ObTenantIOManager *tio_manager, LogMemberRegionMap *region_map, const bool is_bootstrap) override final; int simple_start(const bool is_bootstrap) override final; int simple_close(const bool is_shutdown) override final; int simple_restart(const std::string &cluster_name, const int64_t node_idx, ObTenantIOManager *tio_manager) override final; public: int64_t get_node_id() {return node_id_;} ILogBlockPool *get_block_pool() override final {return &log_block_pool_;}; ObILogAllocator *get_allocator() override final { return allocator_; } virtual int update_disk_opts(const PalfDiskOptions &opts) override final; virtual int get_disk_opts(PalfDiskOptions &opts) override final { ObSpinLockGuard guard(log_disk_lock_); opts = disk_opts_; return OB_SUCCESS; } virtual int try_resize(); virtual int get_palf_env(PalfEnv *&palf_env) { palf_env = palf_env_; return OB_SUCCESS;} virtual void revert_palf_env(IPalfEnvImpl *palf_env) { UNUSED(palf_env); } bool is_valid() const override final {return NULL != palf_env_;} IPalfEnvImpl *get_palf_env() override final { return palf_env_->get_palf_env_impl(); } const std::string& get_clog_dir() const override final { return clog_dir_; } common::ObAddr get_addr() const override final { return addr_; } ObTenantBase *get_tenant_base() const override final { return tenant_base_; } logservice::ObLogFlashbackService *get_flashback_service() override final { return log_service_.get_flashback_service(); } // Nowdat, not support drop packet from specificed address void set_need_drop_packet(const bool need_drop_packet) override final { deliver_.set_need_drop_packet(need_drop_packet);} void block_net(const ObAddr &src) override final { deliver_.block_net(src); } void unblock_net(const ObAddr &src) override final { deliver_.unblock_net(src); } void block_pcode(const ObRpcPacketCode &pcode) override final { deliver_.block_pcode(pcode); } void unblock_pcode(const ObRpcPacketCode &pcode) override final { deliver_.unblock_pcode(pcode); } void set_rpc_loss(const ObAddr &src, const int loss_rate) override final { deliver_.set_rpc_loss(src, loss_rate); } void reset_rpc_loss(const ObAddr &src) override final { deliver_.reset_rpc_loss(src); } int create_mock_election(const int64_t palf_id, MockElection *&mock_election) override final { int ret = OB_SUCCESS; mock_election = NULL; void *buf = NULL; ObMemAttr attr(1, ObNewModIds::OB_ELECTION); if (OB_ISNULL(buf = ob_malloc(sizeof(MockElection), attr))) { ret = OB_ERR_UNEXPECTED; SERVER_LOG(ERROR, "ob_malloc failed", K(palf_id)); } else if (FALSE_IT(mock_election = new (buf) MockElection)) { } else if (OB_FAIL(mock_election->init(palf_id, addr_))) { SERVER_LOG(WARN, "mock_election->init failed", K(palf_id), K_(addr)); } else if (OB_FAIL(mock_election_map_.insert_and_get(palf::LSKey(palf_id), mock_election))) { SERVER_LOG(WARN, "create_mock_election failed", K(palf_id)); } else { SERVER_LOG(INFO, "create_mock_election success", K(palf_id), K_(addr), KP(mock_election)); } if (OB_FAIL(ret) && NULL != mock_election) { ob_free(mock_election); mock_election = NULL; } return ret; } int remove_mock_election(const int64_t palf_id) override final { int ret = OB_SUCCESS; if (OB_FAIL(mock_election_map_.del(palf::LSKey(palf_id))) && OB_ENTRY_NOT_EXIST != ret) { SERVER_LOG(WARN, "del failed", K(palf_id)); } else { ret = OB_SUCCESS; SERVER_LOG(INFO, "remove_mock_election success", K(palf_id), K_(addr)); } return ret; } int set_leader(const int64_t palf_id, const common::ObAddr &leader, const int64_t new_epoch = 0) { int ret = OB_SUCCESS; MockElection *mock_election= NULL; if (OB_FAIL(mock_election_map_.get(palf::LSKey(palf_id), mock_election))) { SERVER_LOG(WARN, "get failed", K(palf_id)); } else if (OB_FAIL(mock_election->set_leader(leader, new_epoch))) { SERVER_LOG(WARN, "set_leader failed", K(palf_id), KP(mock_election), K(leader), K(new_epoch)); } if (OB_NOT_NULL(mock_election)) { mock_election_map_.revert(mock_election); } return ret; } int update_server_log_disk(const int64_t log_disk_size); int set_mock_ls_ckpt(const int64_t palf_id, const SCN scn, const LSN lsn); int get_mock_ls_ckpt(const int64_t palf_id, share::SCN &scn, LSN &lsn) const; int create_ls(const int64_t palf_id, const AccessMode &access_mode, const PalfBaseInfo &base_info, IPalfHandleImpl *&palf_handle_impl) override final; int remove_ls(const int64_t palf_id) override final; MockObLocalityManager *get_locality_manager() { return &mock_locality_manager_; } TO_STRING_KV(K_(node_id), K_(addr), KP(palf_env_)); protected: int init_io_(const std::string &cluster_name); int init_network_(const common::ObAddr &addr, const bool is_bootstrap); int init_log_service_(const bool is_bootstrap); int init_ls_service_(const bool is_bootstrap); int init_memory_dump_timer_(); int update_tenant_log_disk_size_(const uint64_t tenant_id, const int64_t old_log_disk_size, const int64_t new_log_disk_size, int64_t &allowed_log_disk_size); int update_disk_opts_no_lock_(const PalfDiskOptions &opts); int add_ls_to_ls_map_(const int64_t palf_id); int init_log_kv_cache_(); private: int64_t cluster_id_; int64_t tenant_id_; int64_t node_id_; common::ObAddr addr_; rpc::frame::ObNetEasy net_; obrpc::ObRpcHandler handler_; ObLogDeliver deliver_; ObRandom rand_; PalfEnv *palf_env_; ObTenantBase *tenant_base_; ObMalloc malloc_mgr_; ObLocalDevice *io_device_; static const int64_t MAX_IOD_OPT_CNT = 5; ObIODOpt iod_opt_array_[MAX_IOD_OPT_CNT]; ObIODOpts iod_opts_; std::string clog_dir_; ObOccamTimer timer_; ObOccamTimerTaskRAIIHandle timer_handle_; logservice::ObLogService log_service_; ObTenantMutilAllocator *allocator_; rpc::frame::ObReqTransport *transport_; rpc::frame::ObReqTransport *batch_rpc_transport_; rpc::frame::ObReqTransport *high_prio_rpc_transport_; ObLSService *ls_service_; ObLocationService location_service_; MockMetaReporter reporter_; logservice::ObServerLogBlockMgr log_block_pool_; common::ObMySQLProxy sql_proxy_; MockNetKeepAliveAdapter *net_keepalive_; ObSrvRpcProxy srv_proxy_; logservice::coordinator::ObFailureDetector detector_; MockElectionMap mock_election_map_; // ObTenantUnit以及__all_unit_configs ObSpinLock log_disk_lock_; // 本地已生效日志盘规格 palf::PalfDiskOptions disk_opts_; // 内部表中记录日志盘规格 palf::PalfDiskOptions inner_table_disk_opts_; ObLooper looper_; MockCkptMap ckpt_map_; MockObLocalityManager mock_locality_manager_; obrpc::ObBatchRpc batch_rpc_; int batch_rpc_tg_id_; omt::ObSharedTimer shared_timer_; ObTimerService *timer_service_ = nullptr; // for timer }; } // end unittest } // oceanbase