1495 lines
		
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1495 lines
		
	
	
		
			40 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#include <vector>
 | 
						|
#include <algorithm>
 | 
						|
 | 
						|
#include "share/ob_define.h"
 | 
						|
#include "storage/ob_storage_log_type.h"
 | 
						|
#include "storage/transaction/ob_trans_log.h"
 | 
						|
 | 
						|
#include "liboblog/src/ob_log_instance.h"
 | 
						|
#include "liboblog/src/ob_log_fetcher_stream.h"
 | 
						|
#include "liboblog/src/ob_log_fetcher_part_stream.h"
 | 
						|
 | 
						|
#include "ob_log_utils.h"     // get_timestamp
 | 
						|
 | 
						|
using namespace oceanbase;
 | 
						|
using namespace common;
 | 
						|
using namespace liboblog;
 | 
						|
using namespace fetcher;
 | 
						|
using namespace transaction;
 | 
						|
using namespace storage;
 | 
						|
using namespace clog;
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
namespace unittest
 | 
						|
{
 | 
						|
 | 
						|
/*
 | 
						|
 * Utils.
 | 
						|
 */
 | 
						|
typedef std::vector<ObAddr> Svrs;
 | 
						|
typedef std::vector<ObPartitionKey> PKeys;
 | 
						|
typedef std::vector<uint64_t> LogIds;
 | 
						|
typedef std::vector<int64_t> Tstamps;
 | 
						|
 | 
						|
/*
 | 
						|
 * Mock Rpc Interface 1.
 | 
						|
 * It owns N partitions, each has M log entries.
 | 
						|
 * It returns L log entries in each fetch_log() call.
 | 
						|
 * Log entry contains nothing.
 | 
						|
 * Used to test:
 | 
						|
 *  - add partition into Stream.
 | 
						|
 *  - update file id & offset.
 | 
						|
 *  - fetch log.
 | 
						|
 *  - kick out offline partitions.
 | 
						|
 *  - discard partitions.
 | 
						|
 */
 | 
						|
class MockRpcInterface1 : public IFetcherRpcInterface
 | 
						|
{
 | 
						|
  struct Entry
 | 
						|
  {
 | 
						|
    file_id_t file_id_;
 | 
						|
    offset_t offset_;
 | 
						|
    uint64_t log_id_;
 | 
						|
    ObPartitionKey pkey_;
 | 
						|
  };
 | 
						|
  // Use offset_ as index of Entry in EntryVec.
 | 
						|
  typedef std::vector<Entry> EntryVec;
 | 
						|
public:
 | 
						|
  MockRpcInterface1(const PKeys &pkeys,
 | 
						|
                    const int64_t log_entry_per_p,
 | 
						|
                    const int64_t log_entry_per_call)
 | 
						|
  {
 | 
						|
    log_entry_per_call_ = log_entry_per_call;
 | 
						|
    log_entry_per_p_ = log_entry_per_p;
 | 
						|
    addr_ = ObAddr(ObAddr::IPV4, "127.0.0.1", 5999);
 | 
						|
    // Gen entries.
 | 
						|
    int64_t log_entry_cnt = 0;
 | 
						|
    for (int64_t log_id = 1; log_id < log_entry_per_p + 1; ++log_id) {
 | 
						|
      for (int64_t pidx = 0, cnt = pkeys.size(); pidx < cnt; ++pidx) {
 | 
						|
        // Gen entry.
 | 
						|
        Entry entry;
 | 
						|
        entry.pkey_ = pkeys.at(pidx);
 | 
						|
        entry.file_id_ = 1;
 | 
						|
        entry.offset_ = static_cast<offset_t>(log_entry_cnt++);
 | 
						|
        entry.log_id_ = log_id;
 | 
						|
        // Save it.
 | 
						|
        entries_.push_back(entry);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
 | 
						|
  virtual ~MockRpcInterface1() { }
 | 
						|
 | 
						|
  virtual void set_svr(const common::ObAddr& svr) { UNUSED(svr); }
 | 
						|
 | 
						|
  virtual const ObAddr& get_svr() const { static ObAddr svr; return svr; }
 | 
						|
 | 
						|
  virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); }
 | 
						|
 | 
						|
  virtual int req_start_log_id_by_ts(
 | 
						|
    const obrpc::ObLogReqStartLogIdByTsRequest& req,
 | 
						|
    obrpc::ObLogReqStartLogIdByTsResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
 | 
						|
                                       obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req,
 | 
						|
                                        obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_pos_by_log_id(
 | 
						|
    const obrpc::ObLogReqStartPosByLogIdRequest& req,
 | 
						|
    obrpc::ObLogReqStartPosByLogIdResponse& res)
 | 
						|
  {
 | 
						|
    /*
 | 
						|
     * Err Supportted:
 | 
						|
     *  - OB_SUCCESS
 | 
						|
     *  - OB_ENTRY_NOT_EXIST: partition exists without any log
 | 
						|
     *  - OB_ERR_OUT_OF_LOWER_BOUND: log id beyond lower bound
 | 
						|
     *  - OB_ERR_OUT_OF_UPPER_BOUND: log id beyond upper bound
 | 
						|
     */
 | 
						|
    typedef obrpc::ObLogReqStartPosByLogIdRequest::Param Param;
 | 
						|
    typedef obrpc::ObLogReqStartPosByLogIdResponse::Result Result;
 | 
						|
    for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
 | 
						|
      // Locate for a partition.
 | 
						|
      const Param ¶m = req.get_params().at(idx);
 | 
						|
      const ObPartitionKey &pkey = param.pkey_;
 | 
						|
      const uint64_t start_log_id = param.start_log_id_;
 | 
						|
      Result result;
 | 
						|
      result.reset();
 | 
						|
      // Search.
 | 
						|
      bool done = false;
 | 
						|
      bool partition_exist = false;
 | 
						|
      for (int64_t entry_idx = 0, entry_cnt = entries_.size();
 | 
						|
           _SUCC_(result.err_) && !done && entry_idx < entry_cnt;
 | 
						|
           ++entry_idx) {
 | 
						|
        const Entry &entry = entries_.at(entry_idx);
 | 
						|
        if (entry.pkey_ == pkey) {
 | 
						|
          partition_exist = true;
 | 
						|
          // Got it.
 | 
						|
          if (start_log_id == entry.log_id_) {
 | 
						|
            result.err_ = OB_SUCCESS;
 | 
						|
            result.file_id_ = 1;
 | 
						|
            result.offset_ = entry.offset_;
 | 
						|
            done = true;
 | 
						|
          }
 | 
						|
          // Too small log id.
 | 
						|
          else if (start_log_id < entry.log_id_) {
 | 
						|
            result.err_ = OB_ERR_OUT_OF_LOWER_BOUND;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
      if (!done && _SUCC_(result.err_)) {
 | 
						|
        // No log entry.
 | 
						|
        if (!partition_exist) {
 | 
						|
          result.err_ = OB_ENTRY_NOT_EXIST;
 | 
						|
        }
 | 
						|
        // Too large log id.
 | 
						|
        else {
 | 
						|
          result.err_ = OB_ERR_OUT_OF_UPPER_BOUND;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      res.append_result(result);
 | 
						|
    }
 | 
						|
    _D_("mock rpc 1 req pos", K(req), K(res));
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int fetch_log(
 | 
						|
    const obrpc::ObLogExternalFetchLogRequest& req,
 | 
						|
    obrpc::ObLogExternalFetchLogResponse& res)
 | 
						|
  {
 | 
						|
    typedef obrpc::ObLogExternalFetchLogRequest::Param Param;
 | 
						|
    typedef obrpc::ObLogExternalFetchLogResponse::OfflinePartition OP;
 | 
						|
 | 
						|
    // Fetch log.
 | 
						|
    const offset_t offset = req.get_offset();
 | 
						|
    if (offset < 0) {
 | 
						|
      return OB_INVALID_ARGUMENT;
 | 
						|
    }
 | 
						|
    offset_t ret_offset = offset;
 | 
						|
    // Scan.
 | 
						|
    for (int64_t idx = static_cast<int64_t>(offset), cnt = entries_.size();
 | 
						|
         idx < cnt && res.get_log_num() < log_entry_per_call_;
 | 
						|
         ++idx) {
 | 
						|
      const Entry &entry = entries_.at(idx);
 | 
						|
      bool fetch = false;
 | 
						|
      for (int64_t pidx = 0, pcnt = req.get_params().count();
 | 
						|
           !fetch && pidx < pcnt;
 | 
						|
           ++pidx) {
 | 
						|
        const Param ¶m = req.get_params().at(pidx);
 | 
						|
        if (entry.pkey_ == param.pkey_
 | 
						|
            && param.start_log_id_ <= entry.log_id_
 | 
						|
            && entry.log_id_ <= param.last_log_id_) {
 | 
						|
          fetch = true;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      if (fetch) {
 | 
						|
        ret_offset = static_cast<offset_t>(entry.offset_);
 | 
						|
        // Gen header.
 | 
						|
        int64_t ts = get_timestamp();
 | 
						|
        ObProposalID proposal_id;
 | 
						|
        proposal_id.addr_ = addr_;
 | 
						|
        proposal_id.ts_ = ts;
 | 
						|
        ObLogEntryHeader header;
 | 
						|
        header.generate_header(OB_LOG_SUBMIT, entry.pkey_,
 | 
						|
                               entry.log_id_, mock_load_, mock_load_len_,
 | 
						|
                               ts, ts, proposal_id, ts, ObVersion(1));
 | 
						|
        ObLogEntry log_entry;
 | 
						|
        log_entry.generate_entry(header, mock_load_);
 | 
						|
        res.append_log(log_entry);
 | 
						|
      }
 | 
						|
    }
 | 
						|
    res.set_file_id_offset(1, ret_offset + 1);
 | 
						|
 | 
						|
    // Handle offline partition.
 | 
						|
    // Here, if a partition reaches its last log, it is offline.
 | 
						|
    for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
 | 
						|
      const Param ¶m = req.get_params().at(idx);
 | 
						|
      const uint64_t last_log_id =  log_entry_per_p_;
 | 
						|
      if (last_log_id < param.start_log_id_) {
 | 
						|
        OP op;
 | 
						|
        op.pkey_ = param.pkey_;
 | 
						|
        // op.last_log_id_ = last_log_id;
 | 
						|
        op.sync_ts_ = last_log_id;
 | 
						|
        res.append_offline_partition(op);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    _D_("mock rpc 1 fetch log", K(req), K(res));
 | 
						|
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_heartbeat_info(
 | 
						|
    const obrpc::ObLogReqHeartbeatInfoRequest& req,
 | 
						|
    obrpc::ObLogReqHeartbeatInfoResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_leader_heartbeat(
 | 
						|
    const obrpc::ObLogLeaderHeartbeatReq &req,
 | 
						|
    obrpc::ObLogLeaderHeartbeatResp &res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int open_stream(const obrpc::ObLogOpenStreamReq &req,
 | 
						|
                          obrpc::ObLogOpenStreamResp &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req,
 | 
						|
                               obrpc::ObLogStreamFetchLogResp &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
 | 
						|
  {
 | 
						|
    UNUSED(feedback);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  ObAddr addr_;
 | 
						|
  int64_t log_entry_per_call_;
 | 
						|
  int64_t log_entry_per_p_;
 | 
						|
  EntryVec entries_;
 | 
						|
  static const int64_t mock_load_len_ = 8;
 | 
						|
  char mock_load_[mock_load_len_];
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Factory.
 | 
						|
 */
 | 
						|
class MockRpcInterface1Factory : public IFetcherRpcInterfaceFactory
 | 
						|
{
 | 
						|
public:
 | 
						|
  MockRpcInterface1Factory(const PKeys &pkeys,
 | 
						|
                           const int64_t log_entry_per_p,
 | 
						|
                           const int64_t log_entry_per_call)
 | 
						|
    : pkeys_(pkeys),
 | 
						|
    log_entry_per_p_(log_entry_per_p),
 | 
						|
    log_entry_per_call_(log_entry_per_call)
 | 
						|
  { }
 | 
						|
  virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc)
 | 
						|
  {
 | 
						|
    rpc = new MockRpcInterface1(pkeys_, log_entry_per_p_, log_entry_per_call_);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc)
 | 
						|
  {
 | 
						|
    delete rpc;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  PKeys pkeys_;
 | 
						|
  int64_t log_entry_per_p_;
 | 
						|
  int64_t log_entry_per_call_;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Mock Rpc Interface 2.
 | 
						|
 * It owns N servers, each hold some partitions, one of them is
 | 
						|
 * the leader. When request start log id, a preseted value is returned.
 | 
						|
 * Used to test:
 | 
						|
 *  - fetch partition
 | 
						|
 *  - locate start log id
 | 
						|
 *  - activate partition stream
 | 
						|
 *  - discard partition stream
 | 
						|
 */
 | 
						|
class MockRpcInterface2 : public IFetcherRpcInterface
 | 
						|
{
 | 
						|
  struct Partition
 | 
						|
  {
 | 
						|
    ObPartitionKey pkey_;
 | 
						|
    uint64_t start_log_id_;
 | 
						|
    bool is_leader_;
 | 
						|
  };
 | 
						|
  struct Svr
 | 
						|
  {
 | 
						|
    ObAddr svr_;
 | 
						|
    std::vector<Partition> partitions_;
 | 
						|
    bool operator==(const Svr &other) const
 | 
						|
    {
 | 
						|
      return svr_ == other.svr_;
 | 
						|
    }
 | 
						|
    bool operator<(const Svr &other) const
 | 
						|
    {
 | 
						|
      return svr_ < other.svr_;
 | 
						|
    }
 | 
						|
  };
 | 
						|
public:
 | 
						|
  /*
 | 
						|
   * Set static result set. The first partition in svrs is the leader.
 | 
						|
   */
 | 
						|
  static void add_partition(const ObPartitionKey &pkey,
 | 
						|
                            uint64_t start_log_id,
 | 
						|
                            std::vector<ObAddr> svrs)
 | 
						|
  {
 | 
						|
    EXPECT_NE(0, svrs.size());
 | 
						|
    Partition pt = { pkey, start_log_id, false };
 | 
						|
    for (int64_t idx = 0, cnt = svrs.size();
 | 
						|
         idx < cnt;
 | 
						|
         ++idx) {
 | 
						|
      pt.is_leader_ = (0 == idx);
 | 
						|
      Svr target;
 | 
						|
      target.svr_ = svrs.at(idx);
 | 
						|
      std::vector<Svr>::iterator itor =
 | 
						|
        std::find(svrs_.begin(), svrs_.end(), target);
 | 
						|
      if (svrs_.end() == itor) {
 | 
						|
        target.partitions_.push_back(pt);
 | 
						|
        svrs_.push_back(target);
 | 
						|
        std::sort(svrs_.begin(), svrs_.end());
 | 
						|
      }
 | 
						|
      else {
 | 
						|
        (*itor).partitions_.push_back(pt);
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  /*
 | 
						|
   * Clear static result set.
 | 
						|
   */
 | 
						|
  static void clear_result_set()
 | 
						|
  {
 | 
						|
    svrs_.clear();
 | 
						|
  }
 | 
						|
public:
 | 
						|
  virtual void set_svr(const common::ObAddr& svr) { svr_ = svr; }
 | 
						|
 | 
						|
  virtual const ObAddr& get_svr() const { return svr_; }
 | 
						|
 | 
						|
  virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); }
 | 
						|
 | 
						|
  virtual int req_start_log_id_by_ts(
 | 
						|
    const obrpc::ObLogReqStartLogIdByTsRequest& req,
 | 
						|
    obrpc::ObLogReqStartLogIdByTsResponse& res)
 | 
						|
  {
 | 
						|
    bool svr_exist = false;
 | 
						|
    for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
 | 
						|
      const ObPartitionKey &pkey = req.get_params().at(idx).pkey_;
 | 
						|
      bool done = false;
 | 
						|
      for (int64_t svr_idx = 0, svr_cnt = svrs_.size();
 | 
						|
           svr_idx < svr_cnt;
 | 
						|
           ++svr_idx) {
 | 
						|
        // Simulating sending rpc to svr.
 | 
						|
        if (svr_ == svrs_.at(svr_idx).svr_) {
 | 
						|
          svr_exist = true;
 | 
						|
          const Svr &svr = svrs_.at(svr_idx);
 | 
						|
          for (int64_t pidx = 0, pcnt = svr.partitions_.size();
 | 
						|
               pidx < pcnt;
 | 
						|
               ++pidx) {
 | 
						|
            const Partition &p = svr.partitions_.at(pidx);
 | 
						|
            if (pkey == p.pkey_) {
 | 
						|
              done = true;
 | 
						|
              typedef obrpc::ObLogReqStartLogIdByTsResponse::Result Result;
 | 
						|
              Result result = { OB_SUCCESS, p.start_log_id_, false};
 | 
						|
              res.append_result(result);
 | 
						|
            }
 | 
						|
          }
 | 
						|
          if (!done) {
 | 
						|
            res.set_err(OB_PARTITION_NOT_EXIST);
 | 
						|
          }
 | 
						|
        }
 | 
						|
      } // End for.
 | 
						|
    }
 | 
						|
 | 
						|
    _D_("mock rpc req start log id", K(req), K(res));
 | 
						|
 | 
						|
    return (svr_exist) ? OB_SUCCESS : OB_TIMEOUT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
 | 
						|
                                       obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req,
 | 
						|
                                        obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_pos_by_log_id(
 | 
						|
    const obrpc::ObLogReqStartPosByLogIdRequest& req,
 | 
						|
    obrpc::ObLogReqStartPosByLogIdResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    // Timeout.
 | 
						|
    _D_("mock rpc req pos by log id", K(req), K(res));
 | 
						|
    return OB_TIMEOUT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int fetch_log(
 | 
						|
    const obrpc::ObLogExternalFetchLogRequest& req,
 | 
						|
    obrpc::ObLogExternalFetchLogResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    // Timeout.
 | 
						|
    _D_("mock rpc req pos by log id", K(req), K(res));
 | 
						|
    return OB_TIMEOUT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_heartbeat_info(
 | 
						|
    const obrpc::ObLogReqHeartbeatInfoRequest& req,
 | 
						|
    obrpc::ObLogReqHeartbeatInfoResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_leader_heartbeat(
 | 
						|
    const obrpc::ObLogLeaderHeartbeatReq &req,
 | 
						|
    obrpc::ObLogLeaderHeartbeatResp &res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int open_stream(const obrpc::ObLogOpenStreamReq &req,
 | 
						|
                          obrpc::ObLogOpenStreamResp &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req,
 | 
						|
                               obrpc::ObLogStreamFetchLogResp &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
  virtual int get_log_svr(const ObPartitionKey& pkey, const uint64_t log_id,
 | 
						|
                          ObSvrs& svrs, int& leader_cnt)
 | 
						|
  {
 | 
						|
    UNUSED(log_id);
 | 
						|
    leader_cnt = 0;
 | 
						|
    for (int64_t svr_idx = 0, svr_cnt = svrs_.size();
 | 
						|
         svr_idx < svr_cnt;
 | 
						|
         ++svr_idx) {
 | 
						|
      Svr &svr = svrs_.at(svr_idx);
 | 
						|
      for (int64_t pidx = 0, pcnt = svr.partitions_.size();
 | 
						|
           pidx < pcnt;
 | 
						|
           ++pidx) {
 | 
						|
        const Partition &p = svr.partitions_.at(pidx);
 | 
						|
        if (pkey == p.pkey_) {
 | 
						|
          svrs.push_back(svr.svr_);
 | 
						|
          if (p.is_leader_) {
 | 
						|
            std::swap(svrs.at(leader_cnt), svrs.at(svrs.count() - 1));
 | 
						|
            leader_cnt += 1;
 | 
						|
          }
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
    _D_("mock rpc req log servers", K(pkey), K(svrs), K(leader_cnt));
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
 | 
						|
  {
 | 
						|
    UNUSED(feedback);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  // Target svr.
 | 
						|
  ObAddr svr_;
 | 
						|
  // Data set.
 | 
						|
  static std::vector<Svr> svrs_;
 | 
						|
};
 | 
						|
// Static data set. So all instances could access it.
 | 
						|
std::vector<MockRpcInterface2::Svr> MockRpcInterface2::svrs_;
 | 
						|
 | 
						|
/*
 | 
						|
 * Factory.
 | 
						|
 */
 | 
						|
class MockRpcInterface2Factory : public IFetcherRpcInterfaceFactory
 | 
						|
{
 | 
						|
public:
 | 
						|
  virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc)
 | 
						|
  {
 | 
						|
    rpc = new MockRpcInterface2();
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc)
 | 
						|
  {
 | 
						|
    delete rpc;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Mock Rpc Interface 3.
 | 
						|
 * It owns some servers and partitions, can return
 | 
						|
 * svr addr and heartbeat timestamps.
 | 
						|
 * Notice: user set <svr, partition key, log id, tstamp> tuples
 | 
						|
 * as results.
 | 
						|
 * Used to test:
 | 
						|
 *  - Heartbeat facilities.
 | 
						|
 */
 | 
						|
class MockRpcInterface3 : public IFetcherRpcInterface
 | 
						|
{
 | 
						|
  struct Entry
 | 
						|
  {
 | 
						|
    ObAddr svr_;
 | 
						|
    ObPartitionKey pkey_;
 | 
						|
    uint64_t log_id_;
 | 
						|
    int64_t tstamp_;
 | 
						|
    bool operator<(const Entry &other)
 | 
						|
    {
 | 
						|
      return log_id_ < other.log_id_;
 | 
						|
    }
 | 
						|
  };
 | 
						|
  typedef std::vector<Entry> EntryVec;
 | 
						|
public:
 | 
						|
  static void clear_result()
 | 
						|
  {
 | 
						|
    entry_vec_.clear();
 | 
						|
  }
 | 
						|
  static void add_result(const ObAddr &svr, const ObPartitionKey &pkey,
 | 
						|
                         const uint64_t log_id, const int64_t tstamp)
 | 
						|
  {
 | 
						|
    Entry entry = { svr, pkey, log_id, tstamp };
 | 
						|
    entry_vec_.push_back(entry);
 | 
						|
  }
 | 
						|
public:
 | 
						|
  virtual void set_svr(const common::ObAddr& svr) { UNUSED(svr); }
 | 
						|
 | 
						|
  virtual const ObAddr& get_svr() const { static ObAddr svr; return svr; }
 | 
						|
 | 
						|
  virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); }
 | 
						|
 | 
						|
  virtual int req_start_log_id_by_ts(
 | 
						|
    const obrpc::ObLogReqStartLogIdByTsRequest& req,
 | 
						|
    obrpc::ObLogReqStartLogIdByTsResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
 | 
						|
                                       obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req,
 | 
						|
                                        obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_pos_by_log_id(
 | 
						|
    const obrpc::ObLogReqStartPosByLogIdRequest& req,
 | 
						|
    obrpc::ObLogReqStartPosByLogIdResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int fetch_log(
 | 
						|
    const obrpc::ObLogExternalFetchLogRequest& req,
 | 
						|
    obrpc::ObLogExternalFetchLogResponse& res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_heartbeat_info(
 | 
						|
    const obrpc::ObLogReqHeartbeatInfoRequest& req,
 | 
						|
    obrpc::ObLogReqHeartbeatInfoResponse& res)
 | 
						|
  {
 | 
						|
    typedef obrpc::ObLogReqHeartbeatInfoRequest::Param Param;
 | 
						|
    typedef obrpc::ObLogReqHeartbeatInfoResponse::Result Result;
 | 
						|
    // Itor params.
 | 
						|
    for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
 | 
						|
      const Param ¶m = req.get_params().at(idx);
 | 
						|
      // Find result.
 | 
						|
      bool done = false;
 | 
						|
      for (int64_t idx2 = 0, cnt2 = entry_vec_.size();
 | 
						|
           !done && idx2 < cnt2;
 | 
						|
           ++idx2) {
 | 
						|
        const Entry &entry = entry_vec_[idx2];
 | 
						|
        if (param.pkey_ == entry.pkey_
 | 
						|
            && param.log_id_ == entry.log_id_) {
 | 
						|
          done = true;
 | 
						|
          Result result;
 | 
						|
          result.err_ = OB_SUCCESS;
 | 
						|
          result.tstamp_ = entry.tstamp_;
 | 
						|
          res.append_result(result);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      if (!done) {
 | 
						|
        Result result;
 | 
						|
        result.err_ = OB_NEED_RETRY;
 | 
						|
        res.append_result(result);
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    _D_("mock rpc: req heartbeat", K(req), K(res));
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_leader_heartbeat(
 | 
						|
    const obrpc::ObLogLeaderHeartbeatReq &req,
 | 
						|
    obrpc::ObLogLeaderHeartbeatResp &res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int open_stream(const obrpc::ObLogOpenStreamReq &req,
 | 
						|
                          obrpc::ObLogOpenStreamResp &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req,
 | 
						|
                               obrpc::ObLogStreamFetchLogResp &res) {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_NOT_IMPLEMENT;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int get_log_svr(const ObPartitionKey& pkey, const uint64_t log_id,
 | 
						|
                          ObSvrs& svrs, int& leader_cnt)
 | 
						|
  {
 | 
						|
    // Todo. In this version, only one result is enough, log_id is not used.
 | 
						|
    UNUSED(log_id);
 | 
						|
    UNUSED(leader_cnt);
 | 
						|
 | 
						|
    for (int64_t idx = 0, cnt = entry_vec_.size(); idx < cnt; ++idx) {
 | 
						|
      const Entry &entry = entry_vec_[idx];
 | 
						|
      if (pkey == entry.pkey_) {
 | 
						|
        svrs.push_back(entry.svr_);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
    }
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
 | 
						|
  {
 | 
						|
    UNUSED(feedback);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  static EntryVec entry_vec_;
 | 
						|
};
 | 
						|
 | 
						|
MockRpcInterface3::EntryVec MockRpcInterface3::entry_vec_;
 | 
						|
 | 
						|
/*
 | 
						|
 * Factory.
 | 
						|
 */
 | 
						|
class MockRpcInterface3Factory : public IFetcherRpcInterfaceFactory
 | 
						|
{
 | 
						|
public:
 | 
						|
  virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc)
 | 
						|
  {
 | 
						|
    rpc = new MockRpcInterface3();
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc)
 | 
						|
  {
 | 
						|
    delete rpc;
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * TransLog Generator 1.
 | 
						|
 * Generate single partition transaction logs.
 | 
						|
 * Support get trans logs in CORRECT order.
 | 
						|
 * Use:
 | 
						|
 *  - Call next_trans(), specify trans params.
 | 
						|
 *  - Get logs in correct order: redo, redo, ..., prepare, commit/abort.
 | 
						|
 */
 | 
						|
struct TransParam1
 | 
						|
{
 | 
						|
  // Params used in trans log.
 | 
						|
  ObPartitionKey pkey_;
 | 
						|
  ObTransID trans_id_;
 | 
						|
  ObAddr scheduler_;
 | 
						|
  ObPartitionKey coordinator_;
 | 
						|
  ObPartitionArray participants_;
 | 
						|
  ObStartTransParam trans_param_;
 | 
						|
};
 | 
						|
 | 
						|
class TransLogGenerator1
 | 
						|
{
 | 
						|
public:
 | 
						|
  TransLogGenerator1()
 | 
						|
    : param_(),
 | 
						|
    redo_(),
 | 
						|
    prepare_(),
 | 
						|
    commit_(),
 | 
						|
    abort_()
 | 
						|
  { }
 | 
						|
  virtual ~TransLogGenerator1() { }
 | 
						|
public:
 | 
						|
  void next_trans(const TransParam1 ¶m)
 | 
						|
  {
 | 
						|
    param_ = param;
 | 
						|
  }
 | 
						|
  const ObTransRedoLog& next_redo(const uint64_t log_id)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    uint64_t tenant_id = 100;
 | 
						|
    const uint64_t cluster_id = 1000;
 | 
						|
    redo_.reset();
 | 
						|
    ObVersion active_memstore_version(1);
 | 
						|
    err = redo_.init(OB_LOG_TRANS_REDO, param_.pkey_, param_.trans_id_,
 | 
						|
                     tenant_id, log_id, param_.scheduler_, param_.coordinator_,
 | 
						|
                     param_.participants_, param_.trans_param_, cluster_id, active_memstore_version);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    ObTransMutator &mutator = redo_.get_mutator();
 | 
						|
    if (NULL == mutator.get_mutator_buf()) {
 | 
						|
      mutator.init(true);
 | 
						|
    }
 | 
						|
    const char *data = "fly";
 | 
						|
    char *buf = static_cast<char*>(mutator.alloc(strlen(data)));
 | 
						|
    strcpy(buf, data);
 | 
						|
    return redo_;
 | 
						|
  }
 | 
						|
  const ObTransPrepareLog& next_prepare(const ObRedoLogIdArray &all_redos)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    uint64_t tenant_id = 100;
 | 
						|
    const uint64_t cluster_id = 1000;
 | 
						|
    prepare_.reset();
 | 
						|
    ObVersion active_memstore_version(1);
 | 
						|
    err = prepare_.init(OB_LOG_TRANS_PREPARE, param_.pkey_, param_.trans_id_,
 | 
						|
                        tenant_id, param_.scheduler_, param_.coordinator_,
 | 
						|
                        param_.participants_, param_.trans_param_,
 | 
						|
                        OB_SUCCESS, all_redos, 0, cluster_id, active_memstore_version);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    return prepare_;
 | 
						|
  }
 | 
						|
  const ObTransCommitLog& next_commit(const uint64_t prepare_log_id)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    const uint64_t cluster_id = 1000;
 | 
						|
    ObPartitionLogInfo ptl_id(param_.pkey_, prepare_log_id, get_timestamp());
 | 
						|
    PartitionLogInfoArray ptl_ids;
 | 
						|
    if (OB_INVALID_ID == prepare_log_id) {
 | 
						|
      // Pass. For prepare-commit trans log.
 | 
						|
    }
 | 
						|
    else {
 | 
						|
      err = ptl_ids.push_back(ptl_id);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    }
 | 
						|
    commit_.reset();
 | 
						|
    err = commit_.init(OB_LOG_TRANS_COMMIT, param_.pkey_, param_.trans_id_,
 | 
						|
                       ptl_ids, 1, 0, cluster_id);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    return commit_;
 | 
						|
  }
 | 
						|
  const ObTransAbortLog& next_abort()
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    const uint64_t cluster_id = 1000;
 | 
						|
    PartitionLogInfoArray array;
 | 
						|
    abort_.reset();
 | 
						|
    err = abort_.init(OB_LOG_TRANS_ABORT, param_.pkey_, param_.trans_id_, array, cluster_id);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    return abort_;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  TransParam1 param_;
 | 
						|
  ObTransRedoLog redo_;
 | 
						|
  ObTransPrepareLog prepare_;
 | 
						|
  ObTransCommitLog commit_;
 | 
						|
  ObTransAbortLog abort_;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Transaction Log Entry Generator 1.
 | 
						|
 * Generate log entries of transactions.
 | 
						|
 */
 | 
						|
class TransLogEntryGenerator1
 | 
						|
{
 | 
						|
public:
 | 
						|
  TransLogEntryGenerator1(const ObPartitionKey &pkey)
 | 
						|
    : pkey_(pkey),
 | 
						|
    log_id_(0),
 | 
						|
    remain_log_cnt_(0),
 | 
						|
    commit_(false),
 | 
						|
    param_(),
 | 
						|
    trans_log_gen_(),
 | 
						|
    prepare_id_(0),
 | 
						|
    redos_(),
 | 
						|
    data_len_(0)
 | 
						|
  {
 | 
						|
    ObAddr addr(ObAddr::IPV4, "127.0.0.1", 5566);
 | 
						|
    param_.pkey_ = pkey_;
 | 
						|
    param_.trans_id_ = ObTransID(addr);
 | 
						|
    param_.scheduler_ = addr;
 | 
						|
    param_.coordinator_ = pkey_;
 | 
						|
    int err = param_.participants_.push_back(pkey_);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    param_.trans_param_.set_access_mode(ObTransAccessMode::READ_WRITE);
 | 
						|
    param_.trans_param_.set_isolation(ObTransIsolation::READ_COMMITED);
 | 
						|
    param_.trans_param_.set_type(ObTransType::TRANS_NORMAL);
 | 
						|
 | 
						|
    buf_ = new char[buf_len_];
 | 
						|
    EXPECT_TRUE(NULL != buf_);
 | 
						|
  }
 | 
						|
  virtual ~TransLogEntryGenerator1()
 | 
						|
  {
 | 
						|
    delete[] buf_;
 | 
						|
  }
 | 
						|
  // Generate normal trans.
 | 
						|
  // Start a new trans.
 | 
						|
  void next_trans(const int64_t redo_cnt, bool commit)
 | 
						|
  {
 | 
						|
    remain_log_cnt_ = 2 + redo_cnt;
 | 
						|
    commit_ = commit;
 | 
						|
    redos_.reset();
 | 
						|
    trans_log_gen_.next_trans(param_);
 | 
						|
  }
 | 
						|
  // Get next log entry.
 | 
						|
  int next_log_entry(ObLogEntry &log_entry)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    if (2 < remain_log_cnt_) {
 | 
						|
      next_redo_(log_entry);
 | 
						|
      // Store redo id.
 | 
						|
      int err = redos_.push_back(log_id_);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 1;
 | 
						|
    }
 | 
						|
    else if (2 == remain_log_cnt_) {
 | 
						|
      next_prepare_(log_entry);
 | 
						|
      prepare_id_ = log_id_;
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 1;
 | 
						|
    }
 | 
						|
    else if (1 == remain_log_cnt_ && commit_) {
 | 
						|
      next_commit_(log_entry);
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 1;
 | 
						|
    }
 | 
						|
    else if (1 == remain_log_cnt_ && !commit_) {
 | 
						|
      next_abort_(log_entry);
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 1;
 | 
						|
    }
 | 
						|
    else {
 | 
						|
      ret = OB_ITER_END;
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  // Generate: redo, redo, redo, prepare-commit.
 | 
						|
  int next_log_entry_2(ObLogEntry &log_entry)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    if (2 < remain_log_cnt_) {
 | 
						|
      next_redo_(log_entry);
 | 
						|
      // Store redo id.
 | 
						|
      int err = redos_.push_back(log_id_);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 1;
 | 
						|
    }
 | 
						|
    else if (2 == remain_log_cnt_ && commit_) {
 | 
						|
      next_prepare_with_commit(log_entry);
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 2;
 | 
						|
    }
 | 
						|
    else if (2 == remain_log_cnt_ && !commit_) {
 | 
						|
      next_prepare_(log_entry);
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 1;
 | 
						|
    }
 | 
						|
    else if (1 == remain_log_cnt_ && !commit_) {
 | 
						|
      next_abort_(log_entry);
 | 
						|
      log_id_ += 1;
 | 
						|
      remain_log_cnt_ -= 1;
 | 
						|
    }
 | 
						|
    else {
 | 
						|
      ret = OB_ITER_END;
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  void next_redo_(ObLogEntry &log_entry)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    // Gen trans log.
 | 
						|
    const ObTransRedoLog &redo = trans_log_gen_.next_redo(log_id_);
 | 
						|
    int64_t pos = 0;
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_REDO);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, 0);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = redo.serialize(buf_, buf_len_, pos);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    data_len_ = pos;
 | 
						|
    // Gen entry header.
 | 
						|
    ObLogEntryHeader header;
 | 
						|
    header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
 | 
						|
                           data_len_, get_timestamp(), get_timestamp(),
 | 
						|
                           ObProposalID(), get_timestamp(), ObVersion(0));
 | 
						|
    // Gen log entry.
 | 
						|
    log_entry.generate_entry(header, buf_);
 | 
						|
  }
 | 
						|
  void next_prepare_(ObLogEntry &log_entry)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    // Gen trans log.
 | 
						|
    const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_);
 | 
						|
    int64_t pos = 0;
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_PREPARE);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, 0);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = prepare.serialize(buf_, buf_len_, pos);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    data_len_ = pos;
 | 
						|
    // Gen entry header.
 | 
						|
    ObLogEntryHeader header;
 | 
						|
    header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
 | 
						|
                           data_len_, get_timestamp(), get_timestamp(),
 | 
						|
                           ObProposalID(), get_timestamp(), ObVersion(0));
 | 
						|
    // Gen log entry.
 | 
						|
    log_entry.generate_entry(header, buf_);
 | 
						|
  }
 | 
						|
  void next_commit_(ObLogEntry &log_entry)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    // Gen trans log.
 | 
						|
    const ObTransCommitLog &commit = trans_log_gen_.next_commit(prepare_id_);
 | 
						|
    int64_t pos = 0;
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_COMMIT);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, 0);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = commit.serialize(buf_, buf_len_, pos);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    data_len_ = pos;
 | 
						|
    // Gen entry header.
 | 
						|
    ObLogEntryHeader header;
 | 
						|
    header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
 | 
						|
    data_len_, get_timestamp(), get_timestamp(),
 | 
						|
    ObProposalID(), get_timestamp(), ObVersion(0));
 | 
						|
    // Gen log entry.
 | 
						|
    log_entry.generate_entry(header, buf_);
 | 
						|
  }
 | 
						|
  void next_abort_(ObLogEntry &log_entry)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    // Gen trans log.
 | 
						|
    const ObTransAbortLog &abort = trans_log_gen_.next_abort();
 | 
						|
    int64_t pos = 0;
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, OB_LOG_TRANS_ABORT);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, 0);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = abort.serialize(buf_, buf_len_, pos);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    data_len_ = pos;
 | 
						|
    // Gen entry header.
 | 
						|
    ObLogEntryHeader header;
 | 
						|
    header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
 | 
						|
    data_len_, get_timestamp(), get_timestamp(),
 | 
						|
    ObProposalID(), get_timestamp(), ObVersion(0));
 | 
						|
    // Gen log entry.
 | 
						|
    log_entry.generate_entry(header, buf_);
 | 
						|
  }
 | 
						|
  void next_prepare_with_commit(ObLogEntry &log_entry)
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    // Gen trans log.
 | 
						|
    const ObTransPrepareLog &prepare= trans_log_gen_.next_prepare(redos_);
 | 
						|
    const ObTransCommitLog &commit = trans_log_gen_.next_commit(OB_INVALID_ID);
 | 
						|
    int64_t pos = 0;
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_,
 | 
						|
                                    pos, OB_LOG_TRANS_PREPARE_WITH_COMMIT);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = serialization::encode_i64(buf_, buf_len_, pos, 0);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = prepare.serialize(buf_, buf_len_, pos);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = commit.serialize(buf_, buf_len_, pos);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    data_len_ = pos;
 | 
						|
    // Gen entry header.
 | 
						|
    ObLogEntryHeader header;
 | 
						|
    header.generate_header(OB_LOG_SUBMIT, pkey_, log_id_, buf_,
 | 
						|
                           data_len_, get_timestamp(), get_timestamp(),
 | 
						|
                           ObProposalID(), get_timestamp(), ObVersion(0));
 | 
						|
    // Gen log entry.
 | 
						|
    log_entry.generate_entry(header, buf_);
 | 
						|
  }
 | 
						|
private:
 | 
						|
  // Params.
 | 
						|
  ObPartitionKey pkey_;
 | 
						|
  uint64_t log_id_;
 | 
						|
  int64_t remain_log_cnt_;
 | 
						|
  bool commit_;
 | 
						|
  // Gen.
 | 
						|
  TransParam1 param_;
 | 
						|
  TransLogGenerator1 trans_log_gen_;
 | 
						|
  uint64_t prepare_id_;
 | 
						|
  ObRedoLogIdArray redos_;
 | 
						|
  // Buf.
 | 
						|
  int64_t data_len_;
 | 
						|
  static const int64_t buf_len_ = 2 * _M_;
 | 
						|
  char *buf_;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Mock Parser 1.
 | 
						|
 * Read Task, revert it immediately, and count Task number.
 | 
						|
 */
 | 
						|
class MockParser1 : public IObLogParser
 | 
						|
{
 | 
						|
public:
 | 
						|
  MockParser1() : trans_cnt_(0) { }
 | 
						|
  virtual ~MockParser1() { }
 | 
						|
  virtual int start() { return OB_SUCCESS; }
 | 
						|
  virtual void stop() { }
 | 
						|
  virtual void mark_stop_flag() { }
 | 
						|
  virtual int push(PartTransTask* task, const int64_t timeout)
 | 
						|
  {
 | 
						|
    UNUSED(timeout);
 | 
						|
    if (NULL != task && task->is_normal_trans()) {
 | 
						|
      task->revert();
 | 
						|
      trans_cnt_ += 1;
 | 
						|
      // Debug.
 | 
						|
      // _I_(">>> push parser", "req", task->get_seq());
 | 
						|
    }
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  int64_t get_trans_cnt() const { return trans_cnt_; }
 | 
						|
private:
 | 
						|
  int64_t trans_cnt_;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Mock Fetcher Error Handler.
 | 
						|
 */
 | 
						|
class MockFetcherErrHandler1 : public IErrHandler
 | 
						|
{
 | 
						|
public:
 | 
						|
  virtual ~MockFetcherErrHandler1() { }
 | 
						|
public:
 | 
						|
  virtual void handle_err(int err_no, const char* fmt, ...)
 | 
						|
  {
 | 
						|
    UNUSED(err_no);
 | 
						|
    va_list ap;
 | 
						|
    va_start(ap, fmt);
 | 
						|
    __E__(fmt, ap);
 | 
						|
    va_end(ap);
 | 
						|
    abort();
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Mock Liboblog Error Handler.
 | 
						|
 */
 | 
						|
class MockLiboblogErrHandler1 : public IObLogErrHandler
 | 
						|
{
 | 
						|
public:
 | 
						|
  virtual void handle_error(int err_no, const char* fmt, ...)
 | 
						|
  {
 | 
						|
    UNUSED(err_no);
 | 
						|
    va_list ap;
 | 
						|
    va_start(ap, fmt);
 | 
						|
    __E__(fmt, ap);
 | 
						|
    va_end(ap);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Mock SvrProvider.
 | 
						|
 * User set svrs into it.
 | 
						|
 */
 | 
						|
class MockSvrProvider1 : public sqlclient::ObMySQLServerProvider
 | 
						|
{
 | 
						|
public:
 | 
						|
  virtual ~MockSvrProvider1() { }
 | 
						|
  void add_svr(const ObAddr &svr) { svrs_.push_back(svr); }
 | 
						|
public:
 | 
						|
  virtual int get_cluster_list(common::ObIArray<int64_t> &cluster_list)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    if (svrs_.size() > 0) {
 | 
						|
      if (OB_FAIL(cluster_list.push_back(common::OB_INVALID_ID))) {
 | 
						|
        LOG_WARN("fail to push back cluster_id", K(ret));
 | 
						|
      }
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  virtual int get_server(const int64_t cluster_id, const int64_t svr_idx, ObAddr& server)
 | 
						|
  {
 | 
						|
    UNUSED(cluster_id);
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    if (0 <= svr_idx && svr_idx < static_cast<int64_t>(svrs_.size())) {
 | 
						|
      server = svrs_[svr_idx];
 | 
						|
    }
 | 
						|
    else {
 | 
						|
      ret = OB_ERR_UNEXPECTED;
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  virtual int64_t get_cluster_count() const
 | 
						|
  {
 | 
						|
    return svrs_.size() > 0 ? 1 : 0;
 | 
						|
  }
 | 
						|
  virtual int64_t get_server_count(const int64_t cluster_id) const
 | 
						|
  {
 | 
						|
    UNUSED(cluster_id)
 | 
						|
    return static_cast<int64_t>(svrs_.size());
 | 
						|
  }
 | 
						|
  virtual int refresh_server_list() { return OB_SUCCESS; }
 | 
						|
private:
 | 
						|
  std::vector<ObAddr> svrs_;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Test Dataset Generator.
 | 
						|
 */
 | 
						|
 | 
						|
/*
 | 
						|
 * Svr Config.
 | 
						|
 * Set svr address and mysql port.
 | 
						|
 */
 | 
						|
struct SvrCfg
 | 
						|
{
 | 
						|
  // Svr.
 | 
						|
  const char *svr_addr_;
 | 
						|
  int internal_port_;
 | 
						|
  // Mysql.
 | 
						|
  int mysql_port_;
 | 
						|
  const char *mysql_user_;
 | 
						|
  const char *mysql_password_;
 | 
						|
  const char *mysql_db_;
 | 
						|
  int64_t mysql_timeout_;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Configuration for mysql connector.
 | 
						|
 */
 | 
						|
inline ConnectorConfig prepare_cfg_1(const SvrCfg &svr_cfg)
 | 
						|
{
 | 
						|
  ConnectorConfig cfg;
 | 
						|
  cfg.mysql_addr_ = svr_cfg.svr_addr_;
 | 
						|
  cfg.mysql_port_ = svr_cfg.mysql_port_;
 | 
						|
  cfg.mysql_user_ = svr_cfg.mysql_user_;
 | 
						|
  cfg.mysql_password_ = svr_cfg.mysql_password_;
 | 
						|
  cfg.mysql_db_ = svr_cfg.mysql_db_;
 | 
						|
  cfg.mysql_timeout_ = svr_cfg.mysql_timeout_;
 | 
						|
  return cfg;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Build table names.
 | 
						|
 */
 | 
						|
inline const char** prepare_table_name_1()
 | 
						|
{
 | 
						|
  static const char* tnames[] = {
 | 
						|
    "table1",
 | 
						|
    "table2",
 | 
						|
    "table3",
 | 
						|
    "table4",
 | 
						|
    "table5",
 | 
						|
    "table6",
 | 
						|
    "table7",
 | 
						|
    "table8",
 | 
						|
    "table9",
 | 
						|
    "table10",
 | 
						|
    "table11",
 | 
						|
    "table12",
 | 
						|
    "table13",
 | 
						|
    "table14",
 | 
						|
    "table15",
 | 
						|
    "table16"
 | 
						|
  };
 | 
						|
  return tnames;
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Build table schema.
 | 
						|
 */
 | 
						|
inline const char* prepare_table_schema_1()
 | 
						|
{
 | 
						|
  return "c1 int primary key";
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Create table.
 | 
						|
 */
 | 
						|
class CreateTable : public MySQLQueryBase
 | 
						|
{
 | 
						|
public:
 | 
						|
  CreateTable(const char *tname, const char *schema)
 | 
						|
  {
 | 
						|
    snprintf(buf_, 512, "create table %s(%s)", tname, schema);
 | 
						|
    sql_ = buf_;
 | 
						|
    sql_len_ = strlen(sql_);
 | 
						|
  }
 | 
						|
private:
 | 
						|
  char buf_[512];
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Drop table.
 | 
						|
 */
 | 
						|
class DropTable : public MySQLQueryBase
 | 
						|
{
 | 
						|
public:
 | 
						|
  DropTable(const char *tname)
 | 
						|
  {
 | 
						|
    snprintf(buf_, 512, "drop table if exists %s", tname);
 | 
						|
    sql_ = buf_;
 | 
						|
    sql_len_ = strlen(sql_);
 | 
						|
  }
 | 
						|
private:
 | 
						|
  char buf_[512];
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Get table id.
 | 
						|
 */
 | 
						|
class GetTableId : public MySQLQueryBase
 | 
						|
{
 | 
						|
public:
 | 
						|
  GetTableId(const char *tname)
 | 
						|
  {
 | 
						|
    snprintf(buf_, 512, "select table_id "
 | 
						|
                        "from __all_table where table_name='%s'", tname);
 | 
						|
    sql_ = buf_;
 | 
						|
    sql_len_ = strlen(sql_);
 | 
						|
  }
 | 
						|
  int get_tid(uint64_t &tid)
 | 
						|
  {
 | 
						|
    int ret = common::OB_SUCCESS;
 | 
						|
    while (common::OB_SUCCESS == (ret = next_row())) {
 | 
						|
      uint64_t table_id = 0;
 | 
						|
      if (OB_SUCC(ret)) {
 | 
						|
        if (common::OB_SUCCESS != (ret = get_uint(0, table_id))) {
 | 
						|
          OBLOG_LOG(WARN, "err get uint", K(ret));
 | 
						|
        }
 | 
						|
      }
 | 
						|
      tid = table_id;
 | 
						|
    }
 | 
						|
    ret = (common::OB_ITER_END == ret) ? common::OB_SUCCESS : ret;
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  char buf_[512];
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Get partition key by table id from system table.
 | 
						|
 */
 | 
						|
class GetPartitionKey : public MySQLQueryBase
 | 
						|
{
 | 
						|
public:
 | 
						|
  GetPartitionKey(const uint64_t tid)
 | 
						|
  {
 | 
						|
    snprintf(buf_, 512, "select table_id, partition_id, partition_cnt "
 | 
						|
                        "from __all_meta_table where table_id=%lu", tid);
 | 
						|
    sql_ = buf_;
 | 
						|
    sql_len_ = strlen(sql_);
 | 
						|
  }
 | 
						|
  int get_pkeys(ObArray<ObPartitionKey> &pkeys)
 | 
						|
  {
 | 
						|
    int ret = common::OB_SUCCESS;
 | 
						|
    while (common::OB_SUCCESS == (ret = next_row())) {
 | 
						|
      uint64_t table_id = 0;
 | 
						|
      int32_t partition_id = 0;
 | 
						|
      int32_t partition_cnt = 0;
 | 
						|
      if (OB_SUCC(ret)) {
 | 
						|
        if (common::OB_SUCCESS != (ret = get_uint(0, table_id))) {
 | 
						|
          OBLOG_LOG(WARN, "err get uint", K(ret));
 | 
						|
        }
 | 
						|
      }
 | 
						|
      if (OB_SUCC(ret)) {
 | 
						|
        int64_t val = 0;
 | 
						|
        if (common::OB_SUCCESS != (ret = get_int(1, val))) {
 | 
						|
          OBLOG_LOG(WARN, "err get int", K(ret));
 | 
						|
        } else {
 | 
						|
          partition_id = static_cast<int32_t>(val);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      if (OB_SUCC(ret)) {
 | 
						|
        int64_t val = 0;
 | 
						|
        if (common::OB_SUCCESS != (ret = get_int(2, val))) {
 | 
						|
          OBLOG_LOG(WARN, "err get int", K(ret));
 | 
						|
        } else {
 | 
						|
          partition_cnt = static_cast<int32_t>(val);
 | 
						|
        }
 | 
						|
      }
 | 
						|
      ObPartitionKey pkey;
 | 
						|
      pkey.init(table_id, partition_id, partition_cnt);
 | 
						|
      pkeys.push_back(pkey);
 | 
						|
    }
 | 
						|
    ret = (common::OB_ITER_END == ret) ? common::OB_SUCCESS : ret;
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  char buf_[512];
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * Create table and return their partition keys.
 | 
						|
 */
 | 
						|
inline void prepare_table_1(const SvrCfg& svr_cfg,
 | 
						|
                            const char** tnames,
 | 
						|
                            const int64_t tcnt,
 | 
						|
                            const char* schema,
 | 
						|
                            ObArray<ObPartitionKey>& pkeys)
 | 
						|
{
 | 
						|
  ObLogMySQLConnector conn;
 | 
						|
  ConnectorConfig cfg = prepare_cfg_1(svr_cfg);
 | 
						|
 | 
						|
  int ret = conn.init(cfg);
 | 
						|
  EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
 | 
						|
  // Prepare tables.
 | 
						|
 | 
						|
  for (int64_t idx = 0; idx < tcnt; ++idx) {
 | 
						|
    // Drop.
 | 
						|
    DropTable drop_table(tnames[idx]);
 | 
						|
    ret = conn.exec(drop_table);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
    // Create.
 | 
						|
    CreateTable create_table(tnames[idx], schema);
 | 
						|
    ret = conn.exec(create_table);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
    // Get tid.
 | 
						|
    GetTableId get_tid(tnames[idx]);
 | 
						|
    ret = conn.query(get_tid);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
    uint64_t tid = OB_INVALID_ID;
 | 
						|
    ret = get_tid.get_tid(tid);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
    // Get pkeys.
 | 
						|
    GetPartitionKey get_pkey(tid);
 | 
						|
    ret = conn.query(get_pkey);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
    ret = get_pkey.get_pkeys(pkeys);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
  }
 | 
						|
 | 
						|
  ret = conn.destroy();
 | 
						|
  EXPECT_EQ(OB_SUCCESS, ret);
 | 
						|
}
 | 
						|
 | 
						|
/*
 | 
						|
 * Data generator.
 | 
						|
 * Insert a bunch of data in server.
 | 
						|
 * For schema 1.
 | 
						|
 */
 | 
						|
class DataGenerator1 : public Runnable
 | 
						|
{
 | 
						|
  class Inserter : public MySQLQueryBase
 | 
						|
  {
 | 
						|
  public:
 | 
						|
    void set_data(const char *tname, const int64_t data)
 | 
						|
    {
 | 
						|
      reuse();
 | 
						|
      snprintf(buf_, 512, "insert into %s (c1) values (%ld)", tname, data);
 | 
						|
      sql_ = buf_;
 | 
						|
      sql_len_ = strlen(sql_);
 | 
						|
    }
 | 
						|
  private:
 | 
						|
    char buf_[512];
 | 
						|
  };
 | 
						|
public:
 | 
						|
  DataGenerator1(const ConnectorConfig &cfg) :
 | 
						|
    conn_(), cfg_(cfg), tname_(NULL), start_(0), end_(0)
 | 
						|
  {
 | 
						|
    int err = conn_.init(cfg_);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
  ~DataGenerator1()
 | 
						|
  {
 | 
						|
    conn_.destroy();
 | 
						|
  }
 | 
						|
  void insert(const char *tname, const int64_t start, const int64_t end)
 | 
						|
  {
 | 
						|
    tname_ = tname;
 | 
						|
    start_ = start;
 | 
						|
    end_ = end;
 | 
						|
    create();
 | 
						|
  }
 | 
						|
private:
 | 
						|
  int routine()
 | 
						|
  {
 | 
						|
    Inserter inserter;
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
    for (int64_t cur = start_;
 | 
						|
         OB_SUCCESS == err && cur < end_;
 | 
						|
         cur++) {
 | 
						|
      inserter.set_data(tname_, cur);
 | 
						|
      err = conn_.exec(inserter);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    }
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  ObLogMySQLConnector conn_;
 | 
						|
  ConnectorConfig cfg_;
 | 
						|
  const char *tname_;
 | 
						|
  int64_t start_;
 | 
						|
  int64_t end_;
 | 
						|
};
 | 
						|
 | 
						|
/*
 | 
						|
 * End of Test Dataset Generator.
 | 
						|
 */
 | 
						|
 | 
						|
}
 | 
						|
}
 |