161 lines
4.8 KiB
C++
161 lines
4.8 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#define USING_LOG_PREFIX SERVER
|
|
#include <gtest/gtest.h>
|
|
#include <gmock/gmock.h>
|
|
#include "observer/net/ob_rpc_reverse_keepalive.h"
|
|
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
|
|
#include "rpc/frame/ob_req_deliver.h"
|
|
#include "lib/net/ob_net_util.h"
|
|
|
|
#define private public
|
|
|
|
using namespace oceanbase::rpc::frame;
|
|
namespace oceanbase
|
|
{
|
|
namespace obrpc
|
|
{
|
|
class TestRpcReverseKeepAliveService : public testing::Test
|
|
{
|
|
public:
|
|
TestRpcReverseKeepAliveService()
|
|
{}
|
|
virtual ~TestRpcReverseKeepAliveService()
|
|
{}
|
|
static int find_port(int start_port = 20000)
|
|
{
|
|
int sock_fd = -1;
|
|
int port = -1;
|
|
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
|
// failed to create socket
|
|
} else {
|
|
struct sockaddr_in serv_addr;
|
|
memset(&serv_addr, 0, sizeof(serv_addr));
|
|
serv_addr.sin_family = AF_INET;
|
|
for (port = start_port; port <= 65535; ++port) {
|
|
serv_addr.sin_port = htons(port);
|
|
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
|
|
if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == 0) {
|
|
close(sock_fd);
|
|
break;
|
|
}
|
|
}
|
|
close(sock_fd);
|
|
}
|
|
return port;
|
|
}
|
|
};
|
|
class ObTestRpcQHandler
|
|
: public rpc::frame::ObiReqQHandler
|
|
{
|
|
public:
|
|
ObTestRpcQHandler() {}
|
|
int onThreadCreated(obsys::CThread *) override final { return OB_SUCCESS; };
|
|
int onThreadDestroy(obsys::CThread *) override final { return OB_SUCCESS; };
|
|
bool handlePacketQueue(rpc::ObRequest *req, void *args) override final
|
|
{
|
|
ObIAllocator &alloc = THIS_WORKER.get_sql_arena_allocator();
|
|
const observer::ObGlobalContext &gctx = observer::ObGlobalContext::get_instance();
|
|
ObReqProcessor *processor = NULL;
|
|
ObRpcPacketCode pcode = reinterpret_cast<const obrpc::ObRpcPacket &>(req->get_packet()).get_pcode();
|
|
if (OB_RPC_REVERSE_KEEPALIVE == pcode) {
|
|
processor = OB_NEWx(observer::ObRpcReverseKeepaliveP, &alloc, gctx);
|
|
}
|
|
if (OB_NOT_NULL(processor)) {
|
|
processor->init();
|
|
processor->set_ob_request(*req);
|
|
processor->run();
|
|
processor->~ObReqProcessor();
|
|
THIS_WORKER.get_sql_arena_allocator().free(processor);
|
|
processor = NULL;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
};
|
|
class ObTestRpcDeliver
|
|
: public rpc::frame::ObReqQDeliver
|
|
{
|
|
public:
|
|
ObTestRpcDeliver() : ObReqQDeliver(qhandler_) {};
|
|
int deliver(rpc::ObRequest &req) override final
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
LOG_INFO("deliver rpc request", K(req));
|
|
qhandler_.handlePacketQueue(&req, NULL);
|
|
return ret;
|
|
}
|
|
int init() {return OB_SUCCESS;}
|
|
void stop() override final
|
|
{};
|
|
ObTestRpcQHandler qhandler_;
|
|
};
|
|
bool enable_pkt_nio(bool start_as_client) {
|
|
return true;
|
|
}
|
|
|
|
TEST_F(TestRpcReverseKeepAliveService, reverse_keepalive_service)
|
|
{
|
|
// init
|
|
int ret = common::OB_SUCCESS;
|
|
obrpc::ObSrvRpcProxy srv_rpc_proxy;
|
|
ObReqTransport dummy_transport(NULL, NULL);
|
|
srv_rpc_proxy.init(&dummy_transport);
|
|
ObTestRpcDeliver deliver;
|
|
int port = find_port();
|
|
ObAddr dst;
|
|
uint32_t ip_value = 0;
|
|
if (OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("eth0", ip_value))
|
|
&& OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("bond0", ip_value))) {
|
|
dst.set_ip_addr("127.0.0.1", port);
|
|
} else {
|
|
ip_value = ntohl(ip_value);
|
|
dst.set_ipv4_addr(ip_value, port);
|
|
LOG_INFO("get local ip", K(dst));
|
|
}
|
|
|
|
// test
|
|
ret = rpc_reverse_keepalive_instance.init(&srv_rpc_proxy);
|
|
ASSERT_EQ(ret, OB_SUCCESS);
|
|
ret = global_poc_server.start(port, 1, &deliver);
|
|
ASSERT_EQ(ret, OB_SUCCESS);
|
|
|
|
uint32_t pkt_id = 1;
|
|
stream_rpc_register(pkt_id, ObTimeUtility::current_time());
|
|
ObRpcReverseKeepaliveArg a(dst, ObTimeUtility::current_time(), pkt_id);
|
|
ret = stream_rpc_reverse_probe(a);
|
|
ASSERT_EQ(ret, OB_SUCCESS);
|
|
stream_rpc_unregister(pkt_id);
|
|
ret = stream_rpc_reverse_probe(a);
|
|
ASSERT_EQ(ret, OB_ENTRY_NOT_EXIST);
|
|
|
|
pkt_id = 2;
|
|
int64_t send_time_us = ObTimeUtility::current_time();
|
|
stream_rpc_register(pkt_id, send_time_us + 100000);
|
|
ObRpcReverseKeepaliveArg a2(dst, send_time_us, pkt_id);
|
|
ret = stream_rpc_reverse_probe(a2);
|
|
ASSERT_EQ(ret, OB_HASH_NOT_EXIST);
|
|
|
|
rpc_reverse_keepalive_instance.destroy();
|
|
}
|
|
|
|
} // end namespace obrpc
|
|
} // end namespace oceanbase
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
OB_LOGGER.set_file_name("test_reverse_keepalive.log", true);
|
|
testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|