161 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			161 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#define USING_LOG_PREFIX SERVER
 | 
						|
#include <gtest/gtest.h>
 | 
						|
#include <gmock/gmock.h>
 | 
						|
#include "observer/net/ob_rpc_reverse_keepalive.h"
 | 
						|
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
 | 
						|
#include "rpc/frame/ob_req_deliver.h"
 | 
						|
#include "lib/net/ob_net_util.h"
 | 
						|
 | 
						|
#define private public
 | 
						|
 | 
						|
using namespace oceanbase::rpc::frame;
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
namespace obrpc
 | 
						|
{
 | 
						|
class TestRpcReverseKeepAliveService : public testing::Test
 | 
						|
{
 | 
						|
public:
 | 
						|
  TestRpcReverseKeepAliveService()
 | 
						|
  {}
 | 
						|
  virtual ~TestRpcReverseKeepAliveService()
 | 
						|
  {}
 | 
						|
  static int find_port(int start_port = 20000)
 | 
						|
  {
 | 
						|
    int sock_fd = -1;
 | 
						|
    int port = -1;
 | 
						|
    if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
 | 
						|
      // failed to create socket
 | 
						|
    } else {
 | 
						|
      struct sockaddr_in serv_addr;
 | 
						|
      memset(&serv_addr, 0, sizeof(serv_addr));
 | 
						|
      serv_addr.sin_family = AF_INET;
 | 
						|
      for (port = start_port; port <= 65535; ++port) {
 | 
						|
        serv_addr.sin_port = htons(port);
 | 
						|
        serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
 | 
						|
        if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == 0) {
 | 
						|
          close(sock_fd);
 | 
						|
          break;
 | 
						|
        }
 | 
						|
      }
 | 
						|
      close(sock_fd);
 | 
						|
    }
 | 
						|
    return port;
 | 
						|
  }
 | 
						|
};
 | 
						|
class ObTestRpcQHandler
 | 
						|
    : public rpc::frame::ObiReqQHandler
 | 
						|
{
 | 
						|
public:
 | 
						|
  ObTestRpcQHandler() {}
 | 
						|
  int onThreadCreated(obsys::CThread *) override final { return OB_SUCCESS; };
 | 
						|
  int onThreadDestroy(obsys::CThread *) override final { return OB_SUCCESS; };
 | 
						|
  bool handlePacketQueue(rpc::ObRequest *req, void *args) override final
 | 
						|
  {
 | 
						|
    ObIAllocator &alloc = THIS_WORKER.get_sql_arena_allocator();
 | 
						|
    const observer::ObGlobalContext gctx;
 | 
						|
    ObReqProcessor *processor = NULL;
 | 
						|
    ObRpcPacketCode pcode = reinterpret_cast<const obrpc::ObRpcPacket &>(req->get_packet()).get_pcode();
 | 
						|
    if (OB_RPC_REVERSE_KEEPALIVE == pcode) {
 | 
						|
      processor = OB_NEWx(observer::ObRpcReverseKeepaliveP, &alloc, gctx);
 | 
						|
    }
 | 
						|
    if (OB_NOT_NULL(processor)) {
 | 
						|
      processor->init();
 | 
						|
      processor->set_ob_request(*req);
 | 
						|
      processor->run();
 | 
						|
      processor->~ObReqProcessor();
 | 
						|
      THIS_WORKER.get_sql_arena_allocator().free(processor);
 | 
						|
      processor = NULL;
 | 
						|
    }
 | 
						|
    return true;
 | 
						|
  }
 | 
						|
 | 
						|
};
 | 
						|
class ObTestRpcDeliver
 | 
						|
  : public rpc::frame::ObReqQDeliver
 | 
						|
{
 | 
						|
public:
 | 
						|
  ObTestRpcDeliver() : ObReqQDeliver(qhandler_) {};
 | 
						|
  int deliver(rpc::ObRequest &req) override final
 | 
						|
  {
 | 
						|
    int ret = OB_SUCCESS;
 | 
						|
    LOG_INFO("deliver rpc request", K(req));
 | 
						|
    qhandler_.handlePacketQueue(&req, NULL);
 | 
						|
    return ret;
 | 
						|
  }
 | 
						|
  int init() {return OB_SUCCESS;}
 | 
						|
  void stop() override final
 | 
						|
  {};
 | 
						|
  ObTestRpcQHandler qhandler_;
 | 
						|
};
 | 
						|
bool enable_pkt_nio(bool start_as_client) {
 | 
						|
  return true;
 | 
						|
}
 | 
						|
 | 
						|
TEST_F(TestRpcReverseKeepAliveService, reverse_keepalive_service)
 | 
						|
{
 | 
						|
  // init
 | 
						|
  int ret = common::OB_SUCCESS;
 | 
						|
  obrpc::ObSrvRpcProxy srv_rpc_proxy;
 | 
						|
  ObReqTransport dummy_transport(NULL, NULL);
 | 
						|
  srv_rpc_proxy.init(&dummy_transport);
 | 
						|
  ObTestRpcDeliver deliver;
 | 
						|
  int port = find_port();
 | 
						|
  ObAddr dst;
 | 
						|
  uint32_t ip_value = 0;
 | 
						|
  if (OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("eth0", ip_value))
 | 
						|
    && OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("bond0", ip_value))) {
 | 
						|
    dst.set_ip_addr("127.0.0.1", port);
 | 
						|
  } else {
 | 
						|
    ip_value = ntohl(ip_value);
 | 
						|
    dst.set_ipv4_addr(ip_value, port);
 | 
						|
    LOG_INFO("get local ip", K(dst));
 | 
						|
  }
 | 
						|
 | 
						|
  // test
 | 
						|
  ret = rpc_reverse_keepalive_instance.init(&srv_rpc_proxy);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  ret = global_poc_server.start(port, 1, &deliver);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
 | 
						|
  uint32_t pkt_id = 1;
 | 
						|
  stream_rpc_register(pkt_id, ObTimeUtility::current_time());
 | 
						|
  ObRpcReverseKeepaliveArg a(dst, ObTimeUtility::current_time(), pkt_id);
 | 
						|
  ret = stream_rpc_reverse_probe(a);
 | 
						|
  ASSERT_EQ(ret, OB_SUCCESS);
 | 
						|
  stream_rpc_unregister(pkt_id);
 | 
						|
  ret = stream_rpc_reverse_probe(a);
 | 
						|
  ASSERT_EQ(ret, OB_ENTRY_NOT_EXIST);
 | 
						|
 | 
						|
  pkt_id = 2;
 | 
						|
  int64_t send_time_us = ObTimeUtility::current_time();
 | 
						|
  stream_rpc_register(pkt_id, send_time_us + 100000);
 | 
						|
  ObRpcReverseKeepaliveArg a2(dst, send_time_us, pkt_id);
 | 
						|
  ret = stream_rpc_reverse_probe(a2);
 | 
						|
  ASSERT_EQ(ret, OB_HASH_NOT_EXIST);
 | 
						|
 | 
						|
  rpc_reverse_keepalive_instance.destroy();
 | 
						|
}
 | 
						|
 | 
						|
} // end namespace obrpc
 | 
						|
} // end namespace oceanbase
 | 
						|
 | 
						|
int main(int argc, char **argv)
 | 
						|
{
 | 
						|
  OB_LOGGER.set_file_name("test_reverse_keepalive.log", true);
 | 
						|
  testing::InitGoogleTest(&argc, argv);
 | 
						|
  return RUN_ALL_TESTS();
 | 
						|
}
 |