From c18fefb161108bfa1009dd8a5143728aea2d3ba0 Mon Sep 17 00:00:00 2001 From: HaHaJeff Date: Tue, 6 Feb 2024 21:42:11 +0000 Subject: [PATCH] [FEAT MERGE] cherry pick restore_pf_opt from 421release to master --- .../logservice/env/ob_simple_log_server.cpp | 32 ++++++- mittest/logservice/env/ob_simple_log_server.h | 8 +- .../test_ob_simple_log_basic_func.cpp | 2 +- src/logservice/ob_log_service.cpp | 11 ++- src/logservice/ob_log_service.h | 5 + src/logservice/palf/log_define.h | 7 ++ src/logservice/palf/log_engine.cpp | 5 +- src/logservice/palf/log_engine.h | 42 +++++---- src/logservice/palf/log_net_service.cpp | 9 +- src/logservice/palf/log_net_service.h | 30 +++++- src/logservice/palf/log_req.h | 44 ++++++++- src/logservice/palf/log_rpc.cpp | 7 +- src/logservice/palf/log_rpc.h | 43 ++++++++- src/logservice/palf/log_sliding_window.cpp | 32 +++++-- src/logservice/palf/log_sliding_window.h | 3 +- src/logservice/palf/lsn_allocator.cpp | 73 +++++++++----- src/logservice/palf/palf_env.cpp | 3 +- src/logservice/palf/palf_env.h | 5 + src/logservice/palf/palf_env_impl.cpp | 7 +- src/logservice/palf/palf_env_impl.h | 5 + .../replayservice/ob_log_replay_service.cpp | 32 ++++++- .../replayservice/ob_log_replay_service.h | 1 + .../restoreservice/ob_log_restore_handler.cpp | 43 +++++++-- .../restoreservice/ob_log_restore_handler.h | 7 ++ .../ob_log_restore_scheduler.cpp | 13 ++- .../restoreservice/ob_log_restore_service.cpp | 42 +++------ .../restoreservice/ob_log_restore_service.h | 3 - .../restoreservice/ob_remote_log_writer.cpp | 37 +++++++- .../restoreservice/ob_remote_log_writer.h | 2 + src/rootserver/ob_tenant_info_loader.cpp | 13 +++ src/rootserver/ob_tenant_info_loader.h | 9 ++ src/share/rpc/ob_batch_processor.cpp | 46 ++++++++- src/share/rpc/ob_batch_processor.h | 1 + src/share/rpc/ob_batch_proxy.cpp | 8 +- src/share/rpc/ob_batch_proxy.h | 4 +- .../mock_log_engine.h | 4 +- unittest/logservice/test_lsn_allocator.cpp | 94 ++++++++++++++++++- 37 files changed, 604 insertions(+), 128 deletions(-) diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index e31e7c6b88..39520d1618 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -30,6 +30,7 @@ #include "share/ob_io_device_helper.h" #include "share/ob_thread_mgr.h" #include "logservice/palf/palf_options.h" +#include "share/rpc/ob_batch_processor.h" namespace oceanbase { @@ -278,6 +279,8 @@ int ObSimpleLogServer::init_network_(const common::ObAddr &addr, const bool is_b int ret = OB_SUCCESS; ObNetOptions opts; opts.rpc_io_cnt_ = 10; + opts.high_prio_rpc_io_cnt_ = 10; + opts.batch_rpc_io_cnt_ = 10; opts.tcp_user_timeout_ = 10 * 1000 * 1000; // 10s addr_ = addr; obrpc::ObRpcNetHandler::CLUSTER_ID = 1; @@ -293,6 +296,18 @@ int ObSimpleLogServer::init_network_(const common::ObAddr &addr, const bool is_b SERVER_LOG(ERROR, "net_ listen failed", K(ret)); } else if (is_bootstrap && OB_FAIL(srv_proxy_.init(transport_))) { SERVER_LOG(ERROR, "init srv_proxy_ failed"); + } else if (is_bootstrap && OB_FAIL(net_.add_high_prio_rpc_listen(addr_.get_port(), handler_, high_prio_rpc_transport_))) { + SERVER_LOG(ERROR, "net_ listen failed", K(ret)); + } else if (is_bootstrap && OB_FAIL(net_.batch_rpc_net_register(handler_, batch_rpc_transport_))) { + SERVER_LOG(ERROR, "batch_rpc_ init failed", K(ret)); + } else if (FALSE_IT(batch_rpc_transport_->set_bucket_count(10))) { + } else if (is_bootstrap && OB_FAIL(batch_rpc_.init(batch_rpc_transport_, high_prio_rpc_transport_, addr))) { + SERVER_LOG(ERROR, "batch_rpc_ init failed", K(ret)); + // } else if (is_bootstrap && OB_FAIL(TG_SET_RUNNABLE_AND_START(lib::TGDefIDs::BRPC, batch_rpc_))) { + } else if (is_bootstrap && OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::BRPC, batch_rpc_tg_id_))) { + SERVER_LOG(ERROR, "batch_rpc_ init failed", K(ret)); + } else if (is_bootstrap && OB_FAIL(TG_SET_RUNNABLE_AND_START(batch_rpc_tg_id_, batch_rpc_))) { + SERVER_LOG(ERROR, "batch_rpc_ start failed", K(ret)); } else { deliver_.node_id_ = node_id_; SERVER_LOG(INFO, "init_network success", K(ret), K(addr_), K(node_id_), K(opts)); @@ -395,7 +410,7 @@ int ObSimpleLogServer::init_log_service_() net_keepalive_ = MTL_NEW(MockNetKeepAliveAdapter, "SimpleLog"); if (OB_FAIL(net_keepalive_->init(&deliver_))) { - } else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &ls_service_, + } else if (OB_FAIL(log_service_.init(opts, clog_dir.c_str(), addr_, allocator_, transport_, &batch_rpc_, &ls_service_, &location_service_, &reporter_, &log_block_pool_, &sql_proxy_, net_keepalive_))) { SERVER_LOG(ERROR, "init_log_service_ fail", K(ret)); } else if (OB_FAIL(log_block_pool_.create_tenant(opts.disk_options_.log_disk_usage_limit_size_))) { @@ -442,6 +457,11 @@ int ObSimpleLogServer::simple_close(const bool is_shutdown = false) guard.click("destroy_palf_env"); if (is_shutdown) { + TG_STOP(batch_rpc_tg_id_); + TG_WAIT(batch_rpc_tg_id_); + TG_DESTROY(batch_rpc_tg_id_); + batch_rpc_tg_id_ = -1; + net_.rpc_shutdown(); net_.stop(); net_.wait(); @@ -734,6 +754,12 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req) return OB_SUCCESS; } switch (pkt.get_pcode()) { + #define BATCH_RPC_PROCESS() \ + ObBatchP p;\ + p.init(); \ + p.set_ob_request(req);\ + p.run();\ + break; #define PROCESS(processer) \ processer p;\ p.init();\ @@ -833,6 +859,10 @@ int ObLogDeliver::handle_req_(rpc::ObRequest &req) modify_pkt.set_tenant_id(node_id_); PROCESS(LogNotifyFetchLogReqP) } + case obrpc::OB_BATCH: { + modify_pkt.set_tenant_id(node_id_); + BATCH_RPC_PROCESS() + } default: SERVER_LOG(ERROR, "invalid req type", K(pkt.get_pcode())); break; diff --git a/mittest/logservice/env/ob_simple_log_server.h b/mittest/logservice/env/ob_simple_log_server.h index b34018c1c3..e0644a3a92 100644 --- a/mittest/logservice/env/ob_simple_log_server.h +++ b/mittest/logservice/env/ob_simple_log_server.h @@ -261,7 +261,9 @@ class ObSimpleLogServer : public ObISimpleLogServer public: ObSimpleLogServer() : handler_(deliver_), - transport_(NULL) + transport_(NULL), + batch_rpc_transport_(NULL), + high_prio_rpc_transport_(NULL) { } ~ObSimpleLogServer() @@ -405,6 +407,8 @@ private: logservice::ObLogService log_service_; ObTenantMutilAllocator *allocator_; rpc::frame::ObReqTransport *transport_; + rpc::frame::ObReqTransport *batch_rpc_transport_; + rpc::frame::ObReqTransport *high_prio_rpc_transport_; ObLSService ls_service_; ObLocationService location_service_; MockMetaReporter reporter_; @@ -421,6 +425,8 @@ private: // 内部表中记录日志盘规格 palf::PalfDiskOptions inner_table_disk_opts_; ObLooper looper_; + obrpc::ObBatchRpc batch_rpc_; + int batch_rpc_tg_id_; }; } // end unittest diff --git a/mittest/logservice/test_ob_simple_log_basic_func.cpp b/mittest/logservice/test_ob_simple_log_basic_func.cpp index 2a1eaa8f5a..91e321fc58 100644 --- a/mittest/logservice/test_ob_simple_log_basic_func.cpp +++ b/mittest/logservice/test_ob_simple_log_basic_func.cpp @@ -53,7 +53,7 @@ bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; TEST_F(TestObSimpleLogClusterBasicFunc, submit_log) { SET_CASE_LOG_FILE(TEST_NAME, "submit_log"); - //OB_LOGGER.set_log_level("TRACE"); + OB_LOGGER.set_log_level("TRACE"); const int64_t id = ATOMIC_AAF(&palf_id_, 1); const int64_t create_ts = 100; share::SCN create_scn; diff --git a/src/logservice/ob_log_service.cpp b/src/logservice/ob_log_service.cpp index 15fcda267f..aeb0e4d2c8 100644 --- a/src/logservice/ob_log_service.cpp +++ b/src/logservice/ob_log_service.cpp @@ -25,6 +25,7 @@ #include "share/ob_unit_getter.h" #include "share/rc/ob_tenant_base.h" #include "share/rc/ob_tenant_module_init_ctx.h" +#include "share/rpc/ob_batch_rpc.h" #include "storage/tx_storage/ob_ls_map.h" #include "storage/tx_storage/ob_ls_service.h" #include "observer/ob_srv_network_frame.h" @@ -99,6 +100,7 @@ int ObLogService::mtl_init(ObLogService* &logservice) self, alloc_mgr, net_frame->get_req_transport(), + GCTX.batch_rpc_, MTL(ObLSService*), location_service, reporter, @@ -230,6 +232,7 @@ int ObLogService::init(const PalfOptions &options, const common::ObAddr &self, common::ObILogAllocator *alloc_mgr, rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc, ObLSService *ls_service, ObLocationService *location_service, observer::ObIMetaReport *reporter, @@ -246,14 +249,14 @@ int ObLogService::init(const PalfOptions &options, ret = OB_INIT_TWICE; CLOG_LOG(WARN, "ObLogService init twice", K(ret)); } else if (false == options.is_valid() || OB_ISNULL(base_dir) || OB_UNLIKELY(!self.is_valid()) - || OB_ISNULL(alloc_mgr) || OB_ISNULL(transport) || OB_ISNULL(ls_service) + || OB_ISNULL(alloc_mgr) || OB_ISNULL(transport) || OB_ISNULL(batch_rpc) || OB_ISNULL(ls_service) || OB_ISNULL(location_service) || OB_ISNULL(reporter) || OB_ISNULL(log_block_pool) || OB_ISNULL(sql_proxy) || OB_ISNULL(net_keepalive_adapter)) { ret = OB_INVALID_ARGUMENT; CLOG_LOG(WARN, "invalid arguments", K(ret), K(options), KP(base_dir), K(self), - KP(alloc_mgr), KP(transport), KP(ls_service), KP(location_service), KP(reporter), + KP(alloc_mgr), KP(transport), KP(batch_rpc), KP(ls_service), KP(location_service), KP(reporter), KP(log_block_pool), KP(sql_proxy), KP(net_keepalive_adapter)); - } else if (OB_FAIL(PalfEnv::create_palf_env(options, base_dir, self, transport, + } else if (OB_FAIL(PalfEnv::create_palf_env(options, base_dir, self, transport, batch_rpc, alloc_mgr, log_block_pool, &monitor_, palf_env_))) { CLOG_LOG(WARN, "failed to create_palf_env", K(base_dir), K(ret)); } else if (OB_ISNULL(palf_env_)) { @@ -287,7 +290,7 @@ int ObLogService::init(const PalfOptions &options, net_keepalive_adapter_ = net_keepalive_adapter; self_ = self; is_inited_ = true; - FLOG_INFO("ObLogService init success", K(ret), K(base_dir), K(self), KP(transport), + FLOG_INFO("ObLogService init success", K(ret), K(base_dir), K(self), KP(transport), KP(batch_rpc), KP(ls_service), K(tenant_id)); } diff --git a/src/logservice/ob_log_service.h b/src/logservice/ob_log_service.h index ad30b0848f..01880a86b6 100644 --- a/src/logservice/ob_log_service.h +++ b/src/logservice/ob_log_service.h @@ -57,6 +57,10 @@ namespace frame class ObReqTransport; } } +namespace obrpc +{ +class ObBatchRpc; +} namespace share { @@ -98,6 +102,7 @@ public: const common::ObAddr &self, common::ObILogAllocator *alloc_mgr, rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc, storage::ObLSService *ls_service, share::ObLocationService *location_service, observer::ObIMetaReport *reporter, diff --git a/src/logservice/palf/log_define.h b/src/logservice/palf/log_define.h index 0ab8ea52b1..b9ce6a4a0b 100644 --- a/src/logservice/palf/log_define.h +++ b/src/logservice/palf/log_define.h @@ -149,6 +149,13 @@ constexpr int LOG_WRITE_FLAG = O_RDWR | O_DIRECT | O_SYNC; constexpr mode_t FILE_OPEN_MODE = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; // =========== Disk io end ==================== + +// =========== BatchRPC start ================== +// NOTE: ORDER AND VALUE ARE VITAL, DO NOT CHANGE +constexpr int64_t LOG_BATCH_PUSH_LOG_REQ = 1; +constexpr int64_t LOG_BATCH_PUSH_LOG_RESP = 2; +// =========== BatchRPC end ================== + const int64_t OB_INVALID_CONFIG_CHANGE_LOCK_OWNER = -1; enum ObReplicaState { diff --git a/src/logservice/palf/log_engine.cpp b/src/logservice/palf/log_engine.cpp index 0882b2f6c7..bb9ec70790 100644 --- a/src/logservice/palf/log_engine.cpp +++ b/src/logservice/palf/log_engine.cpp @@ -957,13 +957,14 @@ int LogEngine::submit_push_log_req(const common::ObAddr &server, int LogEngine::submit_push_log_resp(const ObAddr &server, const int64_t &msg_proposal_id, - const LSN &lsn) + const LSN &lsn, + const bool is_batch) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else { - ret = log_net_service_.submit_push_log_resp(server, msg_proposal_id, lsn); + ret = log_net_service_.submit_push_log_resp(server, msg_proposal_id, lsn, is_batch); PALF_LOG(TRACE, "submit_push_log_resp success", K(ret), K(server)); } return ret; diff --git a/src/logservice/palf/log_engine.h b/src/logservice/palf/log_engine.h index 606e4756f1..98b8d1de25 100644 --- a/src/logservice/palf/log_engine.h +++ b/src/logservice/palf/log_engine.h @@ -201,29 +201,29 @@ public: const int64_t &prev_log_proposal_id, const LSN &prev_lsn, const LSN &curr_lsn, - const LogWriteBuf &write_buf) + const LogWriteBuf &write_buf, + const bool need_batch_rpc) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; PALF_LOG(ERROR, "LogEngine not init", K(ret), KPC(this)); - } else if (OB_FAIL(log_net_service_.submit_push_log_req(member_list, - push_log_type, - msg_proposal_id, - prev_log_proposal_id, - prev_lsn, - curr_lsn, - write_buf))) { - // PALF_LOG(ERROR, - // "LogNetService submit_group_entry_to_memberlist failed", - // K(ret), - // KPC(this), - // K(member_list), - // K(prev_log_proposal_id), - // K(prev_lsn), - // K(prev_log_proposal_id), - // K(curr_lsn), - // K(write_buf)); + } else if (!need_batch_rpc + && OB_FAIL(log_net_service_.submit_push_log_req(member_list, + push_log_type, + msg_proposal_id, + prev_log_proposal_id, + prev_lsn, + curr_lsn, + write_buf))) { + } else if (need_batch_rpc + && OB_FAIL(log_net_service_.submit_batch_push_log_req(member_list, + push_log_type, + msg_proposal_id, + prev_log_proposal_id, + prev_lsn, + curr_lsn, + write_buf))) { } else { PALF_LOG(TRACE, "submit_group_entry_to_memberlist success", @@ -233,7 +233,8 @@ public: K(msg_proposal_id), K(prev_log_proposal_id), K(prev_lsn), - K(curr_lsn)); + K(curr_lsn), + K(need_batch_rpc)); } return ret; } @@ -251,7 +252,8 @@ public: // @param[in] lsn: the offset of log virtual int submit_push_log_resp(const common::ObAddr &server, const int64_t &msg_proposal_id, - const LSN &lsn); + const LSN &lsn, + const bool is_batch); template int submit_prepare_meta_req_(const List &member_list, const int64_t &log_proposal_id) diff --git a/src/logservice/palf/log_net_service.cpp b/src/logservice/palf/log_net_service.cpp index befb30e671..8f8cc90292 100644 --- a/src/logservice/palf/log_net_service.cpp +++ b/src/logservice/palf/log_net_service.cpp @@ -116,7 +116,8 @@ int LogNetService::submit_committed_info_req( int LogNetService::submit_push_log_resp( const ObAddr &server, const int64_t &msg_proposal_id, - const LSN &lsn) + const LSN &lsn, + const bool is_batch) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -129,9 +130,11 @@ int LogNetService::submit_push_log_resp( ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "Invalid argument!!!", K(ret), K(palf_id_), K(server), K(msg_proposal_id), K(lsn)); + } else if (is_batch) { + LogBatchPushResp push_log_resp(msg_proposal_id, lsn); + ret = post_request_to_server_(server, push_log_resp); } else { - LogPushResp push_log_resp(msg_proposal_id, - lsn); + LogPushResp push_log_resp(msg_proposal_id, lsn); ret = post_request_to_server_(server, push_log_resp); } return ret; diff --git a/src/logservice/palf/log_net_service.h b/src/logservice/palf/log_net_service.h index 97ffed6a39..32b2d7ada8 100644 --- a/src/logservice/palf/log_net_service.h +++ b/src/logservice/palf/log_net_service.h @@ -73,6 +73,33 @@ public: return ret; } + template + int submit_batch_push_log_req( + const List &member_list, + const PushLogType &push_log_type, + const int64_t &msg_proposal_id, + const int64_t &prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf) + { + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(ERROR, "LogNetService has not inited!!!", K(ret)); + } else { + LogBatchPushReq push_log_req(push_log_type, + msg_proposal_id, + prev_log_proposal_id, + prev_lsn, + curr_lsn, + write_buf); + ret = post_request_to_member_list_(member_list, push_log_req); + PALF_LOG(TRACE, "post_request_to_member_list_ success", K(member_list), K(push_log_req)); + } + return ret; + } + int submit_push_log_req( const ObAddr &server, const PushLogType &push_log_type, @@ -86,7 +113,8 @@ public: int submit_push_log_resp( const common::ObAddr &server, const int64_t &msg_proposal_id, - const LSN &lsn); + const LSN &lsn, + const bool is_batch); template int submit_prepare_meta_req( diff --git a/src/logservice/palf/log_req.h b/src/logservice/palf/log_req.h index 6b0943afd5..f5ae9426fd 100644 --- a/src/logservice/palf/log_req.h +++ b/src/logservice/palf/log_req.h @@ -13,13 +13,14 @@ #ifndef OCEANBASE_LOGSERVICE_LOG_REQ_ #define OCEANBASE_LOGSERVICE_LOG_REQ_ -#include "lib/utility/ob_unify_serialize.h" // OB_UNIS_VERSION -#include "lib/utility/ob_print_utils.h" // TO_STRING_KV +#include "lib/utility/ob_unify_serialize.h" // OB_UNIS_VERSION +#include "lib/utility/ob_print_utils.h" // TO_STRING_KV #include "log_define.h" #include "log_meta_info.h" -#include "log_learner.h" // LogLearner, LogLearnerList -#include "logservice/palf/lsn.h" // LSN +#include "log_learner.h" // LogLearner, LogLearnerList +#include "logservice/palf/lsn.h" // LSN #include "log_writer_utils.h" // LogWriteBuf +#include "share/rpc/ob_batch_proxy.h" // BatchRPC namespace oceanbase { @@ -72,6 +73,26 @@ public: LogWriteBuf write_buf_; }; +struct LogBatchPushReq : public LogPushReq, public obrpc::ObIFill { + LogBatchPushReq(const PushLogType push_log_type, + const int64_t &msg_proposal_id, + const int64_t &prev_log_proposal_id, + const LSN &prev_lsn, + const LSN &curr_lsn, + const LogWriteBuf &write_buf) : LogPushReq(push_log_type, msg_proposal_id, + prev_log_proposal_id, prev_lsn, curr_lsn, write_buf), + ObIFill() {} + int fill_buffer(char* buf, int64_t size, int64_t &filled_size) const override final + { + filled_size = 0; + return serialize(buf, size, filled_size); + } + int64_t get_req_size() const override final + { + return get_serialize_size(); + } +}; + struct LogPushResp { OB_UNIS_VERSION(1); public: @@ -86,6 +107,21 @@ public: LSN lsn_; }; +struct LogBatchPushResp : public LogPushResp, public obrpc::ObIFill { + LogBatchPushResp(const int64_t &msg_proposal_id, + const LSN &lsn) + : LogPushResp(msg_proposal_id, lsn), ObIFill() {} + int fill_buffer(char* buf, int64_t size, int64_t &filled_size) const override final + { + filled_size = 0; + return serialize(buf, size, filled_size); + } + int64_t get_req_size() const override final + { + return get_serialize_size(); + } +}; + enum FetchLogType { FETCH_LOG_FOLLOWER = 0, diff --git a/src/logservice/palf/log_rpc.cpp b/src/logservice/palf/log_rpc.cpp index fcec73048c..957379613d 100644 --- a/src/logservice/palf/log_rpc.cpp +++ b/src/logservice/palf/log_rpc.cpp @@ -15,6 +15,7 @@ #include "log_rpc_proxy.h" // LogRpcProxyV2 #include "log_rpc_packet.h" // LogRpcPaket #include "log_req.h" // LogPushReq... +#include "observer/ob_server_struct.h" // GCTX namespace oceanbase { using namespace common; @@ -25,6 +26,7 @@ LogRpc::LogRpc() : rpc_proxy_(NULL), opt_lock_(), options_(), tenant_id_(0), + cluster_id_(0), is_inited_(false) { } @@ -37,7 +39,8 @@ LogRpc::~LogRpc() int LogRpc::init(const ObAddr &self, const int64_t cluster_id, const int64_t tenant_id, - rpc::frame::ObReqTransport *transport) + rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -47,6 +50,8 @@ int LogRpc::init(const ObAddr &self, } else { self_ = self; tenant_id_ = tenant_id; + batch_rpc_ = batch_rpc; + cluster_id_ = cluster_id; is_inited_ = true; PALF_LOG(INFO, "LogRpc init success", K(tenant_id), K(self)); } diff --git a/src/logservice/palf/log_rpc.h b/src/logservice/palf/log_rpc.h index 9aa3aba0ba..ecc83bbe96 100644 --- a/src/logservice/palf/log_rpc.h +++ b/src/logservice/palf/log_rpc.h @@ -21,6 +21,7 @@ #include "log_rpc_packet.h" // LogRpcPacketImpl #include "log_rpc_proxy.h" // LogRpcProxyV2 #include "share/resource_manager/ob_cgroup_ctrl.h" +#include "share/rpc/ob_batch_rpc.h" namespace oceanbase { @@ -72,11 +73,14 @@ public: int init(const common::ObAddr &self, const int64_t cluster_id, const int64_t tenant_id, - rpc::frame::ObReqTransport *transport); + rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc); void destroy(); int update_transport_compress_options(const PalfTransportCompressOptions &compress_opt); const PalfTransportCompressOptions& get_compress_opts() const; - template + template::value && + !std::is_same::value), bool>::type=true> int post_request(const common::ObAddr &server, const int64_t palf_id, const ReqType &req) @@ -120,13 +124,48 @@ public: return ret; } + // BatchRPC + template::value || + std::is_same::value), bool>::type=true> + int post_request(const common::ObAddr &server, + const int64_t palf_id, + const ReqType &req) + { + int ret = common::OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (false == server.is_valid() + || false == is_valid_palf_id(palf_id) + || false == req.is_valid()) { + ret = OB_INVALID_ARGUMENT; + } else if (OB_ISNULL(batch_rpc_)) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "batch_rpc_ is nullptr, unexpected error", K(ret), K(server), K(palf_id), KP(batch_rpc_), K(req)); + } else { + const int64_t LOG_BATCH_SUB_TYPE = (std::is_same::value)? \ + LOG_BATCH_PUSH_LOG_REQ: LOG_BATCH_PUSH_LOG_RESP; + ret = batch_rpc_->post(tenant_id_, + server, + cluster_id_, + obrpc::CLOG_BATCH_REQ, + LOG_BATCH_SUB_TYPE, + share::ObLSID(palf_id), + req); + PALF_LOG(TRACE, "post batch rpc finished", K(ret), K(server), K(tenant_id_)); + } + return ret; + } + TO_STRING_KV(K_(self), K_(is_inited)); private: ObAddr self_; obrpc::LogRpcProxyV2 rpc_proxy_; mutable ObSpinLock opt_lock_; PalfTransportCompressOptions options_; + obrpc::ObBatchRpc *batch_rpc_; int64_t tenant_id_; + int64_t cluster_id_; bool is_inited_; }; } // end namespace palf diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 942a076f2a..5854fef4f7 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -856,6 +856,7 @@ int LogSlidingWindow::try_push_log_to_paxos_follower_(const int64_t curr_proposa ObMemberList dst_member_list; int64_t replica_num = 0; const bool need_send_log = (state_mgr_->is_leader_active()) ? true : false; + const bool need_batch_push = need_use_batch_rpc_(log_write_buf.get_total_size()); if (false == need_send_log) { // no need send log to paxos follower } else if (OB_FAIL(mm_->get_log_sync_member_list(dst_member_list, replica_num))) { @@ -864,7 +865,7 @@ int LogSlidingWindow::try_push_log_to_paxos_follower_(const int64_t curr_proposa PALF_LOG(WARN, "dst_member_list remove_server failed", K(ret), K_(palf_id), K_(self)); } else if (dst_member_list.is_valid() && OB_FAIL(log_engine_->submit_push_log_req(dst_member_list, PUSH_LOG, curr_proposal_id, - prev_log_pid, prev_lsn, lsn, log_write_buf))) { + prev_log_pid, prev_lsn, lsn, log_write_buf, need_batch_push))) { PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K_(palf_id), K_(self)); } else { // do nothing @@ -882,18 +883,19 @@ int LogSlidingWindow::try_push_log_to_children_(const int64_t curr_proposal_id, LogLearnerList children_list; common::GlobalLearnerList degraded_learner_list; const bool need_presend_log = (state_mgr_->is_leader_active()) ? true : false; + const bool need_batch_push = need_use_batch_rpc_(log_write_buf.get_total_size()); if (OB_FAIL(mm_->get_children_list(children_list))) { PALF_LOG(WARN, "get_children_list failed", K(ret), K_(palf_id)); } else if (children_list.is_valid() && OB_FAIL(log_engine_->submit_push_log_req(children_list, PUSH_LOG, curr_proposal_id, - prev_log_pid, prev_lsn, lsn, log_write_buf))) { + prev_log_pid, prev_lsn, lsn, log_write_buf, need_batch_push))) { PALF_LOG(WARN, "submit_push_log_req failed", K(ret), K_(palf_id), K_(self)); } else if (false == need_presend_log) { } else if (OB_FAIL(mm_->get_degraded_learner_list(degraded_learner_list))) { PALF_LOG(WARN, "get_degraded_learner_list failed", K(ret), K_(palf_id), K_(self)); } else if (OB_UNLIKELY(degraded_learner_list.is_valid() && mm_->is_sync_to_degraded_learners())) { (void) log_engine_->submit_push_log_req(degraded_learner_list, PUSH_LOG, - curr_proposal_id, prev_log_pid, prev_lsn, lsn, log_write_buf); + curr_proposal_id, prev_log_pid, prev_lsn, lsn, log_write_buf, need_batch_push); } return ret; } @@ -1546,7 +1548,7 @@ int LogSlidingWindow::get_max_flushed_log_info_(LSN &lsn, int64_t &log_proposal_id) const { int ret = OB_SUCCESS; - RLockGuard guard(max_flushed_info_lock_); + common::ObSpinLockGuard guard(max_flushed_info_lock_); lsn = max_flushed_lsn_; end_lsn = max_flushed_end_lsn_; log_proposal_id = max_flushed_log_pid_; @@ -1717,7 +1719,7 @@ int LogSlidingWindow::inc_update_max_flushed_log_info_(const LSN &lsn, } else if (curr_max_flushed_end_lsn.is_valid() && curr_max_flushed_end_lsn >= end_lsn) { // no need update max_flushed_end_lsn_ } else { - WLockGuard guard(max_flushed_info_lock_); + common::ObSpinLockGuard guard(max_flushed_info_lock_); // double check if (max_flushed_end_lsn_.is_valid() && max_flushed_end_lsn_ >= end_lsn) { PALF_LOG(WARN, "arg end lsn is not larger than current, no need update", K_(palf_id), K_(self), @@ -1744,7 +1746,7 @@ int LogSlidingWindow::truncate_max_flushed_log_info_(const LSN &lsn, ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "invalid argumetns", K(ret), K_(palf_id), K_(self), K(lsn), K(end_lsn), K(proposal_id)); } else { - WLockGuard guard(max_flushed_info_lock_); + common::ObSpinLockGuard guard(max_flushed_info_lock_); max_flushed_lsn_ = lsn; max_flushed_end_lsn_ = end_lsn; max_flushed_log_pid_ = proposal_id; @@ -1863,6 +1865,18 @@ bool LogSlidingWindow::need_execute_fetch_(const FetchTriggerType &fetch_trigger return bool_ret; } +bool LogSlidingWindow::need_use_batch_rpc_(const int64_t buf_size) const +{ + constexpr int64_t BATCH_PUSH_LOG_THRESHOLD = 4 * 1024; + // only use batch rpc when access mode is raw write and log size is smaller than BATCH_PUSH_LOG_THRESHOLD + // NB: BATCH_PUSH_LOG_THRESHOLD must be smaller than 256 * 1024 because of the buffer size of ObBatchRpc is 256 * 1024. + const bool need_batch_push = (mode_mgr_->can_raw_write() + && buf_size < BATCH_PUSH_LOG_THRESHOLD + && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_2_1_2) + ? true : false; + return need_batch_push; +} + int LogSlidingWindow::try_fetch_log(const FetchTriggerType &fetch_log_type, const LSN prev_lsn, const LSN fetch_start_lsn, @@ -3294,6 +3308,8 @@ int LogSlidingWindow::receive_log(const common::ObAddr &src_server, } } } + // Note: can not use log_task below this line + guard.revert_log_task(); if (OB_SUCC(ret)) { bool is_committed_lsn_updated = false; @@ -3338,7 +3354,9 @@ int LogSlidingWindow::submit_push_log_resp_(const common::ObAddr &server, const LSN &log_end_lsn) { int ret = OB_SUCCESS; - if (state_mgr_->is_allow_vote() && OB_FAIL(log_engine_->submit_push_log_resp(server, msg_proposal_id, log_end_lsn))) { + const bool is_need_batch = need_use_batch_rpc_(0); + if (state_mgr_->is_allow_vote() && + OB_FAIL(log_engine_->submit_push_log_resp(server, msg_proposal_id, log_end_lsn, is_need_batch))) { PALF_LOG(WARN, "submit_push_log_resp failed", K(ret), K_(palf_id), K_(self), K(server), K(log_end_lsn)); } return ret; diff --git a/src/logservice/palf/log_sliding_window.h b/src/logservice/palf/log_sliding_window.h index d8ec910721..3eb76b25c6 100755 --- a/src/logservice/palf/log_sliding_window.h +++ b/src/logservice/palf/log_sliding_window.h @@ -470,6 +470,7 @@ private: const LSN &lsn, const LogWriteBuf &log_write_buf); bool need_execute_fetch_(const FetchTriggerType &fetch_trigger_type); + bool need_use_batch_rpc_(const int64_t buf_size) const; public: typedef common::ObLinearHashMap SvrMatchOffsetMap; static const int64_t TMP_HEADER_SER_BUF_LEN = 256; // log header序列化的临时buffer大小 @@ -524,7 +525,7 @@ private: // max_flushed_lsn_: start lsn of max flushed log, it can be used as prev_lsn for fetching log. // max_flushed_end_lsn_: end lsn of max flushed log, it can be used as start_lsn for fetching log. // max_flushed_log_pid_: the proposal_id of max flushed log. - mutable RWLock max_flushed_info_lock_; + mutable common::ObSpinLock max_flushed_info_lock_; LSN max_flushed_lsn_; LSN max_flushed_end_lsn_; int64_t max_flushed_log_pid_; diff --git a/src/logservice/palf/lsn_allocator.cpp b/src/logservice/palf/lsn_allocator.cpp index 3edbf40ead..4b061ef0d8 100644 --- a/src/logservice/palf/lsn_allocator.cpp +++ b/src/logservice/palf/lsn_allocator.cpp @@ -105,35 +105,64 @@ int LSNAllocator::inc_update_last_log_info(const LSN &lsn, const int64_t log_id, int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else if (!lsn.is_valid() || !scn.is_valid() || OB_INVALID_LOG_ID == log_id) { + } else if (OB_UNLIKELY(!lsn.is_valid() || !scn.is_valid() || OB_INVALID_LOG_ID == log_id)) { ret = OB_INVALID_ARGUMENT; PALF_LOG(WARN, "invalid arguments", K(ret), K(lsn), K(scn), K(log_id)); } else { LSNTsMeta last; LSNTsMeta next; - while (true) { - WLockGuard guard(lock_); - LOAD128(last, &lsn_ts_meta_); - const int64_t cur_log_id = log_id_base_ + last.log_id_delta_; - next.log_id_delta_ = 0; - next.scn_delta_ = 0; - next.lsn_val_ = lsn.val_; - next.is_need_cut_ = 1; - if (log_id < cur_log_id) { - // no need update + bool need_update_base = false; + do { + if (need_update_base) { + WLockGuard guard(lock_); + LOAD128(last, &lsn_ts_meta_); + const int64_t cur_log_id = log_id_base_ + last.log_id_delta_; + next.log_id_delta_ = 0; + next.scn_delta_ = 0; + next.lsn_val_ = lsn.val_; + next.is_need_cut_ = 1; + + if (log_id < cur_log_id || lsn.val_ < last.lsn_val_) { + // no need update + } else if (CAS128(&lsn_ts_meta_, last, next)) { + log_id_base_ = log_id; + scn_base_ = scn.get_val_for_logservice(); + PALF_LOG(TRACE, "inc_update_last_log_info success", K(lsn), K(scn), K(log_id)); + } else { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "CAS128 failed, unexpected", K(ret)); + } break; - } else if (next.lsn_val_ < last.lsn_val_) { - // no need update - break; - } else if (CAS128(&lsn_ts_meta_, last, next)) { - log_id_base_ = log_id; - scn_base_ = scn.get_val_for_logservice(); - PALF_LOG(TRACE, "inc_update_last_log_info success", K(lsn), K(scn), K(log_id)); - break; - } else { - PAUSE(); } - } + need_update_base = false; + RLockGuard guard(lock_); + while (OB_SUCC(ret)) { + LOAD128(last, &lsn_ts_meta_); + const int64_t cur_log_id = log_id_base_ + last.log_id_delta_; + if (log_id < cur_log_id || lsn.val_ < last.lsn_val_) { + // no need update + break; + } else if (log_id - log_id_base_ >= LOG_ID_DELTA_UPPER_BOUND) { + // log_id reaches the upper bound + need_update_base = true; + break; + } else if (scn.get_val_for_logservice() - scn_base_ >= LOG_TS_DELTA_UPPER_BOUND) { + // scn reaches the upper bound + need_update_base = true; + break; + } else { + next.log_id_delta_ = log_id - log_id_base_; + next.scn_delta_ = scn.get_val_for_logservice() - scn_base_; + next.lsn_val_ = lsn.val_; + next.is_need_cut_ = 1; + if (CAS128(&lsn_ts_meta_, last, next)) { + break; + } else { + PAUSE(); + } + } + } + } while (need_update_base); } return ret; } diff --git a/src/logservice/palf/palf_env.cpp b/src/logservice/palf/palf_env.cpp index 888f9a856e..056e77b426 100644 --- a/src/logservice/palf/palf_env.cpp +++ b/src/logservice/palf/palf_env.cpp @@ -43,6 +43,7 @@ int PalfEnv::create_palf_env( const char *base_dir, const common::ObAddr &self, rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc, common::ObILogAllocator *log_alloc_mgr, ILogBlockPool *log_block_pool, PalfMonitorCb *monitor, @@ -55,7 +56,7 @@ int PalfEnv::create_palf_env( } else if (OB_FAIL(FileDirectoryUtils::delete_tmp_file_or_directory_at(base_dir))) { CLOG_LOG(WARN, "delete_tmp_file_or_directory_at failed", K(ret), K(base_dir)); } else if (OB_FAIL(palf_env->palf_env_impl_.init(options, base_dir, self, obrpc::ObRpcNetHandler::CLUSTER_ID, - MTL_ID(), transport, + MTL_ID(), transport, batch_rpc, log_alloc_mgr, log_block_pool, monitor))) { PALF_LOG(WARN, "PalfEnvImpl init failed", K(ret), K(base_dir)); } else if (OB_FAIL(palf_env->start_())) { diff --git a/src/logservice/palf/palf_env.h b/src/logservice/palf/palf_env.h index fb433c9494..3e4e038eac 100644 --- a/src/logservice/palf/palf_env.h +++ b/src/logservice/palf/palf_env.h @@ -32,6 +32,10 @@ class ObReqTransport; } } +namespace obrpc +{ +class ObBatchRpc; +} namespace palf { class PalfRoleChangeCb; @@ -53,6 +57,7 @@ public: const char *base_dir, const common::ObAddr &self, rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc, common::ObILogAllocator *alloc_mgr, ILogBlockPool *log_block_pool, PalfMonitorCb *monitor, diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index d9006cc99d..5a3ed01f96 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -211,6 +211,7 @@ int PalfEnvImpl::init( const int64_t cluster_id, const int64_t tenant_id, rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc, common::ObILogAllocator *log_alloc_mgr, ILogBlockPool *log_block_pool, PalfMonitorCb *monitor) @@ -221,10 +222,10 @@ int PalfEnvImpl::init( if (is_inited_) { ret = OB_INIT_TWICE; PALF_LOG(ERROR, "PalfEnvImpl is inited twiced", K(ret)); - } else if (OB_ISNULL(base_dir) || !self.is_valid() || NULL == transport + } else if (OB_ISNULL(base_dir) || !self.is_valid() || NULL == transport || NULL == batch_rpc || OB_ISNULL(log_alloc_mgr) || OB_ISNULL(log_block_pool) || OB_ISNULL(monitor)) { ret = OB_INVALID_ARGUMENT; - PALF_LOG(ERROR, "invalid arguments", K(ret), KP(transport), K(base_dir), K(self), KP(transport), + PALF_LOG(ERROR, "invalid arguments", K(ret), KP(transport), KP(batch_rpc), K(base_dir), K(self), KP(transport), KP(log_alloc_mgr), KP(log_block_pool), KP(monitor)); } else if (OB_FAIL(init_log_io_worker_config_(options.disk_options_.log_writer_parallelism_, tenant_id, @@ -232,7 +233,7 @@ int PalfEnvImpl::init( PALF_LOG(WARN, "init_log_io_worker_config_ failed", K(options)); } else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) { PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret)); - } else if (OB_FAIL(log_rpc_.init(self, cluster_id, tenant_id, transport))) { + } else if (OB_FAIL(log_rpc_.init(self, cluster_id, tenant_id, transport, batch_rpc))) { PALF_LOG(ERROR, "LogRpc init failed", K(ret)); } else if (OB_FAIL(cb_thread_pool_.init(io_cb_num, this))) { PALF_LOG(ERROR, "LogIOTaskThreadPool init failed", K(ret)); diff --git a/src/logservice/palf/palf_env_impl.h b/src/logservice/palf/palf_env_impl.h index c7b1b3a121..d007316b75 100644 --- a/src/logservice/palf/palf_env_impl.h +++ b/src/logservice/palf/palf_env_impl.h @@ -47,6 +47,10 @@ namespace frame class ObReqTransport; } } +namespace obrpc +{ +class ObBatchRpc; +} namespace palf { class IPalfHandleImpl; @@ -215,6 +219,7 @@ public: const int64_t cluster_id, const int64_t tenant_id, rpc::frame::ObReqTransport *transport, + obrpc::ObBatchRpc *batch_rpc, common::ObILogAllocator *alloc_mgr, ILogBlockPool *log_block_pool, PalfMonitorCb *monitor); diff --git a/src/logservice/replayservice/ob_log_replay_service.cpp b/src/logservice/replayservice/ob_log_replay_service.cpp index 494b993007..68d58b7171 100644 --- a/src/logservice/replayservice/ob_log_replay_service.cpp +++ b/src/logservice/replayservice/ob_log_replay_service.cpp @@ -16,6 +16,7 @@ #include "logservice/ob_log_base_header.h" #include "logservice/ob_ls_adapter.h" #include "logservice/palf/palf_env.h" +#include "rootserver/ob_tenant_info_loader.h" // ObTenantInfoLoader #include "share/scn.h" #include "share/ob_thread_mgr.h" #include "share/rc/ob_tenant_base.h" @@ -668,6 +669,33 @@ int ObLogReplayService::get_replayable_point(SCN &replayable_scn) return ret; } +share::SCN ObLogReplayService::inner_get_replayable_point_() const +{ + int ret = OB_SUCCESS; + const int64_t ADVANCED_NS_VAL = 3 * 1000 * 1000 * 1000L; + share::SCN replayable_scn; + const share::SCN replayable_point = replayable_point_.atomic_load(); + + replayable_scn = replayable_point; + if (MTL_TENANT_ROLE_CACHE_IS_RESTORE()) { + rootserver::ObTenantInfoLoader *tenant_info_loader = MTL(rootserver::ObTenantInfoLoader*); + share::SCN recovery_until_scn; + if (OB_ISNULL(tenant_info_loader)) { + CLOG_LOG(WARN, "ObTenantInfoLoader is NULL", K(ret)); + } else if (OB_FAIL(tenant_info_loader->get_recovery_until_scn(recovery_until_scn))) { + if (REACH_TIME_INTERVAL(5 * 1000 * 1000)) { + CLOG_LOG(WARN, "get_recovery_until_scn failed", K(ret)); + } + } else { + replayable_scn = SCN::min(SCN::plus(replayable_point, ADVANCED_NS_VAL), recovery_until_scn); + } + if (OB_UNLIKELY(false == replayable_scn.is_valid())) { + replayable_scn = replayable_point; + } + } else { } + return replayable_scn; +} + int ObLogReplayService::stat_for_each(const common::ObFunction &func) { auto stat_func = [&func](const ObLSID &id, ObReplayStatus *replay_status) -> bool { @@ -973,7 +1001,7 @@ int ObLogReplayService::fetch_and_submit_single_log_(ObReplayStatus &replay_stat ObLogBaseHeader header; int64_t header_pos = 0; ObLogReplayTask *replay_task = NULL; - const SCN &replayable_point = replayable_point_.atomic_load(); + const SCN &replayable_point = inner_get_replayable_point_(); if (OB_UNLIKELY(OB_ISNULL(submit_task))) { ret = OB_ERR_UNEXPECTED; CLOG_LOG(WARN, "submit task is NULL when fetch log", K(id), K(replay_status), KPC(submit_task), K(ret)); @@ -1089,7 +1117,7 @@ int ObLogReplayService::handle_submit_task_(ObReplayServiceSubmitTask *submit_ta if (!replay_status->is_enabled_without_lock() || !replay_status->need_submit_log()) { need_submit_log = false; } else { - const SCN &replayable_point = replayable_point_.atomic_load(); + const SCN &replayable_point = inner_get_replayable_point_(); need_submit_log = submit_task->has_remained_submit_log(replayable_point, iterate_end_by_replayable_point); if (!need_submit_log) { diff --git a/src/logservice/replayservice/ob_log_replay_service.h b/src/logservice/replayservice/ob_log_replay_service.h index 69af0b5529..b3dff482a7 100644 --- a/src/logservice/replayservice/ob_log_replay_service.h +++ b/src/logservice/replayservice/ob_log_replay_service.h @@ -209,6 +209,7 @@ private: void on_replay_error_(); // 析构前调用,归还所有日志流的replay status计数 int remove_all_ls_(); + share::SCN inner_get_replayable_point_() const; private: const int64_t MAX_REPLAY_TIME_PER_ROUND = 10 * 1000; //10ms const int64_t MAX_SUBMIT_TIME_PER_ROUND = 100 * 1000; //100ms diff --git a/src/logservice/restoreservice/ob_log_restore_handler.cpp b/src/logservice/restoreservice/ob_log_restore_handler.cpp index ab6605bb97..6e8f912193 100644 --- a/src/logservice/restoreservice/ob_log_restore_handler.cpp +++ b/src/logservice/restoreservice/ob_log_restore_handler.cpp @@ -353,7 +353,7 @@ int ObLogRestoreHandler::raw_write(const int64_t proposal_id, while (wait_times < MAX_RAW_WRITE_RETRY_TIMES) { do { ret = OB_SUCCESS; - WLockGuard guard(lock_); + RLockGuard guard(lock_); if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else if (is_in_stop_state_) { @@ -383,13 +383,6 @@ int ObLogRestoreHandler::raw_write(const int64_t proposal_id, if (OB_SUCC(ret)) { uint64_t tenant_id = palf_env_->get_palf_env_impl()->get_tenant_id(); EVENT_TENANT_ADD(ObStatEventIds::RESTORE_WRITE_LOG_SIZE, buf_size, tenant_id); - context_.max_fetch_lsn_ = lsn + buf_size; - context_.max_fetch_scn_ = scn; - context_.last_fetch_ts_ = ObTimeUtility::fast_current_time(); - if (parent_->set_to_end(scn)) { - // To stop and clear all restore log tasks and restore context, reset context and advance issue version - CLOG_LOG(INFO, "restore log to_end succ", KPC(this), KPC(parent_)); - } } } } @@ -410,6 +403,38 @@ int ObLogRestoreHandler::raw_write(const int64_t proposal_id, return ret; } +int ObLogRestoreHandler::update_max_fetch_info(const int64_t proposal_id, + const palf::LSN &lsn, + const SCN &scn) +{ + int ret = OB_SUCCESS; + WLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (is_in_stop_state_) { + ret = OB_IN_STOP_STATE; + } else if (LEADER != role_) { + ret = OB_NOT_MASTER; + } else if (proposal_id != proposal_id_) { + ret = OB_NOT_MASTER; + CLOG_LOG(INFO, "stale task, just skip", K(proposal_id), K(proposal_id_), K(lsn), K(id_)); + } else if (OB_UNLIKELY(!lsn.is_valid() || !scn.is_valid())) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid lsn or scn", K(proposal_id), K(lsn), K(scn), KPC(this)); + } else if (context_.max_fetch_lsn_.is_valid() && context_.max_fetch_lsn_ >= lsn) { + // do nothing + } else { + context_.max_fetch_lsn_ = lsn; + context_.max_fetch_scn_ = scn; + context_.last_fetch_ts_ = ObTimeUtility::fast_current_time(); + if (parent_->set_to_end(scn)) { + // To stop and clear all restore log tasks and restore context, reset context and advance issue version + CLOG_LOG(INFO, "restore log to_end succ", KPC(this), KPC(parent_)); + } + } + return ret; +} + void ObLogRestoreHandler::deep_copy_source(ObRemoteSourceGuard &source_guard) { RLockGuard guard(lock_); @@ -802,7 +827,7 @@ int ObLogRestoreHandler::refresh_error_context() if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else if (! is_strong_leader(role_)) { - CLOG_LOG(INFO, "not leader, no need refresh error context", K(id_)); + CLOG_LOG(TRACE, "not leader, no need refresh error context", K(id_)); } else if (OB_FAIL(palf_handle_.get_end_lsn(end_lsn))) { CLOG_LOG(WARN, "get end_lsn failed", K(id_)); } else if (end_lsn > context_.error_context_.err_lsn_ && OB_SUCCESS != context_.error_context_.ret_code_) { diff --git a/src/logservice/restoreservice/ob_log_restore_handler.h b/src/logservice/restoreservice/ob_log_restore_handler.h index 86b29baeb9..e45ecb7a6f 100644 --- a/src/logservice/restoreservice/ob_log_restore_handler.h +++ b/src/logservice/restoreservice/ob_log_restore_handler.h @@ -168,6 +168,13 @@ public: const share::SCN &scn, const char *buf, const int64_t buf_size); + // @brief update max fetch info + // @param[in] const int64_t, proposal_id used to distinguish stale logs after flashback + // @param[in] const palf::LSN, the max_lsn submitted + // @param[in] const int64_t, the max_scn submitted + int update_max_fetch_info(const int64_t proposal_id, + const palf::LSN &lsn, + const share::SCN &scn); // @brief check if need update fetch log source, // ONLY return true if role of RestoreHandler is LEADER bool need_update_source() const; diff --git a/src/logservice/restoreservice/ob_log_restore_scheduler.cpp b/src/logservice/restoreservice/ob_log_restore_scheduler.cpp index f0cd9da2ec..c1e35e6901 100644 --- a/src/logservice/restoreservice/ob_log_restore_scheduler.cpp +++ b/src/logservice/restoreservice/ob_log_restore_scheduler.cpp @@ -78,17 +78,24 @@ int ObLogRestoreScheduler::modify_thread_count_(const share::ObLogRestoreSourceT { int ret = OB_SUCCESS; int64_t restore_concurrency = 0; + const int64_t MIN_LOG_RESTORE_CONCURRENCY = 1; + const int64_t MAX_LOG_RESTORE_CONCURRENCY = 100; // for primary tenant, set restore_concurrency to 1. // otherwise, set restore_concurrency to tenant config. if (MTL_GET_TENANT_ROLE_CACHE() == share::ObTenantRole::PRIMARY_TENANT || !share::is_location_log_source_type(source_type)) { - restore_concurrency = 1; + restore_concurrency = MIN_LOG_RESTORE_CONCURRENCY; } else { omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id_)); if (!tenant_config.is_valid()) { - restore_concurrency = 1L; + restore_concurrency = MIN_LOG_RESTORE_CONCURRENCY; } else if (0 == tenant_config->log_restore_concurrency) { - restore_concurrency = MTL_CPU_COUNT(); + const int64_t max_cpu = MTL_CPU_COUNT(); + if (max_cpu <= 8) { + restore_concurrency = max_cpu * 4; + } else { + restore_concurrency = MAX_LOG_RESTORE_CONCURRENCY; + } } else { restore_concurrency = tenant_config->log_restore_concurrency; } diff --git a/src/logservice/restoreservice/ob_log_restore_service.cpp b/src/logservice/restoreservice/ob_log_restore_service.cpp index 261a9db86a..92240c4ea7 100644 --- a/src/logservice/restoreservice/ob_log_restore_service.cpp +++ b/src/logservice/restoreservice/ob_log_restore_service.cpp @@ -33,8 +33,6 @@ using namespace oceanbase::storage; using namespace oceanbase::palf; ObLogRestoreService::ObLogRestoreService() : inited_(false), - last_normal_work_ts_(OB_INVALID_TIMESTAMP), - last_update_restore_upper_limit_ts_(OB_INVALID_TIMESTAMP), ls_svr_(NULL), proxy_(), location_adaptor_(), @@ -110,8 +108,6 @@ void ObLogRestoreService::destroy() allocator_.destroy(); scheduler_.destroy(); ls_svr_ = NULL; - last_normal_work_ts_ = OB_INVALID_TIMESTAMP; - last_update_restore_upper_limit_ts_ = OB_INVALID_TIMESTAMP; } int ObLogRestoreService::start() @@ -166,7 +162,7 @@ void ObLogRestoreService::run1() LOG_ERROR_RET(OB_NOT_INIT, "ObLogRestoreService not init", "tenant_id", MTL_ID()); } else { while (! has_set_stop()) { - int64_t begin_stamp = ObTimeUtility::current_time(); + int64_t begin_stamp = ObTimeUtility::fast_current_time(); const bool is_primary = MTL_GET_TENANT_ROLE_CACHE() == share::ObTenantRole::PRIMARY_TENANT; const int64_t thread_interval = is_primary ? PRIMARY_THREAD_RUN_INTERVAL : STANDBY_THREAD_RUN_INTERVAL; do_thread_task_(); @@ -184,25 +180,22 @@ void ObLogRestoreService::do_thread_task_() { int ret = OB_SUCCESS; if (is_user_tenant(MTL_ID())) { - if (need_schedule_()) { - share::ObLogRestoreSourceItem source; - bool source_exist = false; + share::ObLogRestoreSourceItem source; + bool source_exist = false; - if (OB_FAIL(update_upstream_(source, source_exist))) { - LOG_WARN("update_upstream_ failed"); - } else if (source_exist) { - // log restore source exist, do schedule - // source_exist means tenant_role is standby or restore and log_restore_source exists - schedule_fetch_log_(source); - } else { - // tenant_role not match or log_restore_source not exist - clean_resource_(); - } - - schedule_resource_(source.type_); - report_error_(); - last_normal_work_ts_ = common::ObTimeUtility::fast_current_time(); + if (OB_FAIL(update_upstream_(source, source_exist))) { + LOG_WARN("update_upstream_ failed"); + } else if (source_exist) { + // log restore source exist, do schedule + // source_exist means tenant_role is standby or restore and log_restore_source exists + schedule_fetch_log_(source); + } else { + // tenant_role not match or log_restore_source not exist + clean_resource_(); } + + schedule_resource_(source.type_); + report_error_(); update_restore_upper_limit_(); refresh_error_context_(); set_compressor_type_(); @@ -297,10 +290,5 @@ void ObLogRestoreService::refresh_error_context_() } } } - -bool ObLogRestoreService::need_schedule_() const -{ - return common::ObTimeUtility::fast_current_time() - last_normal_work_ts_ > SCHEDULE_INTERVAL; -} } // namespace logservice } // namespace oceanbase diff --git a/src/logservice/restoreservice/ob_log_restore_service.h b/src/logservice/restoreservice/ob_log_restore_service.h index 50c9cc36e6..654f3d8ae2 100644 --- a/src/logservice/restoreservice/ob_log_restore_service.h +++ b/src/logservice/restoreservice/ob_log_restore_service.h @@ -81,12 +81,9 @@ private: void update_restore_upper_limit_(); void set_compressor_type_(); void refresh_error_context_(); - bool need_schedule_() const; private: bool inited_; - int64_t last_normal_work_ts_; - int64_t last_update_restore_upper_limit_ts_; ObLSService *ls_svr_; ObLogResSvrRpc proxy_; ObRemoteLocationAdaptor location_adaptor_; diff --git a/src/logservice/restoreservice/ob_remote_log_writer.cpp b/src/logservice/restoreservice/ob_remote_log_writer.cpp index 53c6d004bc..67150284b4 100644 --- a/src/logservice/restoreservice/ob_remote_log_writer.cpp +++ b/src/logservice/restoreservice/ob_remote_log_writer.cpp @@ -223,6 +223,10 @@ int ObRemoteLogWriter::submit_entries_(ObFetchLogTask &task) int64_t size = 0; LSN lsn; const ObLSID id = task.id_; + const int64_t proposal_id = task.proposal_id_; + int64_t entry_size = 0; + LSN max_submit_lsn; + SCN max_submit_scn; while (OB_SUCC(ret) && ! has_set_stop()) { if (OB_FAIL(task.iter_.next(entry, lsn, buf, size))) { if (OB_ITER_END != ret) { @@ -235,19 +239,32 @@ int ObRemoteLogWriter::submit_entries_(ObFetchLogTask &task) LOG_WARN("entry is invalid", K(entry), K(lsn), K(task)); } else if (task.cur_lsn_ > lsn) { LOG_INFO("repeated log, just skip", K(lsn), K(entry), K(task)); - } else if (OB_FAIL(submit_log_(id, task.proposal_id_, lsn, - entry.get_scn(), buf, entry.get_serialize_size()))) { + } else if (FALSE_IT(entry_size = entry.get_serialize_size())) { + } else if (OB_FAIL(submit_log_(id, proposal_id, lsn, + entry.get_scn(), buf, entry_size))) { LOG_WARN("submit log failed", K(buf), K(entry), K(lsn), K(task)); } else { - task.cur_lsn_ = lsn + entry.get_serialize_size(); + task.cur_lsn_ = lsn + entry_size; + max_submit_lsn = lsn + entry_size; + max_submit_scn = entry.get_scn(); } } // while + if (OB_ITER_END == ret) { if (lsn.is_valid()) { LOG_INFO("submit_entries_ succ", K(id), K(lsn), K(entry.get_scn()), K(task)); } ret = OB_SUCCESS; } + + if (max_submit_lsn.is_valid() && max_submit_scn.is_valid()) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(update_max_fetch_info_(id, proposal_id, max_submit_lsn, max_submit_scn))) { + LOG_WARN("update max fetch info failed", K(id), K(proposal_id), K(max_submit_lsn), K(max_submit_scn)); + } else { + LOG_INFO("update max fetch context succ", K(id), K(proposal_id), K(max_submit_lsn), K(max_submit_scn)); + } + } return ret; } @@ -281,6 +298,20 @@ int ObRemoteLogWriter::submit_log_(const ObLSID &id, return ret; } +int ObRemoteLogWriter::update_max_fetch_info_(const ObLSID &id, + const int64_t proposal_id, + const palf::LSN &lsn, + const share::SCN &scn) +{ + int ret = OB_SUCCESS; + GET_RESTORE_HANDLER_CTX(id) { + if (OB_FAIL(restore_handler->update_max_fetch_info(proposal_id, lsn, scn))) { + LOG_WARN("update max fetch info failed", K(id), K(proposal_id), K(lsn), K(scn)); + } + } + return ret; +} + int ObRemoteLogWriter::try_retire_(ObFetchLogTask *&task) { int ret = OB_SUCCESS; diff --git a/src/logservice/restoreservice/ob_remote_log_writer.h b/src/logservice/restoreservice/ob_remote_log_writer.h index dd7275d4ac..9b2ccb0417 100644 --- a/src/logservice/restoreservice/ob_remote_log_writer.h +++ b/src/logservice/restoreservice/ob_remote_log_writer.h @@ -61,6 +61,8 @@ private: int submit_entries_(ObFetchLogTask &task); int submit_log_(const share::ObLSID &id, const int64_t proposal_id, const palf::LSN &lsn, const share::SCN &scn, const char *buf, const int64_t buf_size); + int update_max_fetch_info_(const share::ObLSID &id, const int64_t proposal_id, + const palf::LSN &lsn, const share::SCN &scn); int try_retire_(ObFetchLogTask *&task); void inner_free_task_(ObFetchLogTask &task); void report_error_(const share::ObLSID &id, diff --git a/src/rootserver/ob_tenant_info_loader.cpp b/src/rootserver/ob_tenant_info_loader.cpp index 1eb43df843..e59e3fa24d 100644 --- a/src/rootserver/ob_tenant_info_loader.cpp +++ b/src/rootserver/ob_tenant_info_loader.cpp @@ -501,6 +501,19 @@ int ObTenantInfoLoader::get_sync_scn(share::SCN &sync_scn) return ret; } +int ObTenantInfoLoader::get_recovery_until_scn(share::SCN &recovery_until_scn) +{ + int ret = OB_SUCCESS; + share::ObAllTenantInfo tenant_info; + recovery_until_scn.set_invalid(); + if (OB_FAIL(get_tenant_info(tenant_info))) { + LOG_WARN("failed to get tenant info", KR(ret)); + } else { + recovery_until_scn = tenant_info.get_recovery_until_scn(); + } + return ret; +} + int ObTenantInfoLoader::get_tenant_info(share::ObAllTenantInfo &tenant_info) { int ret = OB_SUCCESS; diff --git a/src/rootserver/ob_tenant_info_loader.h b/src/rootserver/ob_tenant_info_loader.h index b647f25987..3e9ada2d02 100644 --- a/src/rootserver/ob_tenant_info_loader.h +++ b/src/rootserver/ob_tenant_info_loader.h @@ -130,6 +130,15 @@ public: */ int get_sync_scn(share::SCN &sync_scn); + /** + * @description: + * get tenant recovery_until_scn. + * for SYS/META tenant: there isn't recovery_until_scn + * for user tenant: get recovery_until_scn from __all_tenant_info cache + * @param[out] recovery_until_scn + */ + int get_recovery_until_scn(share::SCN &recovery_until_scn); + /** * @description: * get tenant is_standby_normal_status diff --git a/src/share/rpc/ob_batch_processor.cpp b/src/share/rpc/ob_batch_processor.cpp index 190db4a75e..c72b8b85a8 100644 --- a/src/share/rpc/ob_batch_processor.cpp +++ b/src/share/rpc/ob_batch_processor.cpp @@ -17,6 +17,8 @@ #include "observer/omt/ob_tenant.h" #include "storage/tx/ob_trans_service.h" #include "lib/utility/serialization.h" +#include "logservice/ob_log_service.h" +#include "logservice/palf/log_request_handler.h" namespace oceanbase { @@ -69,7 +71,10 @@ int ObBatchP::process() ret = OB_NOT_SUPPORTED; break; case CLOG_BATCH_REQ: - ret = OB_NOT_SUPPORTED; + if (OB_SUCCESS == ls_id.deserialize(buf, req->size_, pos)) { + handle_log_req(sender, msg_type, ls_id, buf + pos, req->size_ - (int32_t)pos); + } + clog_batch_cnt++; break; case ELECTION_BATCH_REQ: ret = OB_NOT_SUPPORTED; @@ -166,5 +171,44 @@ int ObBatchP::handle_sql_req(common::ObAddr& sender, int type, const char* buf, } return ret; } + +int ObBatchP::handle_log_req(const common::ObAddr& sender, int type, const share::ObLSID &ls_id, const char* buf, int32_t size) +{ + int ret = OB_SUCCESS; + ObReqTimestamp req_ts; + req_ts.receive_timestamp_ = get_receive_timestamp(); + req_ts.enqueue_timestamp_ = get_enqueue_timestamp(); + req_ts.run_timestamp_ = get_run_timestamp(); + logservice::ObLogService *log_service = MTL(logservice::ObLogService*); + palf::IPalfEnvImpl *palf_env_impl = NULL; + + #define __LOG_BATCH_PROCESS_REQ(TYPE) \ + palf::LogRequestHandler log_request_handler(palf_env_impl); \ + TYPE req; \ + int64_t pos = 0; \ + if (OB_FAIL(req.deserialize(buf, size, pos))) { \ + RPC_LOG(ERROR, "deserialize rpc failed", K(ret), KP(buf), K(size)); \ + } else if (OB_FAIL(log_request_handler.handle_request(ls_id.id(), sender, req))) { \ + RPC_LOG(TRACE, "handle_request failed", K(ret), K(ls_id), K(sender), K(req)); \ + } else { \ + RPC_LOG(TRACE, "handle_log_request success", K(ret), K(ls_id), K(sender), K(req)); \ + } + if (OB_ISNULL(log_service)) { + ret = OB_ERR_UNEXPECTED; + RPC_LOG(ERROR, "log_service is nullptr", K(ret), K(log_service)); + } else if (OB_ISNULL(log_service->get_palf_env())) { + ret = OB_ERR_UNEXPECTED; + RPC_LOG(ERROR, "palf_env is nullptr", K(ret), KP(log_service)); + } else if (FALSE_IT(palf_env_impl = log_service->get_palf_env()->get_palf_env_impl())) { + } else if (palf::LOG_BATCH_PUSH_LOG_REQ == type) { + __LOG_BATCH_PROCESS_REQ(palf::LogPushReq); + } else if (palf::LOG_BATCH_PUSH_LOG_RESP == type) { + __LOG_BATCH_PROCESS_REQ(palf::LogPushResp); + } else { + RPC_LOG(ERROR, "invalid sub_type", K(ret), K(type)); + } + #undef __LOG_BATCH_PROCESS_REQ + return ret; +} }; // end namespace rpc }; // end namespace oceanbase diff --git a/src/share/rpc/ob_batch_processor.h b/src/share/rpc/ob_batch_processor.h index af60bc1b2a..4dda4ea125 100644 --- a/src/share/rpc/ob_batch_processor.h +++ b/src/share/rpc/ob_batch_processor.h @@ -31,6 +31,7 @@ protected: int handle_trx_req(common::ObAddr& sender, int type, const char* buf, int32_t size); int handle_tx_req(int type, const char* buf, int32_t size); int handle_sql_req(common::ObAddr& sender, int type, const char* buf, int32_t size); + int handle_log_req(const common::ObAddr& sender, int type, const share::ObLSID &ls_id, const char* buf, int32_t size); private: DISALLOW_COPY_AND_ASSIGN(ObBatchP); }; diff --git a/src/share/rpc/ob_batch_proxy.cpp b/src/share/rpc/ob_batch_proxy.cpp index 7971a953d0..7e6b85e43c 100644 --- a/src/share/rpc/ob_batch_proxy.cpp +++ b/src/share/rpc/ob_batch_proxy.cpp @@ -13,6 +13,7 @@ #include "ob_batch_proxy.h" #include "share/config/ob_server_config.h" #include "share/ob_cluster_version.h" +#include "share/resource_manager/ob_cgroup_ctrl.h" namespace oceanbase { @@ -100,8 +101,11 @@ int ObBatchRpcProxy::post_batch(uint64_t tenant_id, const common::ObAddr &addr, int ret = OB_SUCCESS; static BatchCallBack s_cb; BatchCallBack *cb = &s_cb; - - ret = this->to(addr).dst_cluster_id(dst_cluster_id).by(tenant_id).as(OB_SERVER_TENANT_ID).post_packet(pkt, cb); + if (CLOG_BATCH_REQ == batch_type) { + ret = this->to(addr).dst_cluster_id(dst_cluster_id).by(tenant_id).group_id(share::OBCG_CLOG).post_packet(pkt, cb); + } else { + ret = this->to(addr).dst_cluster_id(dst_cluster_id).by(tenant_id).as(OB_SERVER_TENANT_ID).post_packet(pkt, cb); + } return ret; } diff --git a/src/share/rpc/ob_batch_proxy.h b/src/share/rpc/ob_batch_proxy.h index fb448a3328..290ff5a69c 100644 --- a/src/share/rpc/ob_batch_proxy.h +++ b/src/share/rpc/ob_batch_proxy.h @@ -36,13 +36,13 @@ enum { inline int64_t get_batch_delay_us(const int batch_type) { - int64_t delay[BATCH_REQ_TYPE_COUNT] = {2 * 1000, 2 * 1000, 1 * 1000, 0, 0, 0, 0, 0}; + int64_t delay[BATCH_REQ_TYPE_COUNT] = {2 * 1000, 2 * 1000, 500, 0, 0, 0, 0, 0}; return (batch_type >= 0 && batch_type < BATCH_REQ_TYPE_COUNT) ? delay[batch_type]: 0; } inline int64_t get_batch_buffer_size(const int batch_type) { - int64_t batch_buffer_size_k[BATCH_REQ_TYPE_COUNT] = {256, 256, 2048, 2048, 256, 256, 256, 2048}; + int64_t batch_buffer_size_k[BATCH_REQ_TYPE_COUNT] = {256, 256, 256, 2048, 256, 256, 256, 2048}; return batch_buffer_size_k[batch_type] * 1024; } diff --git a/unittest/logservice/mock_logservice_container/mock_log_engine.h b/unittest/logservice/mock_logservice_container/mock_log_engine.h index f8534a87b0..a456410888 100644 --- a/unittest/logservice/mock_logservice_container/mock_log_engine.h +++ b/unittest/logservice/mock_logservice_container/mock_log_engine.h @@ -318,12 +318,14 @@ public: int submit_push_log_resp( const common::ObAddr &server, const int64_t &msg_proposal_id, - const LSN &lsn) override + const LSN &lsn, + const bool is_batch) override { int ret = OB_SUCCESS; UNUSED(server); UNUSED(msg_proposal_id); UNUSED(lsn); + UNUSED(is_batch); return ret; } diff --git a/unittest/logservice/test_lsn_allocator.cpp b/unittest/logservice/test_lsn_allocator.cpp index 3b5b0ae2d8..e2b9f2733a 100644 --- a/unittest/logservice/test_lsn_allocator.cpp +++ b/unittest/logservice/test_lsn_allocator.cpp @@ -386,6 +386,98 @@ TEST_F(TestLSNAllocator, test_alloc_offset_multi_thread) } }; +// 测试多线程 inc_update_last_log_info +const int64_t UPDATE_MAX_LSN_CNT = 1 * 1000 * 1000; +const int64_t GAP_PER_THREAD = 0; +const int64_t LOG_ID_BASE = LSNAllocator::LOG_ID_DELTA_UPPER_BOUND - 1000; +const int64_t SCN_BASE = LSNAllocator::LOG_TS_DELTA_UPPER_BOUND - 1000; +class UpdateMaxLSNTestThread +{ +public: + UpdateMaxLSNTestThread() {} + virtual ~UpdateMaxLSNTestThread() { } +public: + void create_and_run(int64_t th_idx) + { + th_idx_ = th_idx; + if (0 != pthread_create(&thread_, NULL, routine, this)){ + PALF_LOG_RET(ERROR, OB_ERR_SYS, "create thread fail", K(thread_)); + } else { + PALF_LOG(INFO, "create thread success", K(thread_), K(th_idx_)); + } + } + + void join() + { + pthread_join(thread_, NULL); + } + + static void* routine(void *arg) { + ob_usleep(1000); + UpdateMaxLSNTestThread *test_thread = static_cast(arg); + const int64_t start_number = GAP_PER_THREAD * test_thread->th_idx_; + for (int64_t i = 0; i < UPDATE_MAX_LSN_CNT; i++) { + int64_t log_id = LOG_ID_BASE + start_number + i; + LSN lsn(log_id); + share::SCN scn; + EXPECT_EQ(OB_SUCCESS, scn.convert_from_ts(SCN_BASE + start_number + i)); + EXPECT_EQ(OB_SUCCESS, global_lsn_allocator.inc_update_last_log_info(lsn, log_id, scn)); + } + return NULL; + } +public: + pthread_t thread_; + int64_t th_idx_; +}; + +TEST_F(TestLSNAllocator, test_update_max_lsn) +{ + int ret = OB_SUCCESS; + LSNAllocator lsn_allocator; + LSN start_lsn(0); + EXPECT_EQ(OB_SUCCESS, lsn_allocator.init(0, share::SCN::base_scn(), start_lsn)); + lsn_allocator.lsn_ts_meta_.log_id_delta_ = LSNAllocator::LOG_ID_DELTA_UPPER_BOUND - 1000; + lsn_allocator.lsn_ts_meta_.scn_delta_ = LSNAllocator::LOG_TS_DELTA_UPPER_BOUND - 1000; + PALF_LOG(INFO, "inc_update_last_log_info before", K(lsn_allocator)); + int64_t begin_ts_ns = ObTimeUtility::current_time_ns(); + for (int64_t i = 0; i < 1 * 1000 * 1000L; i++) { + LSN lsn(i+1); + int64_t log_id = i + LSNAllocator::LOG_ID_DELTA_UPPER_BOUND - 1000; + share::SCN scn; + scn.convert_for_logservice(i + 2 + LSNAllocator::LOG_TS_DELTA_UPPER_BOUND - 1000); + if (OB_FAIL(lsn_allocator.inc_update_last_log_info(lsn, log_id, scn))) { + PALF_LOG(INFO, "inc_update_last_log_info failed", K(i), K(lsn), K(log_id), K(scn)); + } + } + const int64_t cost_time_ns = ObTimeUtility::current_time_ns() - begin_ts_ns; + PALF_LOG(INFO, "inc_update_last_log_info finish", K(cost_time_ns), K(lsn_allocator)); + + const int64_t THREAD_CNT = 64; + const int64_t target_cnt = (THREAD_CNT - 1) * GAP_PER_THREAD + UPDATE_MAX_LSN_CNT - 1; + const int64_t target_log_id = LOG_ID_BASE + target_cnt; + LSN target_lsn(target_log_id); + share::SCN target_scn; + EXPECT_EQ(OB_SUCCESS, target_scn.convert_from_ts(SCN_BASE + target_cnt)); + global_lsn_allocator.reset(); + EXPECT_EQ(OB_SUCCESS, global_lsn_allocator.init(0, share::SCN::min_scn(), LSN(0))); + UpdateMaxLSNTestThread threads[THREAD_CNT]; + begin_ts_ns = ObTimeUtility::current_time_ns(); + for (int tidx = 0; tidx < THREAD_CNT; ++tidx) { + threads[tidx].create_and_run(tidx); + PALF_LOG(INFO, "create thread", K(tidx)); + } + for (int tidx = 0; tidx < THREAD_CNT; ++tidx) { + threads[tidx].join(); + } + EXPECT_EQ(target_log_id, global_lsn_allocator.get_max_log_id()); + EXPECT_EQ(target_scn, global_lsn_allocator.get_max_scn()); + LSN curr_end_lsn; + EXPECT_EQ(OB_SUCCESS, global_lsn_allocator.get_curr_end_lsn(curr_end_lsn)); + EXPECT_EQ(target_lsn, curr_end_lsn); + const int64_t parallel_cost_time_ns = ObTimeUtility::current_time_ns() - begin_ts_ns; + PALF_LOG(INFO, "inc_update_last_log_info parallel finish", K(parallel_cost_time_ns), K(global_lsn_allocator)); +} + } // END of unittest } // end of oceanbase @@ -394,7 +486,7 @@ int main(int argc, char **argv) { system("rm -rf ./test_lsn_allocator.log*"); OB_LOGGER.set_file_name("test_lsn_allocator.log", true); - OB_LOGGER.set_log_level("INFO"); + OB_LOGGER.set_log_level("TRACE"); PALF_LOG(INFO, "begin unittest::test_lsn_allocator"); ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();