Files
oceanbase/unittest/obcdc/nopretest_test_ext_fetcher.cpp
2022-03-25 18:10:38 +08:00

365 lines
10 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include "common/ob_queue_thread.h"
#include "ob_log_fetcher_rpc_interface.h"
#include "clog/ob_log_entry.h"
namespace oceanbase
{
using namespace common;
using namespace obrpc;
using namespace liboblog;
using namespace liboblog::fetcher;
namespace unittest
{
class MockFectherInterface : public IFetcherRpcInterface
{
public:
MockFectherInterface(ObNetClient &net_client,
const uint64_t tenant_id = OB_SYS_TENANT_ID)
: net_client_(net_client),
tenant_id_(tenant_id)
{
svr_finder_ = NULL;
}
void set_svr(const ObAddr &svr)
{
svr_ = svr;
}
virtual const ObAddr& get_svr() const
{
return svr_;
}
void set_timeout(const int64_t timeout)
{
timeout_ = timeout;
}
virtual int req_start_log_id_by_ts(const ObLogReqStartLogIdByTsRequest &req,
ObLogReqStartLogIdByTsResponse &resp)
{
UNUSED(req);
UNUSED(resp);
return OB_SUCCESS;
}
virtual int req_start_log_id_by_ts_2(
const ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
ObLogReqStartLogIdByTsResponseWithBreakpoint &resp)
{
UNUSED(req);
UNUSED(resp);
return OB_SUCCESS;
}
virtual int req_start_pos_by_log_id(
const ObLogReqStartPosByLogIdRequest &req,
ObLogReqStartPosByLogIdResponse &resp)
{
UNUSED(req);
UNUSED(resp);
return OB_SUCCESS;
}
virtual int req_start_pos_by_log_id_2(
const ObLogReqStartPosByLogIdRequestWithBreakpoint& req,
ObLogReqStartPosByLogIdResponseWithBreakpoint& resp)
{
UNUSED(req);
UNUSED(resp);
return OB_SUCCESS;
}
virtual int fetch_log(const ObLogExternalFetchLogRequest& req,
ObLogExternalFetchLogResponse& resp)
{
UNUSED(req);
UNUSED(resp);
return OB_SUCCESS;
}
virtual int req_heartbeat_info(const ObLogReqHeartbeatInfoRequest& req,
ObLogReqHeartbeatInfoResponse& resp)
{
UNUSED(req);
UNUSED(resp);
return OB_SUCCESS;
}
virtual int req_leader_heartbeat(
const obrpc::ObLogLeaderHeartbeatReq &req,
obrpc::ObLogLeaderHeartbeatResp &res)
{
UNUSED(req);
UNUSED(res);
return OB_SUCCESS;
}
virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
{
UNUSED(feedback);
return OB_SUCCESS;
}
virtual int open_stream(const ObLogOpenStreamReq &req,
ObLogOpenStreamResp &resp)
{
int ret = OB_SUCCESS;
ObLogExternalProxy proxy;
if (OB_SUCCESS != (ret = net_client_.get_proxy(proxy))) {
_E_("err get proxy", K(ret));
} else {
ret = proxy.to(svr_).by(tenant_id_).timeout(timeout_).open_stream(req, resp);
int err = proxy.get_result_code().rcode_;
if (_FAIL_(ret) && _FAIL_(err)) {
_W_("err rpc req heartbeat info", K(ret), "result_code", err,
"svr", get_svr(), K(req));
resp.reset();
resp.set_err(OB_ERR_SYS);
ret = OB_SUCCESS;
}
else { }
_D_("rpc: open_stream", K(ret), "svr", get_svr(), K(req), K(resp));
}
return ret;
}
virtual int fetch_stream_log(const ObLogStreamFetchLogReq &req,
ObLogStreamFetchLogResp &resp)
{
int ret = OB_SUCCESS;
ObLogExternalProxy proxy;
if (OB_SUCCESS != (ret = net_client_.get_proxy(proxy))) {
_E_("err get proxy", K(ret));
} else {
ret = proxy.to(svr_).by(tenant_id_).timeout(timeout_).stream_fetch_log(req, resp);
int err = proxy.get_result_code().rcode_;
if (_FAIL_(ret) && _FAIL_(err)) {
_W_("err rpc req heartbeat info", K(ret), "result_code", err,
"svr", get_svr(), K(req));
resp.reset();
resp.set_err(OB_ERR_SYS);
ret = OB_SUCCESS;
}
else { }
_D_("rpc: stream_fetch_log", K(ret), "svr", get_svr(), K(req), K(resp));
}
return ret;
}
private:
ObNetClient &net_client_;
SvrFinder *svr_finder_;
ObAddr svr_;
uint64_t tenant_id_;
int64_t timeout_;
};
}
}
using namespace oceanbase::common;
using namespace oceanbase::common::sqlclient;
using namespace oceanbase::obrpc;
using namespace oceanbase::liboblog;
using namespace oceanbase::unittest;
using namespace oceanbase::clog;
ObAddr get_svr_addr()
{
ObAddr svr;
int32_t port = 27800;
svr.set_ip_addr("100.81.140.76", port);
// int32_t port = 27800;
// svr.set_ip_addr("10.210.170.16", port);
return svr;
}
int64_t get_timeout()
{
return 60L * 1000 * 1000;
}
//#define PKEY_COUNT 1
#define PKEY_COUNT 2
ObPartitionKey pks[PKEY_COUNT];
ObCond table_ready;
int64_t trans_log_count_recved[PKEY_COUNT];
uint64_t start_log_id[PKEY_COUNT];
#define INSERT_COUNT 9
#define LIFE_TIME (1000 * 1000 * 60)
void init_env()
{
const int64_t table_id = 1101710651081591;
// const int64_t table_id = 1101710651081589;
for (int i = 0; i < PKEY_COUNT; i++) {
pks[i].init(table_id + i, 0, 1);
trans_log_count_recved[i] = 0;
start_log_id[i] = 1;
}
}
void report_log_recved()
{
for (int i = 0; i < PKEY_COUNT; i++) {
fprintf(stdout, "pkey.table_id = %ld, trans_log_num = %ld, next_log_id = %ld\n", static_cast<int64_t>(pks[i].table_id_), trans_log_count_recved[i], start_log_id[i]);
}
}
void recv_log(ObLogStreamFetchLogResp &fetch_resp)
{
int ret = OB_SUCCESS;
const int64_t log_num = fetch_resp.get_log_num();
const char *buf = fetch_resp.get_log_entry_buf();
ObLogEntry entry;
int64_t pos = 0;
int p = 0;
for (int64_t idx = 0; idx < log_num; ++idx) {
ret = entry.deserialize(buf, OB_MAX_LOG_BUFFER_SIZE, pos);
ASSERT_EQ(OB_SUCCESS, ret);
const ObLogEntryHeader &header = entry.get_header();
_I_("recv clog_entry", K(ret), K(entry));
for (p = 0; p < PKEY_COUNT && pks[p] != header.get_partition_key(); p++);
ASSERT_TRUE(p < PKEY_COUNT);
if (OB_LOG_SUBMIT == header.get_log_type()) {
trans_log_count_recved[p]++;
_I_("trans_log_count_recved", K(p), "pkey", pks[p], "trans_cnt", trans_log_count_recved[p]);
}
ASSERT_TRUE(header.get_log_id() == start_log_id[p]);
start_log_id[p]++;
}
}
bool recv_all()
{
int i = 0;
for (i = 0; (trans_log_count_recved[i] == INSERT_COUNT) && i < PKEY_COUNT; i++);
// return i == PKEY_COUNT;
return false;
}
void start_fetch()
{
int ret = OB_SUCCESS;
ObNetClient net_client;
ASSERT_EQ(OB_SUCCESS, net_client.init());
MockFectherInterface rpc(net_client);
rpc.set_svr(get_svr_addr());
rpc.set_timeout(get_timeout());
int64_t c1 = 0;
int64_t c2 = 0;
int err = OB_SUCCESS;
while (!recv_all()) {
c1++;
ObLogOpenStreamReq open_req;
ObLogOpenStreamResp open_resp;
for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
ObLogOpenStreamReq::Param param;
param.pkey_ = pks[i];
param.start_log_id_ = start_log_id[i];
ASSERT_EQ(OB_SUCCESS, open_req.append_param(param));
}
open_req.set_stream_lifetime(LIFE_TIME);
ret = rpc.open_stream(open_req, open_resp);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_TRUE(open_resp.get_stream_seq().is_valid());
_I_("open_stream success", K(open_resp));
const ObStreamSeq &seq = open_resp.get_stream_seq();
const int64_t upper_lmt_ts = 100000000000000000L; // large enough
const int64_t step = 100;
c2 = 0;
while (!recv_all()) {
c2++;
ObLogStreamFetchLogReq fetch_req;
ObLogStreamFetchLogResp fetch_resp;
ASSERT_EQ(OB_SUCCESS, fetch_req.set_stream_seq(seq));
ASSERT_EQ(OB_SUCCESS, fetch_req.set_upper_limit_ts(upper_lmt_ts));
ASSERT_EQ(OB_SUCCESS, fetch_req.set_log_cnt_per_part_per_round(step));
ret = rpc.fetch_stream_log(fetch_req, fetch_resp);
ASSERT_EQ(OB_SUCCESS, ret);
err = fetch_resp.get_err();
if (OB_SUCCESS == err) {
recv_log(fetch_resp);
} else if (OB_STREAM_NOT_EXIST == err) {
fprintf(stdout, "stream not exist\n");
break;
} else {
fprintf(stdout, "error ret=%d\n", err);
ASSERT_TRUE(false);
}
_I_("fetch", K(c1), K(c2));
if (true && REACH_TIME_INTERVAL(1000 * 1000)) {
fprintf(stdout, "--------------------------------------------------\n");
fprintf(stdout, "fetch, c1 = %ld, c2 = %ld\n", c1, c2);
report_log_recved();
}
usleep(1000 * 1000);
}
}
}
void del_stale()
{
int ret = OB_SUCCESS;
ObNetClient net_client;
ASSERT_EQ(OB_SUCCESS, net_client.init());
MockFectherInterface rpc(net_client);
rpc.set_svr(get_svr_addr());
rpc.set_timeout(get_timeout());
ObLogOpenStreamReq open_req;
ObLogOpenStreamResp open_resp;
for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
ObLogOpenStreamReq::Param param;
param.pkey_ = pks[i];
param.start_log_id_ = start_log_id[i];
ASSERT_EQ(OB_SUCCESS, open_req.append_param(param));
}
open_req.set_stream_lifetime(LIFE_TIME);
ret = rpc.open_stream(open_req, open_resp);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_TRUE(open_resp.get_stream_seq().is_valid());
_I_("open_stream success", K(open_resp));
const ObStreamSeq &first_seq = open_resp.get_stream_seq();
ObLogOpenStreamReq open_req2;
ObLogOpenStreamResp open_resp2;
for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
ObLogOpenStreamReq::Param param;
param.pkey_ = pks[i];
param.start_log_id_ = start_log_id[i];
ASSERT_EQ(OB_SUCCESS, open_req2.append_param(param));
}
open_req2.set_stale_stream(first_seq);
open_req2.set_stream_lifetime(LIFE_TIME);
ret = rpc.open_stream(open_req2, open_resp2);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_TRUE(open_resp2.get_stream_seq().is_valid());
_I_("open_stream success", K(open_resp2));
}
int main(int argc, char** argv)
{
UNUSED(argc);
UNUSED(argv);
system("rm els.log");
OB_LOGGER.set_file_name("els.log", true);
ObLogger::get_logger().set_mod_log_levels("ALL.*:INFO, TLOG.*:DEBUG");
init_env();
start_fetch();
// del_stale();
return 0;
}