Files
oceanbase/unittest/observer/net/test_rpc_reverse_keepalive.cpp
2024-08-31 12:52:27 +00:00

161 lines
4.8 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "observer/net/ob_rpc_reverse_keepalive.h"
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
#include "rpc/frame/ob_req_deliver.h"
#include "lib/net/ob_net_util.h"
#define private public
using namespace oceanbase::rpc::frame;
namespace oceanbase
{
namespace obrpc
{
class TestRpcReverseKeepAliveService : public testing::Test
{
public:
TestRpcReverseKeepAliveService()
{}
virtual ~TestRpcReverseKeepAliveService()
{}
static int find_port(int start_port = 20000)
{
int sock_fd = -1;
int port = -1;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
// failed to create socket
} else {
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
for (port = start_port; port <= 65535; ++port) {
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == 0) {
close(sock_fd);
break;
}
}
close(sock_fd);
}
return port;
}
};
class ObTestRpcQHandler
: public rpc::frame::ObiReqQHandler
{
public:
ObTestRpcQHandler() {}
int onThreadCreated(obsys::CThread *) override final { return OB_SUCCESS; };
int onThreadDestroy(obsys::CThread *) override final { return OB_SUCCESS; };
bool handlePacketQueue(rpc::ObRequest *req, void *args) override final
{
ObIAllocator &alloc = THIS_WORKER.get_sql_arena_allocator();
const observer::ObGlobalContext &gctx = observer::ObGlobalContext::get_instance();
ObReqProcessor *processor = NULL;
ObRpcPacketCode pcode = reinterpret_cast<const obrpc::ObRpcPacket &>(req->get_packet()).get_pcode();
if (OB_RPC_REVERSE_KEEPALIVE == pcode) {
processor = OB_NEWx(observer::ObRpcReverseKeepaliveP, &alloc, gctx);
}
if (OB_NOT_NULL(processor)) {
processor->init();
processor->set_ob_request(*req);
processor->run();
processor->~ObReqProcessor();
THIS_WORKER.get_sql_arena_allocator().free(processor);
processor = NULL;
}
return true;
}
};
class ObTestRpcDeliver
: public rpc::frame::ObReqQDeliver
{
public:
ObTestRpcDeliver() : ObReqQDeliver(qhandler_) {};
int deliver(rpc::ObRequest &req) override final
{
int ret = OB_SUCCESS;
LOG_INFO("deliver rpc request", K(req));
qhandler_.handlePacketQueue(&req, NULL);
return ret;
}
int init() {return OB_SUCCESS;}
void stop() override final
{};
ObTestRpcQHandler qhandler_;
};
bool enable_pkt_nio(bool start_as_client) {
return true;
}
TEST_F(TestRpcReverseKeepAliveService, reverse_keepalive_service)
{
// init
int ret = common::OB_SUCCESS;
obrpc::ObSrvRpcProxy srv_rpc_proxy;
ObReqTransport dummy_transport(NULL, NULL);
srv_rpc_proxy.init(&dummy_transport);
ObTestRpcDeliver deliver;
int port = find_port();
ObAddr dst;
uint32_t ip_value = 0;
if (OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("eth0", ip_value))
&& OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("bond0", ip_value))) {
dst.set_ip_addr("127.0.0.1", port);
} else {
ip_value = ntohl(ip_value);
dst.set_ipv4_addr(ip_value, port);
LOG_INFO("get local ip", K(dst));
}
// test
ret = rpc_reverse_keepalive_instance.init(&srv_rpc_proxy);
ASSERT_EQ(ret, OB_SUCCESS);
ret = global_poc_server.start(port, 1, &deliver);
ASSERT_EQ(ret, OB_SUCCESS);
uint32_t pkt_id = 1;
stream_rpc_register(pkt_id, ObTimeUtility::current_time());
ObRpcReverseKeepaliveArg a(dst, ObTimeUtility::current_time(), pkt_id);
ret = stream_rpc_reverse_probe(a);
ASSERT_EQ(ret, OB_SUCCESS);
stream_rpc_unregister(pkt_id);
ret = stream_rpc_reverse_probe(a);
ASSERT_EQ(ret, OB_ENTRY_NOT_EXIST);
pkt_id = 2;
int64_t send_time_us = ObTimeUtility::current_time();
stream_rpc_register(pkt_id, send_time_us + 100000);
ObRpcReverseKeepaliveArg a2(dst, send_time_us, pkt_id);
ret = stream_rpc_reverse_probe(a2);
ASSERT_EQ(ret, OB_HASH_NOT_EXIST);
rpc_reverse_keepalive_instance.destroy();
}
} // end namespace obrpc
} // end namespace oceanbase
int main(int argc, char **argv)
{
OB_LOGGER.set_file_name("test_reverse_keepalive.log", true);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}