/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #include #include #include "share/ob_define.h" #include "storage/ob_storage_log_type.h" #include "storage/transaction/ob_trans_log.h" #include "obcdc/src/ob_log_instance.h" #include "obcdc/src/ob_log_fetcher_stream.h" #include "obcdc/src/ob_log_fetcher_part_stream.h" #include "ob_log_utils.h" // get_timestamp using namespace oceanbase; using namespace common; using namespace liboblog; using namespace fetcher; using namespace transaction; using namespace storage; using namespace clog; namespace oceanbase { namespace unittest { /* * Utils. */ typedef std::vector Svrs; typedef std::vector PKeys; typedef std::vector LogIds; typedef std::vector Tstamps; /* * Mock Rpc Interface 1. * It owns N partitions, each has M log entries. * It returns L log entries in each fetch_log() call. * Log entry contains nothing. * Used to test: * - add partition into Stream. * - update file id & offset. * - fetch log. * - kick out offline partitions. * - discard partitions. */ class MockRpcInterface1 : public IFetcherRpcInterface { struct Entry { file_id_t file_id_; offset_t offset_; uint64_t log_id_; ObPartitionKey pkey_; }; // Use offset_ as index of Entry in EntryVec. typedef std::vector EntryVec; public: MockRpcInterface1(const PKeys &pkeys, const int64_t log_entry_per_p, const int64_t log_entry_per_call) { log_entry_per_call_ = log_entry_per_call; log_entry_per_p_ = log_entry_per_p; addr_ = ObAddr(ObAddr::IPV4, "127.0.0.1", 5999); // Gen entries. int64_t log_entry_cnt = 0; for (int64_t log_id = 1; log_id < log_entry_per_p + 1; ++log_id) { for (int64_t pidx = 0, cnt = pkeys.size(); pidx < cnt; ++pidx) { // Gen entry. Entry entry; entry.pkey_ = pkeys.at(pidx); entry.file_id_ = 1; entry.offset_ = static_cast(log_entry_cnt++); entry.log_id_ = log_id; // Save it. entries_.push_back(entry); } } } virtual ~MockRpcInterface1() { } virtual void set_svr(const common::ObAddr& svr) { UNUSED(svr); } virtual const ObAddr& get_svr() const { static ObAddr svr; return svr; } virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); } virtual int req_start_log_id_by_ts( const obrpc::ObLogReqStartLogIdByTsRequest& req, obrpc::ObLogReqStartLogIdByTsResponse& res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req, obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req, obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_pos_by_log_id( const obrpc::ObLogReqStartPosByLogIdRequest& req, obrpc::ObLogReqStartPosByLogIdResponse& res) { /* * Err Supportted: * - OB_SUCCESS * - OB_ENTRY_NOT_EXIST: partition exists without any log * - OB_ERR_OUT_OF_LOWER_BOUND: log id beyond lower bound * - OB_ERR_OUT_OF_UPPER_BOUND: log id beyond upper bound */ typedef obrpc::ObLogReqStartPosByLogIdRequest::Param Param; typedef obrpc::ObLogReqStartPosByLogIdResponse::Result Result; for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) { // Locate for a partition. const Param ¶m = req.get_params().at(idx); const ObPartitionKey &pkey = param.pkey_; const uint64_t start_log_id = param.start_log_id_; Result result; result.reset(); // Search. bool done = false; bool partition_exist = false; for (int64_t entry_idx = 0, entry_cnt = entries_.size(); _SUCC_(result.err_) && !done && entry_idx < entry_cnt; ++entry_idx) { const Entry &entry = entries_.at(entry_idx); if (entry.pkey_ == pkey) { partition_exist = true; // Got it. if (start_log_id == entry.log_id_) { result.err_ = OB_SUCCESS; result.file_id_ = 1; result.offset_ = entry.offset_; done = true; } // Too small log id. else if (start_log_id < entry.log_id_) { result.err_ = OB_ERR_OUT_OF_LOWER_BOUND; } } } if (!done && _SUCC_(result.err_)) { // No log entry. if (!partition_exist) { result.err_ = OB_ENTRY_NOT_EXIST; } // Too large log id. else { result.err_ = OB_ERR_OUT_OF_UPPER_BOUND; } } res.append_result(result); } _D_("mock rpc 1 req pos", K(req), K(res)); return OB_SUCCESS; } virtual int fetch_log( const obrpc::ObLogExternalFetchLogRequest& req, obrpc::ObLogExternalFetchLogResponse& res) { typedef obrpc::ObLogExternalFetchLogRequest::Param Param; typedef obrpc::ObLogExternalFetchLogResponse::OfflinePartition OP; // Fetch log. const offset_t offset = req.get_offset(); if (offset < 0) { return OB_INVALID_ARGUMENT; } offset_t ret_offset = offset; // Scan. for (int64_t idx = static_cast(offset), cnt = entries_.size(); idx < cnt && res.get_log_num() < log_entry_per_call_; ++idx) { const Entry &entry = entries_.at(idx); bool fetch = false; for (int64_t pidx = 0, pcnt = req.get_params().count(); !fetch && pidx < pcnt; ++pidx) { const Param ¶m = req.get_params().at(pidx); if (entry.pkey_ == param.pkey_ && param.start_log_id_ <= entry.log_id_ && entry.log_id_ <= param.last_log_id_) { fetch = true; } } if (fetch) { ret_offset = static_cast(entry.offset_); // Gen header. int64_t ts = get_timestamp(); ObProposalID proposal_id; proposal_id.addr_ = addr_; proposal_id.ts_ = ts; ObLogEntryHeader header; header.generate_header(OB_LOG_SUBMIT, entry.pkey_, entry.log_id_, mock_load_, mock_load_len_, ts, ts, proposal_id, ts, ObVersion(1)); ObLogEntry log_entry; log_entry.generate_entry(header, mock_load_); res.append_log(log_entry); } } res.set_file_id_offset(1, ret_offset + 1); // Handle offline partition. // Here, if a partition reaches its last log, it is offline. for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) { const Param ¶m = req.get_params().at(idx); const uint64_t last_log_id = log_entry_per_p_; if (last_log_id < param.start_log_id_) { OP op; op.pkey_ = param.pkey_; // op.last_log_id_ = last_log_id; op.sync_ts_ = last_log_id; res.append_offline_partition(op); } } _D_("mock rpc 1 fetch log", K(req), K(res)); return OB_SUCCESS; } virtual int req_heartbeat_info( const obrpc::ObLogReqHeartbeatInfoRequest& req, obrpc::ObLogReqHeartbeatInfoResponse& res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_leader_heartbeat( const obrpc::ObLogLeaderHeartbeatReq &req, obrpc::ObLogLeaderHeartbeatResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int open_stream(const obrpc::ObLogOpenStreamReq &req, obrpc::ObLogOpenStreamResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req, obrpc::ObLogStreamFetchLogResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback) { UNUSED(feedback); return OB_SUCCESS; } private: ObAddr addr_; int64_t log_entry_per_call_; int64_t log_entry_per_p_; EntryVec entries_; static const int64_t mock_load_len_ = 8; char mock_load_[mock_load_len_]; }; /* * Factory. */ class MockRpcInterface1Factory : public IFetcherRpcInterfaceFactory { public: MockRpcInterface1Factory(const PKeys &pkeys, const int64_t log_entry_per_p, const int64_t log_entry_per_call) : pkeys_(pkeys), log_entry_per_p_(log_entry_per_p), log_entry_per_call_(log_entry_per_call) { } virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc) { rpc = new MockRpcInterface1(pkeys_, log_entry_per_p_, log_entry_per_call_); return OB_SUCCESS; } virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc) { delete rpc; return OB_SUCCESS; } private: PKeys pkeys_; int64_t log_entry_per_p_; int64_t log_entry_per_call_; }; /* * Mock Rpc Interface 2. * It owns N servers, each hold some partitions, one of them is * the leader. When request start log id, a preseted value is returned. * Used to test: * - fetch partition * - locate start log id * - activate partition stream * - discard partition stream */ class MockRpcInterface2 : public IFetcherRpcInterface { struct Partition { ObPartitionKey pkey_; uint64_t start_log_id_; bool is_leader_; }; struct Svr { ObAddr svr_; std::vector partitions_; bool operator==(const Svr &other) const { return svr_ == other.svr_; } bool operator<(const Svr &other) const { return svr_ < other.svr_; } }; public: /* * Set static result set. The first partition in svrs is the leader. */ static void add_partition(const ObPartitionKey &pkey, uint64_t start_log_id, std::vector svrs) { EXPECT_NE(0, svrs.size()); Partition pt = { pkey, start_log_id, false }; for (int64_t idx = 0, cnt = svrs.size(); idx < cnt; ++idx) { pt.is_leader_ = (0 == idx); Svr target; target.svr_ = svrs.at(idx); std::vector::iterator itor = std::find(svrs_.begin(), svrs_.end(), target); if (svrs_.end() == itor) { target.partitions_.push_back(pt); svrs_.push_back(target); std::sort(svrs_.begin(), svrs_.end()); } else { (*itor).partitions_.push_back(pt); } } } /* * Clear static result set. */ static void clear_result_set() { svrs_.clear(); } public: virtual void set_svr(const common::ObAddr& svr) { svr_ = svr; } virtual const ObAddr& get_svr() const { return svr_; } virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); } virtual int req_start_log_id_by_ts( const obrpc::ObLogReqStartLogIdByTsRequest& req, obrpc::ObLogReqStartLogIdByTsResponse& res) { bool svr_exist = false; for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) { const ObPartitionKey &pkey = req.get_params().at(idx).pkey_; bool done = false; for (int64_t svr_idx = 0, svr_cnt = svrs_.size(); svr_idx < svr_cnt; ++svr_idx) { // Simulating sending rpc to svr. if (svr_ == svrs_.at(svr_idx).svr_) { svr_exist = true; const Svr &svr = svrs_.at(svr_idx); for (int64_t pidx = 0, pcnt = svr.partitions_.size(); pidx < pcnt; ++pidx) { const Partition &p = svr.partitions_.at(pidx); if (pkey == p.pkey_) { done = true; typedef obrpc::ObLogReqStartLogIdByTsResponse::Result Result; Result result = { OB_SUCCESS, p.start_log_id_, false}; res.append_result(result); } } if (!done) { res.set_err(OB_PARTITION_NOT_EXIST); } } } // End for. } _D_("mock rpc req start log id", K(req), K(res)); return (svr_exist) ? OB_SUCCESS : OB_TIMEOUT; } virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req, obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req, obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_pos_by_log_id( const obrpc::ObLogReqStartPosByLogIdRequest& req, obrpc::ObLogReqStartPosByLogIdResponse& res) { UNUSED(req); UNUSED(res); // Timeout. _D_("mock rpc req pos by log id", K(req), K(res)); return OB_TIMEOUT; } virtual int fetch_log( const obrpc::ObLogExternalFetchLogRequest& req, obrpc::ObLogExternalFetchLogResponse& res) { UNUSED(req); UNUSED(res); // Timeout. _D_("mock rpc req pos by log id", K(req), K(res)); return OB_TIMEOUT; } virtual int req_heartbeat_info( const obrpc::ObLogReqHeartbeatInfoRequest& req, obrpc::ObLogReqHeartbeatInfoResponse& res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_leader_heartbeat( const obrpc::ObLogLeaderHeartbeatReq &req, obrpc::ObLogLeaderHeartbeatResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int open_stream(const obrpc::ObLogOpenStreamReq &req, obrpc::ObLogOpenStreamResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req, obrpc::ObLogStreamFetchLogResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int get_log_svr(const ObPartitionKey& pkey, const uint64_t log_id, ObSvrs& svrs, int& leader_cnt) { UNUSED(log_id); leader_cnt = 0; for (int64_t svr_idx = 0, svr_cnt = svrs_.size(); svr_idx < svr_cnt; ++svr_idx) { Svr &svr = svrs_.at(svr_idx); for (int64_t pidx = 0, pcnt = svr.partitions_.size(); pidx < pcnt; ++pidx) { const Partition &p = svr.partitions_.at(pidx); if (pkey == p.pkey_) { svrs.push_back(svr.svr_); if (p.is_leader_) { std::swap(svrs.at(leader_cnt), svrs.at(svrs.count() - 1)); leader_cnt += 1; } } } } _D_("mock rpc req log servers", K(pkey), K(svrs), K(leader_cnt)); return OB_SUCCESS; } virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback) { UNUSED(feedback); return OB_SUCCESS; } private: // Target svr. ObAddr svr_; // Data set. static std::vector svrs_; }; // Static data set. So all instances could access it. std::vector MockRpcInterface2::svrs_; /* * Factory. */ class MockRpcInterface2Factory : public IFetcherRpcInterfaceFactory { public: virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc) { rpc = new MockRpcInterface2(); return OB_SUCCESS; } virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc) { delete rpc; return OB_SUCCESS; } }; /* * Mock Rpc Interface 3. * It owns some servers and partitions, can return * svr addr and heartbeat timestamps. * Notice: user set tuples * as results. * Used to test: * - Heartbeat facilities. */ class MockRpcInterface3 : public IFetcherRpcInterface { struct Entry { ObAddr svr_; ObPartitionKey pkey_; uint64_t log_id_; int64_t tstamp_; bool operator<(const Entry &other) { return log_id_ < other.log_id_; } }; typedef std::vector EntryVec; public: static void clear_result() { entry_vec_.clear(); } static void add_result(const ObAddr &svr, const ObPartitionKey &pkey, const uint64_t log_id, const int64_t tstamp) { Entry entry = { svr, pkey, log_id, tstamp }; entry_vec_.push_back(entry); } public: virtual void set_svr(const common::ObAddr& svr) { UNUSED(svr); } virtual const ObAddr& get_svr() const { static ObAddr svr; return svr; } virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); } virtual int req_start_log_id_by_ts( const obrpc::ObLogReqStartLogIdByTsRequest& req, obrpc::ObLogReqStartLogIdByTsResponse& res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req, obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req, obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_start_pos_by_log_id( const obrpc::ObLogReqStartPosByLogIdRequest& req, obrpc::ObLogReqStartPosByLogIdResponse& res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int fetch_log( const obrpc::ObLogExternalFetchLogRequest& req, obrpc::ObLogExternalFetchLogResponse& res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int req_heartbeat_info( const obrpc::ObLogReqHeartbeatInfoRequest& req, obrpc::ObLogReqHeartbeatInfoResponse& res) { typedef obrpc::ObLogReqHeartbeatInfoRequest::Param Param; typedef obrpc::ObLogReqHeartbeatInfoResponse::Result Result; // Itor params. for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) { const Param ¶m = req.get_params().at(idx); // Find result. bool done = false; for (int64_t idx2 = 0, cnt2 = entry_vec_.size(); !done && idx2 < cnt2; ++idx2) { const Entry &entry = entry_vec_[idx2]; if (param.pkey_ == entry.pkey_ && param.log_id_ == entry.log_id_) { done = true; Result result; result.err_ = OB_SUCCESS; result.tstamp_ = entry.tstamp_; res.append_result(result); } } if (!done) { Result result; result.err_ = OB_NEED_RETRY; res.append_result(result); } } _D_("mock rpc: req heartbeat", K(req), K(res)); return OB_SUCCESS; } virtual int req_leader_heartbeat( const obrpc::ObLogLeaderHeartbeatReq &req, obrpc::ObLogLeaderHeartbeatResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int open_stream(const obrpc::ObLogOpenStreamReq &req, obrpc::ObLogOpenStreamResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req, obrpc::ObLogStreamFetchLogResp &res) { UNUSED(req); UNUSED(res); return OB_NOT_IMPLEMENT; } virtual int get_log_svr(const ObPartitionKey& pkey, const uint64_t log_id, ObSvrs& svrs, int& leader_cnt) { // Todo. In this version, only one result is enough, log_id is not used. UNUSED(log_id); UNUSED(leader_cnt); for (int64_t idx = 0, cnt = entry_vec_.size(); idx < cnt; ++idx) { const Entry &entry = entry_vec_[idx]; if (pkey == entry.pkey_) { svrs.push_back(entry.svr_); break; } } return OB_SUCCESS; } virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback) { UNUSED(feedback); return OB_SUCCESS; } private: static EntryVec entry_vec_; }; MockRpcInterface3::EntryVec MockRpcInterface3::entry_vec_; /* * Factory. */ class MockRpcInterface3Factory : public IFetcherRpcInterfaceFactory { public: virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc) { rpc = new MockRpcInterface3(); return OB_SUCCESS; } virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc) { delete rpc; return OB_SUCCESS; } }; /* * TransLog Generator 1. * Generate single partition transaction logs. * Support get trans logs in CORRECT order. * Use: * - Call next_trans(), specify trans params. * - Get logs in correct order: redo, redo, ..., prepare, commit/abort. */ struct TransParam1 { // Params used in trans log. ObPartitionKey pkey_; ObTransID trans_id_; ObAddr scheduler_; ObPartitionKey coordinator_; ObPartitionArray participants_; ObStartTransParam trans_param_; }; class TransLogGenerator1 { public: TransLogGenerator1() : param_(), redo_(), prepare_(), commit_(), abort_() { } virtual ~TransLogGenerator1() { } public: void next_trans(const TransParam1 ¶m) { param_ = param; } const ObTransRedoLog& next_redo(const uint64_t log_id) { int err = OB_SUCCESS; uint64_t tenant_id = 100; const uint64_t cluster_id = 1000; redo_.reset(); ObVersion active_memstore_version(1); err = redo_.init(OB_LOG_TRANS_REDO, param_.pkey_, param_.trans_id_, tenant_id, log_id, param_.scheduler_, param_.coordinator_, param_.participants_, param_.trans_param_, cluster_id, active_memstore_version); EXPECT_EQ(OB_SUCCESS, err); ObTransMutator &mutator = redo_.get_mutator(); if (NULL == mutator.get_mutator_buf()) { mutator.init(true); } const char *data = "fly"; char *buf = static_cast(mutator.alloc(strlen(data))); strcpy(buf, data); return redo_; } const ObTransPrepareLog& next_prepare(const ObRedoLogIdArray &all_redos) { int err = OB_SUCCESS; uint64_t tenant_id = 100; const uint64_t cluster_id = 1000; prepare_.reset(); ObVersion active_memstore_version(1); err = prepare_.init(OB_LOG_TRANS_PREPARE, param_.pkey_, param_.trans_id_, tenant_id, param_.scheduler_, param_.coordinator_, param_.participants_, param_.trans_param_, OB_SUCCESS, all_redos, 0, cluster_id, active_memstore_version); EXPECT_EQ(OB_SUCCESS, err); return prepare_; } const ObTransCommitLog& next_commit(const uint64_t prepare_log_id) { int err = OB_SUCCESS; const uint64_t cluster_id = 1000; ObPartitionLogInfo ptl_id(param_.pkey_, prepare_log_id, get_timestamp()); PartitionLogInfoArray ptl_ids; if (OB_INVALID_ID == prepare_log_id) { // Pass. For prepare-commit trans log. } else { err = ptl_ids.push_back(ptl_id); EXPECT_EQ(OB_SUCCESS, err); } commit_.reset(); err = commit_.init(OB_LOG_TRANS_COMMIT, param_.pkey_, param_.trans_id_, ptl_ids, 1, 0, cluster_id); EXPECT_EQ(OB_SUCCESS, err); return commit_; } const ObTransAbortLog& next_abort() { int err = OB_SUCCESS; const uint64_t cluster_id = 1000; PartitionLogInfoArray array; abort_.reset(); err = abort_.init(OB_LOG_TRANS_ABORT, param_.pkey_, param_.trans_id_, array, cluster_id); EXPECT_EQ(OB_SUCCESS, err); return abort_; } private: TransParam1 param_; ObTransRedoLog redo_; ObTransPrepareLog prepare_; ObTransCommitLog commit_; ObTransAbortLog abort_; }; /* * Transaction Log Entry Generator 1. * Generate log entries of transactions. */ class TransLogEntryGenerator1 { public: TransLogEntryGenerator1(const ObPartitionKey &pkey) : pkey_(pkey), log_id_(0), remain_log_cnt_(0), commit_(false), param_(), trans_log_gen_(), prepare_id_(0), redos_(), data_len_(0) { ObAddr addr(ObAddr::IPV4, "127.0.0.1", 5566); param_.pkey_ = pkey_; param_.trans_id_ = ObTransID(addr); param_.scheduler_ = addr; param_.coordinator_ = pkey_; int err = param_.participants_.push_back(pkey_); EXPECT_EQ(OB_SUCCESS, err); param_.trans_param_.set_access_mode(ObTransAccessMode::READ_WRITE); param_.trans_param_.set_isolation(ObTransIsolation::READ_COMMITED); param_.trans_param_.set_type(ObTransType::TRANS_NORMAL); buf_ = new char[buf_len_]; EXPECT_TRUE(NULL != buf_); } virtual ~TransLogEntryGenerator1() { delete[] buf_; } // Generate normal trans. // Start a new trans. void next_trans(const int64_t redo_cnt, bool commit) { remain_log_cnt_ = 2 + redo_cnt; commit_ = commit; redos_.reset(); trans_log_gen_.next_trans(param_); } // Get next log entry. int next_log_entry(ObLogEntry &log_entry) { int ret = OB_SUCCESS; if (2 < remain_log_cnt_) { next_redo_(log_entry); // Store redo id. int err = redos_.push_back(log_id_); EXPECT_EQ(OB_SUCCESS, err); log_id_ += 1; remain_log_cnt_ -= 1; } else if (2 == remain_log_cnt_) { next_prepare_(log_entry); prepare_id_ = log_id_; log_id_ += 1; remain_log_cnt_ -= 1; } else if (1 == remain_log_cnt_ && commit_) { next_commit_(log_entry); log_id_ += 1; remain_log_cnt_ -= 1; } else if (1 == remain_log_cnt_ && !commit_) { next_abort_(log_entry); log_id_ += 1; remain_log_cnt_ -= 1; } else { ret = OB_ITER_END; } return ret; } // Generate: redo, redo, redo, prepare-commit. int next_log_entry_2(ObLogEntry &log_entry) { int ret = OB_SUCCESS; if (2 < remain_log_cnt_) { next_redo_(log_entry); // Store redo id. int err = redos_.push_back(log_id_); EXPECT_EQ(OB_SUCCESS, err); log_id_ += 1; remain_log_cnt_ -= 1; } else if (2 == remain_log_cnt_ && commit_) { next_prepare_with_commit(log_entry); log_id_ += 1; remain_log_cnt_ -= 2; } else if (2 == remain_log_cnt_ && !commit_) { next_prepare_(log_entry); log_id_ += 1; remain_log_cnt_ -= 1; } else if (1 == remain_log_cnt_ && !commit_) { next_abort_(log_entry); log_id_ += 1; remain_log_cnt_ -= 1; } else { ret = OB_ITER_END; } return ret; } private: void next_redo_(ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObTransRedoLog &redo = trans_log_gen_.next_redo(log_id_); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_REDO); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = redo.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), get_timestamp(), ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } void next_prepare_(ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_PREPARE); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = prepare.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), get_timestamp(), ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } void next_commit_(ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObTransCommitLog &commit = trans_log_gen_.next_commit(prepare_id_); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_COMMIT); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = commit.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), get_timestamp(), ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } void next_abort_(ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObTransAbortLog &abort = trans_log_gen_.next_abort(); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_ABORT); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = abort.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), get_timestamp(), ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } void next_prepare_with_commit(ObLogEntry &log_entry) { int err = OB_SUCCESS; // Gen trans log. const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_); const ObTransCommitLog &commit = trans_log_gen_.next_commit(OB_INVALID_ID); int64_t pos = 0; err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_PREPARE_WITH_COMMIT); EXPECT_EQ(OB_SUCCESS, err); err = serialization::encode_i64(buf_, buf_len_, pos, 0); EXPECT_EQ(OB_SUCCESS, err); err = prepare.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); err = commit.serialize(buf_, buf_len_, pos); EXPECT_EQ(OB_SUCCESS, err); data_len_ = pos; // Gen entry header. ObLogEntryHeader header; header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_, data_len_, get_timestamp(), get_timestamp(), ObProposalID(), get_timestamp(), ObVersion(0)); // Gen log entry. log_entry.generate_entry(header, buf_); } private: // Params. ObPartitionKey pkey_; uint64_t log_id_; int64_t remain_log_cnt_; bool commit_; // Gen. TransParam1 param_; TransLogGenerator1 trans_log_gen_; uint64_t prepare_id_; ObRedoLogIdArray redos_; // Buf. int64_t data_len_; static const int64_t buf_len_ = 2 * _M_; char *buf_; }; /* * Mock Parser 1. * Read Task, revert it immediately, and count Task number. */ class MockParser1 : public IObLogParser { public: MockParser1() : trans_cnt_(0) { } virtual ~MockParser1() { } virtual int start() { return OB_SUCCESS; } virtual void stop() { } virtual void mark_stop_flag() { } virtual int push(PartTransTask* task, const int64_t timeout) { UNUSED(timeout); if (NULL != task && task->is_normal_trans()) { task->revert(); trans_cnt_ += 1; // Debug. // _I_(">>> push parser", "req", task->get_seq()); } return OB_SUCCESS; } int64_t get_trans_cnt() const { return trans_cnt_; } private: int64_t trans_cnt_; }; /* * Mock Fetcher Error Handler. */ class MockFetcherErrHandler1 : public IErrHandler { public: virtual ~MockFetcherErrHandler1() { } public: virtual void handle_err(int err_no, const char* fmt, ...) { UNUSED(err_no); va_list ap; va_start(ap, fmt); __E__(fmt, ap); va_end(ap); abort(); } }; /* * Mock Liboblog Error Handler. */ class MockLiboblogErrHandler1 : public IObLogErrHandler { public: virtual void handle_error(int err_no, const char* fmt, ...) { UNUSED(err_no); va_list ap; va_start(ap, fmt); __E__(fmt, ap); va_end(ap); } }; /* * Mock SvrProvider. * User set svrs into it. */ class MockSvrProvider1 : public sqlclient::ObMySQLServerProvider { public: virtual ~MockSvrProvider1() { } void add_svr(const ObAddr &svr) { svrs_.push_back(svr); } public: virtual int get_cluster_list(common::ObIArray &cluster_list) { int ret = OB_SUCCESS; if (svrs_.size() > 0) { if (OB_FAIL(cluster_list.push_back(common::OB_INVALID_ID))) { LOG_WARN("fail to push back cluster_id", K(ret)); } } return ret; } virtual int get_server(const int64_t cluster_id, const int64_t svr_idx, ObAddr& server) { UNUSED(cluster_id); int ret = OB_SUCCESS; if (0 <= svr_idx && svr_idx < static_cast(svrs_.size())) { server = svrs_[svr_idx]; } else { ret = OB_ERR_UNEXPECTED; } return ret; } virtual int64_t get_cluster_count() const { return svrs_.size() > 0 ? 1 : 0; } virtual int64_t get_server_count(const int64_t cluster_id) const { UNUSED(cluster_id) return static_cast(svrs_.size()); } virtual int refresh_server_list() { return OB_SUCCESS; } private: std::vector svrs_; }; /* * Test Dataset Generator. */ /* * Svr Config. * Set svr address and mysql port. */ struct SvrCfg { // Svr. const char *svr_addr_; int internal_port_; // Mysql. int mysql_port_; const char *mysql_user_; const char *mysql_password_; const char *mysql_db_; int64_t mysql_timeout_; }; /* * Configuration for mysql connector. */ inline ConnectorConfig prepare_cfg_1(const SvrCfg &svr_cfg) { ConnectorConfig cfg; cfg.mysql_addr_ = svr_cfg.svr_addr_; cfg.mysql_port_ = svr_cfg.mysql_port_; cfg.mysql_user_ = svr_cfg.mysql_user_; cfg.mysql_password_ = svr_cfg.mysql_password_; cfg.mysql_db_ = svr_cfg.mysql_db_; cfg.mysql_timeout_ = svr_cfg.mysql_timeout_; return cfg; } /* * Build table names. */ inline const char** prepare_table_name_1() { static const char* tnames[] = { "table1", "table2", "table3", "table4", "table5", "table6", "table7", "table8", "table9", "table10", "table11", "table12", "table13", "table14", "table15", "table16" }; return tnames; } /* * Build table schema. */ inline const char* prepare_table_schema_1() { return "c1 int primary key"; } /* * Create table. */ class CreateTable : public MySQLQueryBase { public: CreateTable(const char *tname, const char *schema) { snprintf(buf_, 512, "create table %s(%s)", tname, schema); sql_ = buf_; sql_len_ = strlen(sql_); } private: char buf_[512]; }; /* * Drop table. */ class DropTable : public MySQLQueryBase { public: DropTable(const char *tname) { snprintf(buf_, 512, "drop table if exists %s", tname); sql_ = buf_; sql_len_ = strlen(sql_); } private: char buf_[512]; }; /* * Get table id. */ class GetTableId : public MySQLQueryBase { public: GetTableId(const char *tname) { snprintf(buf_, 512, "select table_id " "from __all_table where table_name='%s'", tname); sql_ = buf_; sql_len_ = strlen(sql_); } int get_tid(uint64_t &tid) { int ret = common::OB_SUCCESS; while (common::OB_SUCCESS == (ret = next_row())) { uint64_t table_id = 0; if (OB_SUCC(ret)) { if (common::OB_SUCCESS != (ret = get_uint(0, table_id))) { OBLOG_LOG(WARN, "err get uint", K(ret)); } } tid = table_id; } ret = (common::OB_ITER_END == ret) ? common::OB_SUCCESS : ret; return ret; } private: char buf_[512]; }; /* * Get partition key by table id from system table. */ class GetPartitionKey : public MySQLQueryBase { public: GetPartitionKey(const uint64_t tid) { snprintf(buf_, 512, "select table_id, partition_id, partition_cnt " "from __all_meta_table where table_id=%lu", tid); sql_ = buf_; sql_len_ = strlen(sql_); } int get_pkeys(ObArray &pkeys) { int ret = common::OB_SUCCESS; while (common::OB_SUCCESS == (ret = next_row())) { uint64_t table_id = 0; int32_t partition_id = 0; int32_t partition_cnt = 0; if (OB_SUCC(ret)) { if (common::OB_SUCCESS != (ret = get_uint(0, table_id))) { OBLOG_LOG(WARN, "err get uint", K(ret)); } } if (OB_SUCC(ret)) { int64_t val = 0; if (common::OB_SUCCESS != (ret = get_int(1, val))) { OBLOG_LOG(WARN, "err get int", K(ret)); } else { partition_id = static_cast(val); } } if (OB_SUCC(ret)) { int64_t val = 0; if (common::OB_SUCCESS != (ret = get_int(2, val))) { OBLOG_LOG(WARN, "err get int", K(ret)); } else { partition_cnt = static_cast(val); } } ObPartitionKey pkey; pkey.init(table_id, partition_id, partition_cnt); pkeys.push_back(pkey); } ret = (common::OB_ITER_END == ret) ? common::OB_SUCCESS : ret; return ret; } private: char buf_[512]; }; /* * Create table and return their partition keys. */ inline void prepare_table_1(const SvrCfg& svr_cfg, const char** tnames, const int64_t tcnt, const char* schema, ObArray& pkeys) { ObLogMySQLConnector conn; ConnectorConfig cfg = prepare_cfg_1(svr_cfg); int ret = conn.init(cfg); EXPECT_EQ(OB_SUCCESS, ret); // Prepare tables. for (int64_t idx = 0; idx < tcnt; ++idx) { // Drop. DropTable drop_table(tnames[idx]); ret = conn.exec(drop_table); EXPECT_EQ(OB_SUCCESS, ret); // Create. CreateTable create_table(tnames[idx], schema); ret = conn.exec(create_table); EXPECT_EQ(OB_SUCCESS, ret); // Get tid. GetTableId get_tid(tnames[idx]); ret = conn.query(get_tid); EXPECT_EQ(OB_SUCCESS, ret); uint64_t tid = OB_INVALID_ID; ret = get_tid.get_tid(tid); EXPECT_EQ(OB_SUCCESS, ret); // Get pkeys. GetPartitionKey get_pkey(tid); ret = conn.query(get_pkey); EXPECT_EQ(OB_SUCCESS, ret); ret = get_pkey.get_pkeys(pkeys); EXPECT_EQ(OB_SUCCESS, ret); } ret = conn.destroy(); EXPECT_EQ(OB_SUCCESS, ret); } /* * Data generator. * Insert a bunch of data in server. * For schema 1. */ class DataGenerator1 : public Runnable { class Inserter : public MySQLQueryBase { public: void set_data(const char *tname, const int64_t data) { reuse(); snprintf(buf_, 512, "insert into %s (c1) values (%ld)", tname, data); sql_ = buf_; sql_len_ = strlen(sql_); } private: char buf_[512]; }; public: DataGenerator1(const ConnectorConfig &cfg) : conn_(), cfg_(cfg), tname_(NULL), start_(0), end_(0) { int err = conn_.init(cfg_); EXPECT_EQ(OB_SUCCESS, err); } ~DataGenerator1() { conn_.destroy(); } void insert(const char *tname, const int64_t start, const int64_t end) { tname_ = tname; start_ = start; end_ = end; create(); } private: int routine() { Inserter inserter; int err = OB_SUCCESS; for (int64_t cur = start_; OB_SUCCESS == err && cur < end_; cur++) { inserter.set_data(tname_, cur); err = conn_.exec(inserter); EXPECT_EQ(OB_SUCCESS, err); } return OB_SUCCESS; } private: ObLogMySQLConnector conn_; ConnectorConfig cfg_; const char *tname_; int64_t start_; int64_t end_; }; /* * End of Test Dataset Generator. */ } }