365 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			365 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
#include <cstdio>

#include <gtest/gtest.h>

#include "common/ob_queue_thread.h"
#include "ob_log_fetcher_rpc_interface.h"
#include "clog/ob_log_entry.h"
 | |
| 
 | |
| namespace oceanbase
 | |
| {
 | |
| using namespace common;
 | |
| using namespace obrpc;
 | |
| using namespace liboblog;
 | |
| using namespace liboblog::fetcher;
 | |
| namespace unittest
 | |
| {
 | |
| 
 | |
| class MockFectherInterface : public IFetcherRpcInterface
 | |
| {
 | |
| public:
 | |
|   MockFectherInterface(ObNetClient &net_client,
 | |
|                        const uint64_t tenant_id = OB_SYS_TENANT_ID)
 | |
|     : net_client_(net_client),
 | |
|       tenant_id_(tenant_id)
 | |
|   {
 | |
|     svr_finder_ = NULL;
 | |
|   }
 | |
|   void set_svr(const ObAddr &svr)
 | |
|   {
 | |
|     svr_ = svr;
 | |
|   }
 | |
|   virtual const ObAddr& get_svr() const
 | |
|   {
 | |
|     return svr_;
 | |
|   }
 | |
|   void set_timeout(const int64_t timeout)
 | |
|   {
 | |
|     timeout_ = timeout;
 | |
|   }
 | |
|   virtual int req_start_log_id_by_ts(const ObLogReqStartLogIdByTsRequest &req,
 | |
|                                      ObLogReqStartLogIdByTsResponse &resp)
 | |
|   {
 | |
|     UNUSED(req);
 | |
|     UNUSED(resp);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   virtual int req_start_log_id_by_ts_2(
 | |
|       const ObLogReqStartLogIdByTsRequestWithBreakpoint &req,
 | |
|       ObLogReqStartLogIdByTsResponseWithBreakpoint &resp)
 | |
|   {
 | |
|     UNUSED(req);
 | |
|     UNUSED(resp);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
| 
 | |
|   virtual int req_start_pos_by_log_id(
 | |
|       const ObLogReqStartPosByLogIdRequest &req,
 | |
|       ObLogReqStartPosByLogIdResponse &resp)
 | |
|   {
 | |
|     UNUSED(req);
 | |
|     UNUSED(resp);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   virtual int req_start_pos_by_log_id_2(
 | |
|       const ObLogReqStartPosByLogIdRequestWithBreakpoint& req,
 | |
|       ObLogReqStartPosByLogIdResponseWithBreakpoint& resp)
 | |
|   {
 | |
|     UNUSED(req);
 | |
|     UNUSED(resp);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   virtual int fetch_log(const ObLogExternalFetchLogRequest& req,
 | |
|                         ObLogExternalFetchLogResponse& resp)
 | |
|   {
 | |
|     UNUSED(req);
 | |
|     UNUSED(resp);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   virtual int req_heartbeat_info(const ObLogReqHeartbeatInfoRequest& req,
 | |
|                                  ObLogReqHeartbeatInfoResponse& resp)
 | |
|   {
 | |
|     UNUSED(req);
 | |
|     UNUSED(resp);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   virtual int req_leader_heartbeat(
 | |
|     const obrpc::ObLogLeaderHeartbeatReq &req,
 | |
|     obrpc::ObLogLeaderHeartbeatResp &res)
 | |
|   {
 | |
|     UNUSED(req);
 | |
|     UNUSED(res);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
|   virtual int req_svr_feedback(const ReqLogSvrFeedback &feedback)
 | |
|   {
 | |
|     UNUSED(feedback);
 | |
|     return OB_SUCCESS;
 | |
|   }
 | |
| 
 | |
|   virtual int open_stream(const ObLogOpenStreamReq &req,
 | |
|                           ObLogOpenStreamResp &resp)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObLogExternalProxy proxy;
 | |
|     if (OB_SUCCESS != (ret = net_client_.get_proxy(proxy))) {
 | |
|       _E_("err get proxy", K(ret));
 | |
|     } else {
 | |
|       ret = proxy.to(svr_).by(tenant_id_).timeout(timeout_).open_stream(req, resp);
 | |
|       int err = proxy.get_result_code().rcode_;
 | |
|       if (_FAIL_(ret) && _FAIL_(err)) {
 | |
|         _W_("err rpc req heartbeat info", K(ret), "result_code", err,
 | |
|             "svr", get_svr(), K(req));
 | |
|         resp.reset();
 | |
|         resp.set_err(OB_ERR_SYS);
 | |
|         ret = OB_SUCCESS;
 | |
|       }
 | |
|       else { }
 | |
|       _D_("rpc: open_stream", K(ret), "svr", get_svr(), K(req), K(resp));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   virtual int fetch_stream_log(const ObLogStreamFetchLogReq &req,
 | |
|                                ObLogStreamFetchLogResp &resp)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     ObLogExternalProxy proxy;
 | |
|     if (OB_SUCCESS != (ret = net_client_.get_proxy(proxy))) {
 | |
|       _E_("err get proxy", K(ret));
 | |
|     } else {
 | |
|       ret = proxy.to(svr_).by(tenant_id_).timeout(timeout_).stream_fetch_log(req, resp);
 | |
|       int err = proxy.get_result_code().rcode_;
 | |
|       if (_FAIL_(ret) && _FAIL_(err)) {
 | |
|         _W_("err rpc req heartbeat info", K(ret), "result_code", err,
 | |
|             "svr", get_svr(), K(req));
 | |
|         resp.reset();
 | |
|         resp.set_err(OB_ERR_SYS);
 | |
|         ret = OB_SUCCESS;
 | |
|       }
 | |
|       else { }
 | |
|       _D_("rpc: stream_fetch_log", K(ret), "svr", get_svr(), K(req), K(resp));
 | |
|     }
 | |
|     return ret;
 | |
|   }
 | |
| private:
 | |
|   ObNetClient &net_client_;
 | |
|   SvrFinder *svr_finder_;
 | |
|   ObAddr svr_;
 | |
|   uint64_t tenant_id_;
 | |
|   int64_t timeout_;
 | |
| };
 | |
| }
 | |
| }
 | |
| 
 | |
| using namespace oceanbase::common;
 | |
| using namespace oceanbase::common::sqlclient;
 | |
| using namespace oceanbase::obrpc;
 | |
| using namespace oceanbase::liboblog;
 | |
| using namespace oceanbase::unittest;
 | |
| using namespace oceanbase::clog;
 | |
| 
 | |
| ObAddr get_svr_addr()
 | |
| {
 | |
|   ObAddr svr;
 | |
|   int32_t port = 27800;
 | |
|   svr.set_ip_addr("100.81.140.76", port);
 | |
|   // int32_t port = 27800;
 | |
|   // svr.set_ip_addr("10.210.170.16", port);
 | |
|   return svr;
 | |
| }
 | |
| 
 | |
/**
 * Returns the RPC timeout handed to MockFectherInterface::set_timeout().
 *
 * @return 60,000,000 (presumably microseconds, i.e. 60 s — confirm against
 *         the proxy's timeout() contract).
 */
int64_t get_timeout()
{
  const int64_t rpc_timeout = 60L * 1000 * 1000;
  return rpc_timeout;
}
 | |
| 
 | |
| //#define PKEY_COUNT 1
 | |
| #define PKEY_COUNT 2
 | |
| ObPartitionKey pks[PKEY_COUNT];
 | |
| ObCond table_ready;
 | |
| int64_t trans_log_count_recved[PKEY_COUNT];
 | |
| uint64_t start_log_id[PKEY_COUNT];
 | |
| 
 | |
| #define INSERT_COUNT 9
 | |
| #define LIFE_TIME (1000 * 1000 * 60)
 | |
| 
 | |
| void init_env()
 | |
| {
 | |
|   const int64_t table_id = 1101710651081591;
 | |
|   // const int64_t table_id = 1101710651081589;
 | |
|   for (int i = 0; i < PKEY_COUNT; i++) {
 | |
|     pks[i].init(table_id + i, 0, 1);
 | |
|     trans_log_count_recved[i] = 0;
 | |
|     start_log_id[i] = 1;
 | |
|   }
 | |
| }
 | |
| 
 | |
| void report_log_recved()
 | |
| {
 | |
|   for (int i = 0; i < PKEY_COUNT; i++) {
 | |
|     fprintf(stdout, "pkey.table_id = %ld, trans_log_num = %ld, next_log_id = %ld\n", static_cast<int64_t>(pks[i].table_id_), trans_log_count_recved[i], start_log_id[i]);
 | |
|   }
 | |
| }
 | |
| 
 | |
| void recv_log(ObLogStreamFetchLogResp &fetch_resp)
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   const int64_t log_num = fetch_resp.get_log_num();
 | |
|   const char *buf = fetch_resp.get_log_entry_buf();
 | |
|   ObLogEntry entry;
 | |
|   int64_t pos = 0;
 | |
|   int p = 0;
 | |
|   for (int64_t idx = 0; idx < log_num; ++idx) {
 | |
|     ret = entry.deserialize(buf, OB_MAX_LOG_BUFFER_SIZE, pos);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     const ObLogEntryHeader &header = entry.get_header();
 | |
|     _I_("recv clog_entry", K(ret), K(entry));
 | |
|     for (p = 0; p < PKEY_COUNT && pks[p] != header.get_partition_key(); p++);
 | |
|     ASSERT_TRUE(p < PKEY_COUNT);
 | |
|     if (OB_LOG_SUBMIT == header.get_log_type()) {
 | |
|       trans_log_count_recved[p]++;
 | |
|       _I_("trans_log_count_recved", K(p), "pkey", pks[p], "trans_cnt", trans_log_count_recved[p]);
 | |
|     }
 | |
|     ASSERT_TRUE(header.get_log_id() == start_log_id[p]);
 | |
|     start_log_id[p]++;
 | |
|   }
 | |
| }
 | |
| 
 | |
| bool recv_all()
 | |
| {
 | |
|   int i = 0;
 | |
|   for (i = 0; (trans_log_count_recved[i] == INSERT_COUNT) && i < PKEY_COUNT; i++);
 | |
|   // return i == PKEY_COUNT;
 | |
|   return false;
 | |
| }
 | |
| 
 | |
| void start_fetch()
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObNetClient net_client;
 | |
|   ASSERT_EQ(OB_SUCCESS, net_client.init());
 | |
|   MockFectherInterface rpc(net_client);
 | |
|   rpc.set_svr(get_svr_addr());
 | |
|   rpc.set_timeout(get_timeout());
 | |
| 
 | |
|   int64_t c1 = 0;
 | |
|   int64_t c2 = 0;
 | |
|   int err = OB_SUCCESS;
 | |
|   while (!recv_all()) {
 | |
|     c1++;
 | |
|     ObLogOpenStreamReq open_req;
 | |
|     ObLogOpenStreamResp open_resp;
 | |
|     for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
 | |
|       ObLogOpenStreamReq::Param param;
 | |
|       param.pkey_ = pks[i];
 | |
|       param.start_log_id_ = start_log_id[i];
 | |
|       ASSERT_EQ(OB_SUCCESS, open_req.append_param(param));
 | |
|     }
 | |
|     open_req.set_stream_lifetime(LIFE_TIME);
 | |
|     ret = rpc.open_stream(open_req, open_resp);
 | |
|     ASSERT_EQ(OB_SUCCESS, ret);
 | |
|     ASSERT_TRUE(open_resp.get_stream_seq().is_valid());
 | |
| 
 | |
|     _I_("open_stream success", K(open_resp));
 | |
| 
 | |
|     const ObStreamSeq &seq = open_resp.get_stream_seq();
 | |
|     const int64_t upper_lmt_ts = 100000000000000000L; // large enough
 | |
|     const int64_t step = 100;
 | |
|     c2 = 0;
 | |
|     while (!recv_all()) {
 | |
|       c2++;
 | |
|       ObLogStreamFetchLogReq fetch_req;
 | |
|       ObLogStreamFetchLogResp fetch_resp;
 | |
|       ASSERT_EQ(OB_SUCCESS, fetch_req.set_stream_seq(seq));
 | |
|       ASSERT_EQ(OB_SUCCESS, fetch_req.set_upper_limit_ts(upper_lmt_ts));
 | |
|       ASSERT_EQ(OB_SUCCESS, fetch_req.set_log_cnt_per_part_per_round(step));
 | |
| 
 | |
|       ret = rpc.fetch_stream_log(fetch_req, fetch_resp);
 | |
|       ASSERT_EQ(OB_SUCCESS, ret);
 | |
|       err = fetch_resp.get_err();
 | |
|       if (OB_SUCCESS == err) {
 | |
|         recv_log(fetch_resp);
 | |
|       } else if (OB_STREAM_NOT_EXIST == err) {
 | |
|         fprintf(stdout, "stream not exist\n");
 | |
|         break;
 | |
|       } else {
 | |
|         fprintf(stdout, "error ret=%d\n", err);
 | |
|         ASSERT_TRUE(false);
 | |
|       }
 | |
|       _I_("fetch", K(c1), K(c2));
 | |
|       if (true && REACH_TIME_INTERVAL(1000 * 1000)) {
 | |
|         fprintf(stdout, "--------------------------------------------------\n");
 | |
|         fprintf(stdout, "fetch, c1 = %ld, c2 = %ld\n", c1, c2);
 | |
|         report_log_recved();
 | |
|       }
 | |
|       usleep(1000 * 1000);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| void del_stale()
 | |
| {
 | |
|   int ret = OB_SUCCESS;
 | |
|   ObNetClient net_client;
 | |
|   ASSERT_EQ(OB_SUCCESS, net_client.init());
 | |
|   MockFectherInterface rpc(net_client);
 | |
|   rpc.set_svr(get_svr_addr());
 | |
|   rpc.set_timeout(get_timeout());
 | |
| 
 | |
|   ObLogOpenStreamReq open_req;
 | |
|   ObLogOpenStreamResp open_resp;
 | |
|   for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
 | |
|     ObLogOpenStreamReq::Param param;
 | |
|     param.pkey_ = pks[i];
 | |
|     param.start_log_id_ = start_log_id[i];
 | |
|     ASSERT_EQ(OB_SUCCESS, open_req.append_param(param));
 | |
|   }
 | |
|   open_req.set_stream_lifetime(LIFE_TIME);
 | |
|   ret = rpc.open_stream(open_req, open_resp);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   ASSERT_TRUE(open_resp.get_stream_seq().is_valid());
 | |
| 
 | |
|   _I_("open_stream success", K(open_resp));
 | |
| 
 | |
|   const ObStreamSeq &first_seq = open_resp.get_stream_seq();
 | |
| 
 | |
|   ObLogOpenStreamReq open_req2;
 | |
|   ObLogOpenStreamResp open_resp2;
 | |
|   for (int i = 0; OB_SUCC(ret) && i < PKEY_COUNT; i++) {
 | |
|     ObLogOpenStreamReq::Param param;
 | |
|     param.pkey_ = pks[i];
 | |
|     param.start_log_id_ = start_log_id[i];
 | |
|     ASSERT_EQ(OB_SUCCESS, open_req2.append_param(param));
 | |
|   }
 | |
|   open_req2.set_stale_stream(first_seq);
 | |
|   open_req2.set_stream_lifetime(LIFE_TIME);
 | |
|   ret = rpc.open_stream(open_req2, open_resp2);
 | |
|   ASSERT_EQ(OB_SUCCESS, ret);
 | |
|   ASSERT_TRUE(open_resp2.get_stream_seq().is_valid());
 | |
| 
 | |
|   _I_("open_stream success", K(open_resp2));
 | |
| }
 | |
| 
 | |
| int main(int argc, char** argv)
 | |
| {
 | |
|   UNUSED(argc);
 | |
|   UNUSED(argv);
 | |
|   system("rm els.log");
 | |
|   OB_LOGGER.set_file_name("els.log", true);
 | |
|   ObLogger::get_logger().set_mod_log_levels("ALL.*:INFO, TLOG.*:DEBUG");
 | |
|   init_env();
 | |
|   start_fetch();
 | |
|   // del_stale();
 | |
|   return 0;
 | |
| }
 | 
