228 lines
5.2 KiB
C++
228 lines
5.2 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#define USING_LOG_PREFIX RPC
|
|
#include <iostream>
|
|
#include "rpc/ob_request.h"
|
|
#include "rpc/frame/ob_net_easy.h"
|
|
#include "rpc/frame/ob_req_deliver.h"
|
|
#include "rpc/obrpc/ob_rpc_handler.h"
|
|
#include "rpc/obrpc/ob_rpc_packet.h"
|
|
#include "rpc/obrpc/ob_rpc_proxy.h"
|
|
#include "rpc/obrpc/ob_rpc_processor.h"
|
|
#include "common/data_buffer.h"
|
|
|
|
using namespace oceanbase;
|
|
using namespace oceanbase::lib;
|
|
using namespace oceanbase::rpc;
|
|
using namespace oceanbase::rpc::frame;
|
|
using namespace oceanbase::obrpc;
|
|
using namespace oceanbase::common;
|
|
using namespace std;
|
|
using namespace oceanbase::obsys;
|
|
|
|
// ---- Benchmark tunables -----------------------------------------------------

// Loopback TCP port: main() registers the RPC listener on it and the Client
// threads send to 127.0.0.1:PORT.
#define PORT 3124
// NOTE(review): the three macros below are not referenced anywhere else in this
// file; they look like leftovers from an earlier version — confirm before removing.
#define IO_CNT 1
#define SEND_CNT 1
#define PROC_TH_CNT 4

// Runtime settings. main() overwrites them from argv[1], argv[2] and argv[3]
// (in that order) when those arguments are present.
int client_count = 1;  // thread count handed to the Client thread pool
int io_count = 1;      // copied into ObNetOptions::rpc_io_cnt_
int worker_count = 1;  // thread count configured on ObTestDeliver's queue thread
|
|
|
|
class TestProxy : public ObRpcProxy {
|
|
public:
|
|
DEFINE_TO(TestProxy);
|
|
|
|
RPC_S(@PR5 test, OB_TEST_PCODE);
|
|
RPC_AP(@PR5 test2, OB_TEST2_PCODE);
|
|
};
|
|
|
|
// ---- Benchmark accumulators -------------------------------------------------
// All four are only ever incremented through ATOMIC_FAA and are printed by
// main() once the client and network threads have been stopped.

