581 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			581 lines
		
	
	
		
			17 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#include <stdio.h>        // fprintf
 | 
						|
#include <getopt.h>       // getopt_long
 | 
						|
#include <stdlib.h>       // strtoull
 | 
						|
 | 
						|
#include "share/ob_define.h"
 | 
						|
#include "lib/file/file_directory_utils.h"
 | 
						|
#include "liboblog/src/ob_log_fetcher_impl.h"
 | 
						|
 | 
						|
using namespace oceanbase;
 | 
						|
using namespace common;
 | 
						|
using namespace liboblog;
 | 
						|
using namespace fetcher;
 | 
						|
 | 
						|
#define OB_LOGGER ::oceanbase::common::ObLogger::get_logger()
 | 
						|
 | 
						|
#define EXPECT_EQ(EXP, VAL) \
 | 
						|
  do { \
 | 
						|
    if ((EXP) != (VAL)) { _E_("assert failed", #EXP, (EXP), #VAL, (VAL)); exit(1); } \
 | 
						|
  } while(0)
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
namespace liboblog
 | 
						|
{
 | 
						|
namespace integrationtesting
 | 
						|
{
 | 
						|
 | 
						|
/*
 | 
						|
 * HeartbeatTest:
 | 
						|
 *  - add n partitions, find m servers for each one, let it dispatch and create new workers
 | 
						|
 *  - no log, in time heartbeat
 | 
						|
 *  - parser prints current min and max process
 | 
						|
 *  - discard all, quit
 | 
						|
 */
 | 
						|
class HeartbeatTest
 | 
						|
{
 | 
						|
public:
 | 
						|
  int64_t partition_cnt_;
 | 
						|
  int64_t server_cnt_;
 | 
						|
  int64_t runtime_; // usec
 | 
						|
public:
 | 
						|
  /*
 | 
						|
   * Mock systable helper.
 | 
						|
   *  - provide all m servers for each request, 127.0.0.[1-m]
 | 
						|
   */
 | 
						|
  class MockSystableHelper : public ObILogSysTableHelper
 | 
						|
  {
 | 
						|
  public:
 | 
						|
    int64_t server_cnt_;
 | 
						|
    int64_t now_;
 | 
						|
    void init(const int64_t svr_cnt)
 | 
						|
    {
 | 
						|
      server_cnt_ = svr_cnt;
 | 
						|
      now_ = get_timestamp();
 | 
						|
    }
 | 
						|
  public:
 | 
						|
    virtual int query_all_clog_history_info_by_log_id_1(
 | 
						|
        const common::ObPartitionKey &pkey, const uint64_t log_id,
 | 
						|
        AllClogHistoryInfos &records) {
 | 
						|
      // Generate random results.
 | 
						|
      int ret = OB_SUCCESS;
 | 
						|
      records.reset();
 | 
						|
      AllClogHistoryInfoRecord rec;
 | 
						|
      const int64_t cnt = server_cnt_;
 | 
						|
      for (int64_t idx = 0; idx < cnt; ++idx) {
 | 
						|
        rec.reset();
 | 
						|
        rec.table_id_ = (uint64_t)(pkey.table_id_);
 | 
						|
        rec.partition_idx_ = static_cast<int32_t>(pkey.get_partition_id());//int64_t
 | 
						|
        rec.partition_cnt_ = pkey.get_partition_cnt();//partition cnt
 | 
						|
        rec.start_log_id_ = log_id;
 | 
						|
        rec.end_log_id_ = log_id + 10000;
 | 
						|
        rec.start_log_timestamp_ = now_;
 | 
						|
        rec.end_log_timestamp_ = now_ + 1 * _HOUR_;
 | 
						|
        snprintf(rec.svr_ip_, common::MAX_IP_ADDR_LENGTH + 1, "127.0.0.%ld", 1 + idx);
 | 
						|
        rec.svr_port_ = 8888;
 | 
						|
        records.push_back(rec);
 | 
						|
      }
 | 
						|
      return ret;
 | 
						|
    }
 | 
						|
 | 
						|
    virtual int query_all_clog_history_info_by_timestamp_1(
 | 
						|
        const common::ObPartitionKey &pkey, const int64_t timestamp,
 | 
						|
        AllClogHistoryInfos &records) {
 | 
						|
      // Generate random results.
 | 
						|
      int ret = OB_SUCCESS;
 | 
						|
      records.reset();
 | 
						|
      AllClogHistoryInfoRecord rec;
 | 
						|
      const int64_t cnt = server_cnt_;
 | 
						|
      for (int64_t idx = 0; idx < cnt; ++idx) {
 | 
						|
        rec.reset();
 | 
						|
        rec.table_id_ = (uint64_t)(pkey.table_id_);
 | 
						|
        rec.partition_idx_ = (int32_t)(pkey.get_partition_id());
 | 
						|
        rec.partition_cnt_ = pkey.get_partition_cnt();
 | 
						|
        rec.start_log_id_ = 0;
 | 
						|
        rec.end_log_id_ = 65536;
 | 
						|
        rec.start_log_timestamp_ = timestamp;
 | 
						|
        rec.end_log_timestamp_ = timestamp + (1 * _HOUR_);
 | 
						|
        snprintf(rec.svr_ip_, common::MAX_IP_ADDR_LENGTH + 1, "127.0.0.%ld", 1 + idx);
 | 
						|
        rec.svr_port_ = 8888;
 | 
						|
        records.push_back(rec);
 | 
						|
      }
 | 
						|
      return ret;
 | 
						|
    }
 | 
						|
    virtual int query_all_meta_table_1(
 | 
						|
        const common::ObPartitionKey &pkey, AllMetaTableRecords &records) {
 | 
						|
      // Generate random results.
 | 
						|
      int ret = OB_SUCCESS;
 | 
						|
      UNUSED(pkey);
 | 
						|
      records.reset();
 | 
						|
      AllMetaTableRecord rec;
 | 
						|
      const int64_t cnt = server_cnt_;
 | 
						|
      for (int64_t idx = 0; idx < cnt; ++idx) {
 | 
						|
        rec.reset();
 | 
						|
        snprintf(rec.svr_ip_, common::MAX_IP_ADDR_LENGTH + 1, "127.0.0.%ld", 1 + idx);
 | 
						|
        rec.svr_port_ = 8888;
 | 
						|
        rec.role_ = (0 == idx) ? LEADER : FOLLOWER;
 | 
						|
        records.push_back(rec);
 | 
						|
      }
 | 
						|
      return ret;
 | 
						|
    }
 | 
						|
 | 
						|
    virtual int query_all_meta_table_for_leader(
 | 
						|
        const common::ObPartitionKey &pkey,
 | 
						|
        bool &has_leader,
 | 
						|
        common::ObAddr &leader)
 | 
						|
    {
 | 
						|
      UNUSED(pkey);
 | 
						|
      has_leader = true;
 | 
						|
      leader.set_ip_addr("127.0.0.1", 8888);
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
 | 
						|
    virtual int query_all_server_table_1(
 | 
						|
        AllServerTableRecords &records)
 | 
						|
    {
 | 
						|
      int ret = OB_SUCCESS;
 | 
						|
      records.reset();
 | 
						|
      AllServerTableRecord rec;
 | 
						|
      const int64_t cnt = server_cnt_;
 | 
						|
      for (int64_t idx = 0; idx < cnt; ++idx) {
 | 
						|
        rec.reset();
 | 
						|
        snprintf(rec.svr_ip_, common::MAX_IP_ADDR_LENGTH + 1, "127.0.0.%ld", 1 + idx);
 | 
						|
        rec.svr_port_ = 8888;
 | 
						|
        records.push_back(rec);
 | 
						|
      }
 | 
						|
      return ret;
 | 
						|
    }
 | 
						|
  };
 | 
						|
  /*
 | 
						|
   * Rpc.
 | 
						|
   *  - return start log id as 1
 | 
						|
   *  - in time heartbeat
 | 
						|
   *  - can open stream
 | 
						|
   *  - no log
 | 
						|
   */
 | 
						|
  class MockRpcInterface : public IFetcherRpcInterface
 | 
						|
  {
 | 
						|
  public:
 | 
						|
    ~MockRpcInterface() {}
 | 
						|
    virtual void set_svr(const common::ObAddr& svr) { UNUSED(svr); }
 | 
						|
    virtual const ObAddr& get_svr() const { static ObAddr svr; return svr; }
 | 
						|
    virtual void set_timeout(const int64_t timeout) { UNUSED(timeout); }
 | 
						|
    virtual int req_start_log_id_by_ts(
 | 
						|
        const obrpc::ObLogReqStartLogIdByTsRequest& req,
 | 
						|
        obrpc::ObLogReqStartLogIdByTsResponse& res)
 | 
						|
    {
 | 
						|
      UNUSED(req);
 | 
						|
      UNUSED(res);
 | 
						|
      return OB_NOT_IMPLEMENT;
 | 
						|
    }
 | 
						|
    virtual int req_start_log_id_by_ts_2(const obrpc::ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
 | 
						|
                                         obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint &res) {
 | 
						|
      res.reset();
 | 
						|
      for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
 | 
						|
        obrpc::ObLogReqStartLogIdByTsResponseWithBreakpoint::Result result;
 | 
						|
        result.reset();
 | 
						|
        result.err_ = OB_SUCCESS;
 | 
						|
        result.start_log_id_ = 1;
 | 
						|
        res.append_result(result);
 | 
						|
      }
 | 
						|
      _D_(">>> req start log id", K(req), K(res));
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
    virtual int req_start_pos_by_log_id_2(const obrpc::ObLogReqStartPosByLogIdRequestWithBreakpoint &req,
 | 
						|
                                          obrpc::ObLogReqStartPosByLogIdResponseWithBreakpoint &res) {
 | 
						|
      UNUSED(req);
 | 
						|
      UNUSED(res);
 | 
						|
      return OB_NOT_IMPLEMENT;
 | 
						|
    }
 | 
						|
    virtual int req_start_pos_by_log_id(
 | 
						|
        const obrpc::ObLogReqStartPosByLogIdRequest& req,
 | 
						|
        obrpc::ObLogReqStartPosByLogIdResponse& res)
 | 
						|
    {
 | 
						|
      UNUSED(req);
 | 
						|
      UNUSED(res);
 | 
						|
      return OB_NOT_IMPLEMENT;
 | 
						|
    }
 | 
						|
    virtual int fetch_log(
 | 
						|
        const obrpc::ObLogExternalFetchLogRequest& req,
 | 
						|
        obrpc::ObLogExternalFetchLogResponse& res)
 | 
						|
    {
 | 
						|
      UNUSED(req);
 | 
						|
      UNUSED(res);
 | 
						|
      return OB_NOT_IMPLEMENT;
 | 
						|
    }
 | 
						|
    virtual int req_heartbeat_info(
 | 
						|
        const obrpc::ObLogReqHeartbeatInfoRequest& req,
 | 
						|
        obrpc::ObLogReqHeartbeatInfoResponse& res)
 | 
						|
    {
 | 
						|
      res.reset();
 | 
						|
      for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
 | 
						|
        obrpc::ObLogReqHeartbeatInfoResponse::Result result;
 | 
						|
        result.reset();
 | 
						|
        result.err_ = OB_SUCCESS;
 | 
						|
        result.tstamp_ = get_timestamp();
 | 
						|
        res.append_result(result);
 | 
						|
      }
 | 
						|
      _D_(">>> heartbeat", K(req), K(res));
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
 | 
						|
    virtual int req_leader_heartbeat(
 | 
						|
        const obrpc::ObLogLeaderHeartbeatReq &req,
 | 
						|
        obrpc::ObLogLeaderHeartbeatResp &res)
 | 
						|
    {
 | 
						|
      res.reset();
 | 
						|
      res.set_err(OB_SUCCESS);
 | 
						|
      res.set_debug_err(OB_SUCCESS);
 | 
						|
      for (int64_t idx = 0, cnt = req.get_params().count(); idx < cnt; ++idx) {
 | 
						|
        obrpc::ObLogLeaderHeartbeatResp::Result result;
 | 
						|
        const obrpc::ObLogLeaderHeartbeatReq::Param ¶m = req.get_params().at(idx);
 | 
						|
 | 
						|
        result.reset();
 | 
						|
        result.err_ = OB_SUCCESS;
 | 
						|
        result.next_served_log_id_ = param.next_log_id_;
 | 
						|
        result.next_served_ts_ = get_timestamp();
 | 
						|
 | 
						|
        EXPECT_EQ(OB_SUCCESS, res.append_result(result));
 | 
						|
      }
 | 
						|
 | 
						|
      _D_(">>> heartbeat", K(req), K(res));
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
 | 
						|
    virtual int open_stream(const obrpc::ObLogOpenStreamReq &req,
 | 
						|
                            obrpc::ObLogOpenStreamResp &res) {
 | 
						|
      int ret = OB_SUCCESS;
 | 
						|
      UNUSED(req);
 | 
						|
      obrpc::ObStreamSeq seq;
 | 
						|
      seq.reset();
 | 
						|
      seq.self_.set_ip_addr("127.0.0.1", 8888);
 | 
						|
      seq.seq_ts_ = get_timestamp();
 | 
						|
      res.reset();
 | 
						|
      res.set_err(OB_SUCCESS);
 | 
						|
      res.set_debug_err(OB_SUCCESS);
 | 
						|
      res.set_stream_seq(seq);
 | 
						|
      _D_(">>> open stream", K(req), K(res));
 | 
						|
      return ret;
 | 
						|
    }
 | 
						|
    virtual int fetch_stream_log(const obrpc::ObLogStreamFetchLogReq &req,
 | 
						|
                                 obrpc::ObLogStreamFetchLogResp &res) {
 | 
						|
      UNUSED(req);
 | 
						|
      res.reset();
 | 
						|
      res.set_err(OB_SUCCESS);
 | 
						|
      res.set_debug_err(OB_SUCCESS);
 | 
						|
      _D_(">>> fetch log", K(req), K(res));
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
    virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
 | 
						|
    {
 | 
						|
      UNUSED(feedback);
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
  };
 | 
						|
  /*
 | 
						|
   * Factory.
 | 
						|
   */
 | 
						|
  class MockRpcInterfaceFactory : public IFetcherRpcInterfaceFactory
 | 
						|
  {
 | 
						|
  public:
 | 
						|
    virtual int new_fetcher_rpc_interface(IFetcherRpcInterface*& rpc)
 | 
						|
    {
 | 
						|
      rpc = new MockRpcInterface();
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
    virtual int delete_fetcher_rpc_interface(IFetcherRpcInterface* rpc)
 | 
						|
    {
 | 
						|
      delete rpc;
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
  };
 | 
						|
 | 
						|
  /*
 | 
						|
   * Mock parser.
 | 
						|
   *  - track process
 | 
						|
   *  - print min & max process
 | 
						|
   */
 | 
						|
  class MockParser : public IObLogParser
 | 
						|
  {
 | 
						|
    typedef common::ObLinearHashMap<common::ObPartitionKey, int64_t> ProcessMap;
 | 
						|
    struct Updater
 | 
						|
    {
 | 
						|
      int64_t tstamp_;
 | 
						|
      bool operator()(const common::ObPartitionKey &pkey, int64_t &val)
 | 
						|
      {
 | 
						|
        UNUSED(pkey);
 | 
						|
        if (val < tstamp_) { val = tstamp_; }
 | 
						|
        return true;
 | 
						|
      }
 | 
						|
    };
 | 
						|
    struct ProcessGetter
 | 
						|
    {
 | 
						|
      int64_t min_process_;
 | 
						|
      int64_t max_process_;
 | 
						|
      bool operator()(const common::ObPartitionKey &pkey, const int64_t &val)
 | 
						|
      {
 | 
						|
        UNUSED(pkey);
 | 
						|
        if (OB_INVALID_TIMESTAMP == min_process_ || val < min_process_) {
 | 
						|
          min_process_ = val;
 | 
						|
        }
 | 
						|
        if (OB_INVALID_TIMESTAMP == max_process_ || max_process_ < val) {
 | 
						|
          max_process_ = val;
 | 
						|
        }
 | 
						|
        return true;
 | 
						|
      }
 | 
						|
      void reset() { min_process_ = OB_INVALID_TIMESTAMP; max_process_ = OB_INVALID_TIMESTAMP; }
 | 
						|
    };
 | 
						|
  public:
 | 
						|
    MockParser() : trans_cnt_(0) { EXPECT_EQ(OB_SUCCESS, process_map_.init()); }
 | 
						|
    virtual ~MockParser() { process_map_.reset(); process_map_.destroy(); }
 | 
						|
    virtual int start() { return OB_SUCCESS; }
 | 
						|
    virtual void stop() { }
 | 
						|
    virtual void mark_stop_flag() { }
 | 
						|
    virtual int push(PartTransTask* task, const int64_t timeout)
 | 
						|
    {
 | 
						|
      UNUSED(timeout);
 | 
						|
      if (NULL != task) {
 | 
						|
        if (task->is_heartbeat()) {
 | 
						|
          const common::ObPartitionKey &pkey = task->get_partition();
 | 
						|
          const int64_t tstamp = task->get_timestamp();
 | 
						|
          Updater updater;
 | 
						|
          updater.tstamp_ = tstamp;
 | 
						|
          EXPECT_EQ(OB_SUCCESS, process_map_.operate(pkey, updater));
 | 
						|
        }
 | 
						|
        task->revert();
 | 
						|
        trans_cnt_ += 1;
 | 
						|
        // Debug.
 | 
						|
        // _I_(">>> push parser", "req", task->get_seq());
 | 
						|
      }
 | 
						|
      return OB_SUCCESS;
 | 
						|
    }
 | 
						|
    int64_t get_trans_cnt() const { return trans_cnt_; }
 | 
						|
    void add_partition(const common::ObPartitionKey &pkey)
 | 
						|
    {
 | 
						|
      EXPECT_EQ(OB_SUCCESS, process_map_.insert(pkey, 0));
 | 
						|
    }
 | 
						|
    void print_process()
 | 
						|
    {
 | 
						|
      int64_t now = get_timestamp();
 | 
						|
      ProcessGetter process_getter;
 | 
						|
      process_getter.reset();
 | 
						|
      EXPECT_EQ(OB_SUCCESS, process_map_.for_each(process_getter));
 | 
						|
      int64_t max_delay_sec = (now - process_getter.min_process_) / 1000000;
 | 
						|
      int64_t max_delay_us = (now - process_getter.min_process_) % 1000000;
 | 
						|
      int64_t min_delay_sec = (now - process_getter.max_process_) / 1000000;
 | 
						|
      int64_t min_delay_us = (now - process_getter.max_process_) % 1000000;
 | 
						|
 | 
						|
      fprintf(stderr, ">>> parser process: %s-%s  DELAY=[%ld.%06ld, %ld.%06ld] sec\n",
 | 
						|
              TS_TO_STR(process_getter.min_process_),
 | 
						|
              TS_TO_STR(process_getter.max_process_),
 | 
						|
              min_delay_sec, min_delay_us, max_delay_sec, max_delay_us);
 | 
						|
    }
 | 
						|
  private:
 | 
						|
    int64_t trans_cnt_;
 | 
						|
    ProcessMap process_map_;
 | 
						|
  };
 | 
						|
  /*
 | 
						|
   * Err handler.
 | 
						|
   *  - exit on error
 | 
						|
   */
 | 
						|
  class MockFetcherErrHandler : public IErrHandler
 | 
						|
  {
 | 
						|
  public:
 | 
						|
    virtual ~MockFetcherErrHandler() { }
 | 
						|
  public:
 | 
						|
    virtual void handle_err(int err_no, const char* fmt, ...)
 | 
						|
    {
 | 
						|
      UNUSED(err_no);
 | 
						|
      va_list ap;
 | 
						|
      va_start(ap, fmt);
 | 
						|
      __E__(fmt, ap);
 | 
						|
      va_end(ap);
 | 
						|
      exit(1);
 | 
						|
    }
 | 
						|
  };
 | 
						|
 | 
						|
public:
 | 
						|
  void run()
 | 
						|
  {
 | 
						|
    int err = OB_SUCCESS;
 | 
						|
 | 
						|
    // Task Pool.
 | 
						|
    ObLogTransTaskPool<PartTransTask> task_pool;
 | 
						|
    ObConcurrentFIFOAllocator task_pool_alloc;
 | 
						|
    err = task_pool_alloc.init(128 * _G_, 8 * _M_, OB_MALLOC_NORMAL_BLOCK_SIZE);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = task_pool.init(&task_pool_alloc, 10240, 1024, 4 * 1024 * 1024, true);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // Parser.
 | 
						|
    MockParser parser;
 | 
						|
 | 
						|
    // Err Handler.
 | 
						|
    MockFetcherErrHandler err_handler;
 | 
						|
 | 
						|
    // Rpc.
 | 
						|
    MockRpcInterfaceFactory rpc_factory;
 | 
						|
 | 
						|
    // Worker Pool.
 | 
						|
    FixedJobPerWorkerPool worker_pool;
 | 
						|
    err = worker_pool.init(1);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // StartLogIdLocator.
 | 
						|
    ::oceanbase::liboblog::fetcher::StartLogIdLocator locator;
 | 
						|
    err = locator.init(&rpc_factory, &err_handler, &worker_pool, 3);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // Heartbeater.
 | 
						|
    Heartbeater heartbeater;
 | 
						|
    err = heartbeater.init(&rpc_factory, &err_handler, &worker_pool, 3);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // SvrFinder.
 | 
						|
    MockSystableHelper systable_helper;
 | 
						|
    systable_helper.init(server_cnt_);
 | 
						|
    ::oceanbase::liboblog::fetcher::SvrFinder svrfinder;
 | 
						|
    err = svrfinder.init(&systable_helper, &err_handler, &worker_pool, 3);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // Fetcher Config.
 | 
						|
    FetcherConfig cfg;
 | 
						|
    cfg.reset();
 | 
						|
 | 
						|
    // Init.
 | 
						|
    ::oceanbase::liboblog::fetcher::Fetcher fetcher;
 | 
						|
    err = fetcher.init(&task_pool, &parser, &err_handler, &rpc_factory,
 | 
						|
                       &worker_pool, &svrfinder, &locator, &heartbeater, &cfg);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // Add partition.
 | 
						|
    for (int64_t idx = 0, cnt = partition_cnt_; (idx < cnt); ++idx) {
 | 
						|
      ObPartitionKey p1(1001 + idx, 1, partition_cnt_);
 | 
						|
      err = fetcher.fetch_partition(p1, 1, OB_INVALID_ID);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
      parser.add_partition(p1);
 | 
						|
    }
 | 
						|
 | 
						|
    // Run.
 | 
						|
    err = fetcher.start();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // Runtime.
 | 
						|
    int64_t start = get_timestamp();
 | 
						|
    int64_t last_print_process = start;
 | 
						|
    while ((get_timestamp() - start) < runtime_) {
 | 
						|
      usec_sleep(500 * _MSEC_);
 | 
						|
      if (1 * _SEC_ < get_timestamp() - last_print_process) {
 | 
						|
        last_print_process = get_timestamp();
 | 
						|
        parser.print_process();
 | 
						|
      }
 | 
						|
    }
 | 
						|
 | 
						|
    // Discard partition.
 | 
						|
    for (int64_t idx = 0, cnt = partition_cnt_; (idx < cnt); ++idx) {
 | 
						|
      ObPartitionKey p1(1001 + idx, 1, partition_cnt_);
 | 
						|
      err = fetcher.discard_partition(p1);
 | 
						|
      EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    }
 | 
						|
 | 
						|
    // Stop.
 | 
						|
    err = fetcher.stop(true);
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
 | 
						|
    // Destroy.
 | 
						|
    err = fetcher.destroy();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = locator.destroy();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = svrfinder.destroy();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    err = heartbeater.destroy();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    worker_pool.destroy();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
    task_pool.destroy();
 | 
						|
    EXPECT_EQ(OB_SUCCESS, err);
 | 
						|
  }
 | 
						|
};
 | 
						|
 | 
						|
}
 | 
						|
}
 | 
						|
}
 | 
						|
 | 
						|
void print_usage(const char *prog_name)
 | 
						|
{
 | 
						|
  printf("USAGE: %s\n"
 | 
						|
         "   -p, --partition              partition count\n"
 | 
						|
         "   -s, --server                 server count\n"
 | 
						|
         "   -r, --runtime                run time in seconds, default -1, means to run forever\n",
 | 
						|
          prog_name);
 | 
						|
}
 | 
						|
int main(const int argc, char **argv)
 | 
						|
{
 | 
						|
  // option variables
 | 
						|
  int opt = -1;
 | 
						|
  const char *opt_string = "p:s:r:";
 | 
						|
  struct option long_opts[] =
 | 
						|
      {
 | 
						|
          {"partition", 1, NULL, 'p'},
 | 
						|
          {"server", 1, NULL, 's'},
 | 
						|
          {"runtime", 1, NULL, 'r'},
 | 
						|
          {0, 0, 0, 0}
 | 
						|
      };
 | 
						|
 | 
						|
  if (argc <= 1) {
 | 
						|
    print_usage(argv[0]);
 | 
						|
    return 1;
 | 
						|
  }
 | 
						|
 | 
						|
  // Params.
 | 
						|
  int64_t partition_cnt = 0;
 | 
						|
  int64_t server_cnt = 0;
 | 
						|
  int64_t runtime = 1 * ::oceanbase::liboblog::_YEAR_;
 | 
						|
 | 
						|
  // Parse command line
 | 
						|
  while ((opt = getopt_long(argc, argv, opt_string, long_opts, NULL)) != -1) {
 | 
						|
    switch (opt) {
 | 
						|
      case 'p': {
 | 
						|
        partition_cnt = strtoll(optarg, NULL, 10);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      case 's': {
 | 
						|
        server_cnt = strtoll(optarg, NULL, 10);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      case 'r': {
 | 
						|
        runtime = strtoll(optarg, NULL, 10);
 | 
						|
        break;
 | 
						|
      }
 | 
						|
      default:
 | 
						|
        print_usage(argv[0]);
 | 
						|
        break;
 | 
						|
    } // end switch
 | 
						|
  } // end while
 | 
						|
 | 
						|
  printf("partition_cnt:%ld server_cnt:%ld runtime:%ld sec\n", partition_cnt, server_cnt, runtime);
 | 
						|
 | 
						|
  // Logger.
 | 
						|
  ::oceanbase::liboblog::fetcher::FetcherLogLevelSetter::get_instance().set_mod_log_levels("TLOG.*:INFO");
 | 
						|
  // Run test.
 | 
						|
  ::oceanbase::liboblog::integrationtesting::HeartbeatTest test;
 | 
						|
  test.partition_cnt_ = partition_cnt;
 | 
						|
  test.server_cnt_ = server_cnt;
 | 
						|
  test.runtime_ = ::oceanbase::liboblog::_SEC_ * runtime;
 | 
						|
  test.run();
 | 
						|
  return 0;
 | 
						|
}
 |