365 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			365 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#include "common/ob_queue_thread.h"
 | 
						|
#include "ob_log_fetcher_rpc_interface.h"
 | 
						|
#include "clog/ob_log_entry.h"
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
using namespace common;
 | 
						|
using namespace obrpc;
 | 
						|
using namespace liboblog;
 | 
						|
using namespace liboblog::fetcher;
 | 
						|
namespace unittest
 | 
						|
{
 | 
						|
 | 
						|
class MockFectherInterface : public IFetcherRpcInterface
 | 
						|
{
 | 
						|
public:
 | 
						|
  MockFectherInterface(ObNetClient &net_client,
 | 
						|
                       const uint64_t tenant_id = OB_SYS_TENANT_ID)
 | 
						|
    : net_client_(net_client),
 | 
						|
      tenant_id_(tenant_id)
 | 
						|
  {
 | 
						|
    svr_finder_ = NULL;
 | 
						|
  }
 | 
						|
  void set_svr(const ObAddr &svr)
 | 
						|
  {
 | 
						|
    svr_ = svr;
 | 
						|
  }
 | 
						|
  virtual const ObAddr& get_svr() const
 | 
						|
  {
 | 
						|
    return svr_;
 | 
						|
  }
 | 
						|
  void set_timeout(const int64_t timeout)
 | 
						|
  {
 | 
						|
    timeout_ = timeout;
 | 
						|
  }
 | 
						|
  virtual int req_start_log_id_by_ts(const ObLogReqStartLogIdByTsRequest &req,
 | 
						|
                                     ObLogReqStartLogIdByTsResponse &resp)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(resp);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int req_start_log_id_by_ts_2(
 | 
						|
      const ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
 | 
						|
      ObLogReqStartLogIdByTsResponseWithBreakpoint &resp)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(resp);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int req_start_pos_by_log_id(
 | 
						|
      const ObLogReqStartPosByLogIdRequest &req,
 | 
						|
      ObLogReqStartPosByLogIdResponse &resp)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(resp);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int req_start_pos_by_log_id_2(
 | 
						|
      const ObLogReqStartPosByLogIdRequestWithBreakpoint& req,
 | 
						|
      ObLogReqStartPosByLogIdResponseWithBreakpoint& resp)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(resp);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int fetch_log(const ObLogExternalFetchLogRequest& req,
 | 
						|
                        ObLogExternalFetchLogResponse& resp)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(resp);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int req_heartbeat_info(const ObLogReqHeartbeatInfoRequest& req,
 | 
						|
                                 ObLogReqHeartbeatInfoResponse& resp)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(resp);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int req_leader_heartbeat(
 | 
						|
    const obrpc::ObLogLeaderHeartbeatReq &req,
 | 
						|
    obrpc::ObLogLeaderHeartbeatResp &res)
 | 
						|
  {
 | 
						|
    UNUSED(req);
 | 
						|
    UNUSED(res);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
  virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
 | 
						|
  {
 | 
						|
    UNUSED(feedback);
 | 
						|
    return OB_SUCCESS;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int open_stream(const ObLogOpenStreamReq &req,
 | 
						|
                          ObLogOpenStreamResp &resp)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    ObLogExternalProxy proxy;
 | 
						|
    if (OB_SUCCESS != (ret = net_client_.get_proxy(proxy))) {
 | 
						|
      _E_("err get proxy", K(ret));
 | 
						|
    } else {
 | 
						|
      ret = proxy.to(svr_).by(tenant_id_).timeout(timeout_).open_stream(req, resp);
 | 
						|
      int err = proxy.get_result_code().rcode_;
 | 
						|
      if (_FAIL_(ret) && _FAIL_(err)) {
 | 
						|
        _W_("err rpc req heartbeat info", K(ret), "result_code", err,
 | 
						|
            "svr", get_svr(), K(req));
 | 
						|
        resp.reset();
 | 
						|
        resp.set_err(OB_ERR_SYS);
 | 
						|
        ret = OB_SUCCESS;
 | 
						|
      }
 | 
						|
      else { }
 | 
						|
      _D_("rpc: open_stream", K(ret), "svr", get_svr(), K(req), K(resp));
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
 | 
						|
  virtual int fetch_stream_log(const ObLogStreamFetchLogReq &req,
 | 
						|
                               ObLogStreamFetchLogResp &resp)
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    ObLogExternalProxy proxy;
 | 
						|
    if (OB_SUCCESS != (ret = net_client_.get_proxy(proxy))) {
 | 
						|
      _E_("err get proxy", K(ret));
 | 
						|
    } else {
 | 
						|
      ret = proxy.to(svr_).by(tenant_id_).timeout(timeout_).stream_fetch_log(req, resp);
 | 
						|
      int err = proxy.get_result_code().rcode_;
 | 
						|
      if (_FAIL_(ret) && _FAIL_(err)) {
 | 
						|
        _W_("err rpc req heartbeat info", K(ret), "result_code", err,
 | 
						|
            "svr", get_svr(), K(req));
 | 
						|
        resp.reset();
 | 
						|
        resp.set_err(OB_ERR_SYS);
 | 
						|
        ret = OB_SUCCESS;
 | 
						|
      }
 | 
						|
      else { }
 | 
						|
      _D_("rpc: stream_fetch_log", K(ret), "svr", get_svr(), K(req), K(resp));
 | 
						|
    }
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
private:
 | 
						|
  ObNetClient &net_client_;
 | 
						|
  SvrFinder *svr_finder_;
 | 
						|
  ObAddr svr_;
 | 
						|
  uint64_t tenant_id_;
 | 
						|
  int64_t timeout_;
 | 
						|
};
 | 
						|
}
 | 
						|
}
 | 
						|
 | 
						|
using namespace oceanbase::common;
 | 
						|
using namespace oceanbase::common::sqlclient;
 | 
						|
using namespace oceanbase::obrpc;
 | 
						|
using namespace oceanbase::liboblog;
 | 
						|
using namespace oceanbase::unittest;
 | 
						|
using namespace oceanbase::clog;
 | 
						|
 | 
						|
ObAddr get_svr_addr()
 | 
						|
{
 | 
						|
  ObAddr svr;
 | 
						|
  int32_t port = 27800;
 | 
						|
  svr.set_ip_addr("100.81.140.76", port);
 | 
						|
  // int32_t port = 27800;
 | 
						|
  // svr.set_ip_addr("10.210.170.16", port);
 | 
						|
  return svr;
 | 
						|
}
 | 
						|
 | 
						|
int64_t get_timeout()
 | 
						|
{
 | 
						|
  return 60L * 1000 * 1000;
 | 
						|
}
 | 
						|
 | 
						|
//#define PKEY_COUNT 1
 | 
						|
#define PKEY_COUNT 2
 | 
						|
ObPartitionKey pks[PKEY_COUNT];
 | 
						|
ObCond table_ready;
 | 
						|
int64_t trans_log_count_recved[PKEY_COUNT];
 | 
						|
uint64_t start_log_id[PKEY_COUNT];
 | 
						|
 | 
						|
#define INSERT_COUNT 9
 | 
						|
#define LIFE_TIME (1000 * 1000 * 60)
 | 
						|
 | 
						|
void init_env()
 | 
						|
{
 | 
						|
  const int64_t table_id = 1101710651081591;
 | 
						|
  // const int64_t table_id = 1101710651081589;
 | 
						|
  for (int i = 0; i < PKEY_COUNT; i++) {
 | 
						|
    pks[i].init(table_id + i, 0, 1);
 | 
						|
    trans_log_count_recved[i] = 0;
 | 
						|
    start_log_id[i] = 1;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void report_log_recved()
 | 
						|
{
 | 
						|
  for (int i = 0; i < PKEY_COUNT; i++) {
 | 
						|
    fprintf(stdout, "pkey.table_id = %ld, trans_log_num = %ld, next_log_id = %ld\n", static_cast<int64_t>(pks[i].table_id_), trans_log_count_recved[i], start_log_id[i]);
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void recv_log(ObLogStreamFetchLogResp &fetch_resp)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  const int64_t log_num = fetch_resp.get_log_num();
 | 
						|
  const char *buf = fetch_resp.get_log_entry_buf();
 | 
						|
  ObLogEntry entry;
 | 
						|
  int64_t pos = 0;
 | 
						|
  int p = 0;
 | 
						|
  for (int64_t idx = 0; idx < log_num; ++idx) {
 | 
						|
    ret = entry.deserialize(buf, OB_MAX_LOG_BUFFER_SIZE, pos);
 | 
						|
    ASSERT_EQ(OB_SUCCESS, ret);
 | 
						|
    const ObLogEntryHeader &header = entry.get_header();
 | 
						|
    _I_("recv clog_entry", K(ret), K(entry));
 | 
						|
    for (p = 0; p < PKEY_COUNT && pks[p] != header.get_partition_key(); p++);
 | 
						|
    ASSERT_TRUE(p < PKEY_COUNT);
 | 
						|
    if (OB_LOG_SUBMIT == header.get_log_type()) {
 | 
						|
      trans_log_count_recved[p]++;
 | 
						|
      _I_("trans_log_count_recved", K(p), "pkey", pks[p], "trans_cnt", trans_log_count_recved[p]);
 | 
						|
    }
 | 
						|
    ASSERT_TRUE(header.get_log_id() == start_log_id[p]);
 | 
						|
    start_log_id[p]++;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
bool recv_all()
 | 
						|
{
 | 
						|
  int i = 0;
 | 
						|
  for (i = 0; (trans_log_count_recved[i] == INSERT_COUNT) && i < PKEY_COUNT; i++);
 | 
						|
  // return i == PKEY_COUNT;
 | 
						|
  return false;
 | 
						|
}
 | 
						|
 | 
						|
void start_fetch()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  ObNetClient net_client;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, net_client.init());
 | 
						|
  MockFectherInterface rpc(net_client);
 | 
						|
  rpc.set_svr(get_svr_addr());
 | 
						|
  rpc.set_timeout(get_timeout());
 | 
						|
 | 
						|
  int64_t c1 = 0;
 | 
						|
  int64_t c2 = 0;
 | 
						|
  int err = OB_SUCCESS;
 | 
						|
  while (!recv_all()) {
 | 
						|
    c1++;
 | 
						|
    ObLogOpenStreamReq open_req;
 | 
						|
    ObLogOpenStreamResp open_resp;
 | 
						|
    for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
 | 
						|
      ObLogOpenStreamReq::Param param;
 | 
						|
      param.pkey_ = pks[i];
 | 
						|
      param.start_log_id_ = start_log_id[i];
 | 
						|
      ASSERT_EQ(OB_SUCCESS, open_req.append_param(param));
 | 
						|
    }
 | 
						|
    open_req.set_stream_lifetime(LIFE_TIME);
 | 
						|
    ret = rpc.open_stream(open_req, open_resp);
 | 
						|
    ASSERT_EQ(OB_SUCCESS, ret);
 | 
						|
    ASSERT_TRUE(open_resp.get_stream_seq().is_valid());
 | 
						|
 | 
						|
    _I_("open_stream success", K(open_resp));
 | 
						|
 | 
						|
    const ObStreamSeq &seq = open_resp.get_stream_seq();
 | 
						|
    const int64_t upper_lmt_ts = 100000000000000000L; // large enough
 | 
						|
    const int64_t step = 100;
 | 
						|
    c2 = 0;
 | 
						|
    while (!recv_all()) {
 | 
						|
      c2++;
 | 
						|
      ObLogStreamFetchLogReq fetch_req;
 | 
						|
      ObLogStreamFetchLogResp fetch_resp;
 | 
						|
      ASSERT_EQ(OB_SUCCESS, fetch_req.set_stream_seq(seq));
 | 
						|
      ASSERT_EQ(OB_SUCCESS, fetch_req.set_upper_limit_ts(upper_lmt_ts));
 | 
						|
      ASSERT_EQ(OB_SUCCESS, fetch_req.set_log_cnt_per_part_per_round(step));
 | 
						|
 | 
						|
      ret = rpc.fetch_stream_log(fetch_req, fetch_resp);
 | 
						|
      ASSERT_EQ(OB_SUCCESS, ret);
 | 
						|
      err = fetch_resp.get_err();
 | 
						|
      if (OB_SUCCESS == err) {
 | 
						|
        recv_log(fetch_resp);
 | 
						|
      } else if (OB_STREAM_NOT_EXIST == err) {
 | 
						|
        fprintf(stdout, "stream not exist\n");
 | 
						|
        break;
 | 
						|
      } else {
 | 
						|
        fprintf(stdout, "error ret=%d\n", err);
 | 
						|
        ASSERT_TRUE(false);
 | 
						|
      }
 | 
						|
      _I_("fetch", K(c1), K(c2));
 | 
						|
      if (true && REACH_TIME_INTERVAL(1000 * 1000)) {
 | 
						|
        fprintf(stdout, "--------------------------------------------------\n");
 | 
						|
        fprintf(stdout, "fetch, c1 = %ld, c2 = %ld\n", c1, c2);
 | 
						|
        report_log_recved();
 | 
						|
      }
 | 
						|
      usleep(1000 * 1000);
 | 
						|
    }
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
void del_stale()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  ObNetClient net_client;
 | 
						|
  ASSERT_EQ(OB_SUCCESS, net_client.init());
 | 
						|
  MockFectherInterface rpc(net_client);
 | 
						|
  rpc.set_svr(get_svr_addr());
 | 
						|
  rpc.set_timeout(get_timeout());
 | 
						|
 | 
						|
  ObLogOpenStreamReq open_req;
 | 
						|
  ObLogOpenStreamResp open_resp;
 | 
						|
  for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
 | 
						|
    ObLogOpenStreamReq::Param param;
 | 
						|
    param.pkey_ = pks[i];
 | 
						|
    param.start_log_id_ = start_log_id[i];
 | 
						|
    ASSERT_EQ(OB_SUCCESS, open_req.append_param(param));
 | 
						|
  }
 | 
						|
  open_req.set_stream_lifetime(LIFE_TIME);
 | 
						|
  ret = rpc.open_stream(open_req, open_resp);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ret);
 | 
						|
  ASSERT_TRUE(open_resp.get_stream_seq().is_valid());
 | 
						|
 | 
						|
  _I_("open_stream success", K(open_resp));
 | 
						|
 | 
						|
  const ObStreamSeq &first_seq = open_resp.get_stream_seq();
 | 
						|
 | 
						|
  ObLogOpenStreamReq open_req2;
 | 
						|
  ObLogOpenStreamResp open_resp2;
 | 
						|
  for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
 | 
						|
    ObLogOpenStreamReq::Param param;
 | 
						|
    param.pkey_ = pks[i];
 | 
						|
    param.start_log_id_ = start_log_id[i];
 | 
						|
    ASSERT_EQ(OB_SUCCESS, open_req2.append_param(param));
 | 
						|
  }
 | 
						|
  open_req2.set_stale_stream(first_seq);
 | 
						|
  open_req2.set_stream_lifetime(LIFE_TIME);
 | 
						|
  ret = rpc.open_stream(open_req2, open_resp2);
 | 
						|
  ASSERT_EQ(OB_SUCCESS, ret);
 | 
						|
  ASSERT_TRUE(open_resp2.get_stream_seq().is_valid());
 | 
						|
 | 
						|
  _I_("open_stream success", K(open_resp2));
 | 
						|
}
 | 
						|
 | 
						|
int main(int argc, char** argv)
 | 
						|
{
 | 
						|
  UNUSED(argc);
 | 
						|
  UNUSED(argv);
 | 
						|
  system("rm els.log");
 | 
						|
  OB_LOGGER.set_file_name("els.log", true);
 | 
						|
  ObLogger::get_logger().set_mod_log_levels("ALL.*:INFO, TLOG.*:DEBUG");
 | 
						|
  init_env();
 | 
						|
  start_fetch();
 | 
						|
  // del_stale();
 | 
						|
  return 0;
 | 
						|
}
 |