// Sum over accounted requests of (receive timestamp - send timestamp).
int64_t total_fly_time = 0;
// Sum over requests of (run timestamp - receive timestamp); only
// MyProcessor::process() adds to it.
int64_t total_net_time = 0;
// Number of requests that contributed to the sums above.
int64_t total_count = 0;
// Number of test2() calls issued by the Client threads.
int64_t total_send_count = 0;
|
|
|
|
// Processor for OB_TEST_PCODE requests. It performs no business logic: it only
// folds the timing of the request it was handed into the global counters.
class MyProcessor : public TestProxy::Processor<OB_TEST_PCODE> {
public:
  // Adds this request's two intervals to the global sums and bumps the request
  // counter. Always returns OB_SUCCESS.
  int process()
  {
    // send -> receive interval, accumulated under the "fly time" counter
    const int64_t send_to_receive = get_receive_timestamp() - get_send_timestamp();
    // receive -> run interval, accumulated under the "net time" counter
    const int64_t receive_to_run = get_run_timestamp() - get_receive_timestamp();

    ATOMIC_FAA(&total_fly_time, send_to_receive);
    ATOMIC_FAA(&total_net_time, receive_to_run);
    ATOMIC_FAA(&total_count, 1);

    return OB_SUCCESS;
  }
};
|
|
|
|
class QHandler : public ObiReqQHandler {
|
|
public:
|
|
bool handlePacketQueue(ObRequest* req, void* args)
|
|
{
|
|
UNUSED(args);
|
|
MyProcessor p;
|
|
p.set_ob_request(*req);
|
|
p.run();
|
|
|
|
return true;
|
|
}
|
|
|
|
int onThreadCreated(obsys::CThread*)
|
|
{
|
|
return OB_SUCCESS;
|
|
}
|
|
int onThreadDestroy(obsys::CThread*)
|
|
{
|
|
return OB_SUCCESS;
|
|
}
|
|
};
|
|
|
|
class ObTestDeliver : public rpc::frame::ObReqQDeliver {
|
|
public:
|
|
ObTestDeliver() : ObReqQDeliver(qhandler_)
|
|
{}
|
|
|
|
int init()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
queue_.get_thread().set_thread_count(worker_count);
|
|
queue_.set_qhandler(&qhandler_);
|
|
|
|
return ret;
|
|
}
|
|
|
|
int deliver(rpc::ObRequest& req)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
|
|
req.get_request()->opacket = const_cast<rpc::ObPacket*>(&req.get_packet());
|
|
req.set_request_rtcode(EASY_OK);
|
|
easy_request_wakeup(req.get_request());
|
|
|
|
const int64_t fly_time = req.get_receive_timestamp() - req.get_send_timestamp();
|
|
// const int64_t net_time
|
|
// = req.get_run_timestamp() - req.get_receive_timestamp();
|
|
|
|
ATOMIC_FAA(&total_fly_time, fly_time);
|
|
// ATOMIC_FAA(&total_net_time, net_time);
|
|
ATOMIC_FAA(&total_count, 1);
|
|
|
|
// if (!queue_->push(&req, 100000)) {
|
|
// ret = OB_ERR_UNEXPECTED;
|
|
// }
|
|
|
|
return ret;
|
|
}
|
|
|
|
void stop(){};
|
|
|
|
private:
|
|
ObReqQueueThread queue_;
|
|
QHandler qhandler_;
|
|
};
|
|
|
|
class Client : public CoKThread {
|
|
public:
|
|
Client(const TestProxy& proxy, int th_cnt, ObAddr dst, uint32_t sleep_time)
|
|
: CoKThread(th_cnt), proxy_(proxy), dst_(dst), sleep_time_(sleep_time)
|
|
{}
|
|
|
|
void run(int64_t)
|
|
{
|
|
while (!has_set_stop()) {
|
|
proxy_.to(dst_).test2(NULL);
|
|
ATOMIC_FAA(&total_send_count, 1);
|
|
this_routine::usleep(sleep_time_);
|
|
}
|
|
}
|
|
|
|
private:
|
|
const TestProxy& proxy_;
|
|
const ObAddr dst_;
|
|
const uint32_t sleep_time_;
|
|
};
|
|
|
|
int main(int argc, char* argv[])
|
|
{
|
|
UNUSED(argc);
|
|
UNUSED(argv);
|
|
int ret = OB_SUCCESS;
|
|
|
|
if (argc > 1) {
|
|
client_count = atoi(argv[1]);
|
|
}
|
|
if (argc > 2) {
|
|
io_count = atoi(argv[2]);
|
|
}
|
|
if (argc > 3) {
|
|
worker_count = atoi(argv[3]);
|
|
}
|
|
|
|
rpc::frame::ObNetEasy net_;
|
|
ObTestDeliver server;
|
|
ObRpcHandler handler_(server);
|
|
rpc::frame::ObReqTransport* transport_;
|
|
|
|
if (OB_FAIL(server.init())) {
|
|
exit(1);
|
|
}
|
|
|
|
ObNetOptions opts;
|
|
opts.rpc_io_cnt_ = io_count;
|
|
net_.init(opts);
|
|
|
|
if (OB_FAIL(net_.add_rpc_listen(PORT, handler_, transport_))) {
|
|
exit(1);
|
|
}
|
|
net_.start();
|
|
|
|
TestProxy proxy;
|
|
proxy.init(transport_);
|
|
|
|
ObAddr dst(ObAddr::IPV4, "127.0.0.1", PORT);
|
|
Client client(proxy, client_count, dst, 1);
|
|
client.start();
|
|
|
|
// ObAddr dst2(ObAddr::IPV4, "127.0.0.1", PORT+1);
|
|
// Client client2(proxy, 1, dst2, 1);
|
|
// client2.start();
|
|
|
|
const int64_t start_ts = ObTimeUtility::current_time();
|
|
sleep(10);
|
|
const int64_t end_ts = ObTimeUtility::current_time();
|
|
|
|
client.stop();
|
|
client.wait();
|
|
|
|
// client2.stop();
|
|
// client2.wait();
|
|
|
|
net_.stop();
|
|
net_.wait();
|
|
|
|
cout << endl
|
|
<< "==== RESULT ====" << endl
|
|
<< "total send: \t" << total_send_count << endl
|
|
<< "total proc: \t" << total_count << endl
|
|
<< "elapse sec: \t" << (end_ts - start_ts) / 1000000L << endl
|
|
<< "avg fly time: " << total_fly_time / (total_count + 1) << endl
|
|
<< "avg net time: " << total_net_time / (total_count + 1) << endl
|
|
<< endl;
|
|
|
|
return 0;
|
|
}
|