Add stream rpc reverse keepalive

This commit is contained in:
liucc1997 2024-03-27 02:45:32 +00:00 committed by ob-robot
parent 6011bd19fb
commit 2bdb4a9bbc
28 changed files with 565 additions and 24 deletions

View File

@ -230,8 +230,8 @@ int32_t ObPocClientStub::get_proxy_group_id(ObRpcProxy& proxy) {
void ObPocClientStub::set_rcode(ObRpcProxy& proxy, const ObRpcResultCode& rcode) {
proxy.set_result_code(rcode);
}
void ObPocClientStub::set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id) {
proxy.set_handle_attr(handle, pcode, opts, is_stream_next, session_id);
void ObPocClientStub::set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts) {
proxy.set_handle_attr(handle, pcode, opts, is_stream_next, session_id, pkt_id, send_ts);
}
int ObPocClientStub::translate_io_error(int io_err) {
return tranlate_to_ob_error(io_err);

View File

@ -105,7 +105,7 @@ public:
static int64_t get_proxy_timeout(ObRpcProxy& proxy);
static void set_rcode(ObRpcProxy& proxy, const ObRpcResultCode& rcode);
static int check_blacklist(const common::ObAddr& addr);
static void set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id);
static void set_handle(ObRpcProxy& proxy, Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts);
static int32_t get_proxy_group_id(ObRpcProxy& proxy);
static uint8_t balance_assign_tidx()
{
@ -181,7 +181,8 @@ public:
}
set_rcode(proxy, rcode);
if (OB_SUCC(ret) && handle) {
set_handle(proxy, handle, pcode, opts, resp_pkt.is_stream_next(), resp_pkt.get_session_id());
int64_t pkt_id = static_cast<int64_t>(cb.pkt_id_);
set_handle(proxy, handle, pcode, opts, resp_pkt.is_stream_next(), resp_pkt.get_session_id(), pkt_id, start_ts);
}
}
rpc::RpcStatPiece piece;

View File

@ -39,6 +39,22 @@ bool __attribute__((weak)) enable_pkt_nio(bool start_as_client) {
int64_t __attribute__((weak)) get_max_rpc_packet_size() {
return OB_MAX_RPC_PACKET_LENGTH;
}
void __attribute__((weak)) stream_rpc_register(const int64_t pkt_id, int64_t send_time_us)
{
UNUSED(pkt_id);
UNUSED(send_time_us);
RPC_LOG_RET(WARN, OB_ERR_UNEXPECTED, "should not reach here");
}
void __attribute__((weak)) stream_rpc_unregister(const int64_t pkt_id)
{
UNUSED(pkt_id);
RPC_LOG_RET(WARN, OB_ERR_UNEXPECTED, "should not reach here");
}
int __attribute__((weak)) stream_rpc_reverse_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg)
{
UNUSED(reverse_keepalive_arg);
return OB_ERR_UNEXPECTED;
}
}; // end namespace obrpc
}; // end namespace oceanbase
@ -107,6 +123,10 @@ int ObPocServerHandleContext::create(int64_t resp_id, const char* buf, int64_t s
}
int64_t receive_ts = ObTimeUtility::current_time();
pkt->set_receive_ts(receive_ts);
int64_t pkt_id = pn_get_pkt_id(resp_id);
if (OB_LIKELY(pkt_id >= 0)) {
pkt->set_packet_id(pkt_id);
}
pkt->set_content(packet_data, tmp_pkt.get_clen());
req->set_server_handle_context(ctx);
req->set_packet(pkt);

View File

@ -22,6 +22,10 @@ namespace oceanbase
namespace obrpc
{
enum {
INVALID_RPC_PKT_ID = -1
};
struct ObRpcReverseKeepaliveArg;
class ObPocServerHandleContext
{
public:
@ -80,6 +84,9 @@ private:
extern ObPocRpcServer global_poc_server;
extern ObListener* global_ob_listener;
void stream_rpc_register(const int64_t pkt_id, int64_t send_time_us);
void stream_rpc_unregister(const int64_t pkt_id);
int stream_rpc_reverse_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg);
int64_t get_max_rpc_packet_size();
extern "C" {
int dispatch_to_ob_listener(int accept_fd);

View File

@ -1073,6 +1073,7 @@ PCODE_DEF(OB_DETECT_RPC_CALL, 0x1595)
// session info diagnosis
// PCODE_DEF(OB_SESS_INFO_DIAGNOSIS, 0x1596)
PCODE_DEF(OB_RPC_REVERSE_KEEPALIVE, 0x1597)
// tenant snapshot
// create tenant snapshot

View File

@ -541,7 +541,7 @@ int ObRpcProcessorBase::part_response_error(rpc::ObRequest* req, const int retco
}
return ret;
}
int ObRpcProcessorBase::flush(int64_t wait_timeout)
int ObRpcProcessorBase::flush(int64_t wait_timeout, const ObAddr *src_addr)
{
int ret = OB_SUCCESS;
is_stream_ = true;
@ -549,7 +549,7 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout)
UNIS_VERSION_GUARD(unis_version_);
const int64_t stream_rpc_max_wait_timeout = get_stream_rpc_max_wait_timeout(tenant_id_);
if (0 == wait_timeout || wait_timeout > stream_rpc_max_wait_timeout) {
if (0 == wait_timeout) {
wait_timeout = stream_rpc_max_wait_timeout;
}
@ -560,6 +560,8 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout)
RPC_OBRPC_LOG(WARN, "allocate stream condition object fail", K(ret));
}
}
int64_t tenant_id = OB_INVALID_TENANT_ID;
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(rpc_pkt_) || is_stream_end_) {
ret = OB_ERR_UNEXPECTED;
@ -568,7 +570,7 @@ int ObRpcProcessorBase::flush(int64_t wait_timeout)
} else if (rpc_pkt_ && rpc_pkt_->is_stream_last()) {
ret = OB_ITER_END;
RPC_OBRPC_LOG(WARN, "stream is end", K(ret), K(*rpc_pkt_));
} else if (OB_FAIL(sc_->prepare())) {
} else if (OB_FAIL(sc_->prepare(src_addr, rpc_pkt_))) {
RPC_OBRPC_LOG(WARN, "prepare stream session fail", K(ret));
} else if (OB_FAIL(part_response(common::OB_SUCCESS, false))) {
RPC_OBRPC_LOG(WARN, "response part result to peer fail", K(ret));

View File

@ -108,7 +108,7 @@ protected:
virtual int deserialize();
virtual int serialize();
virtual int response(const int retcode) { return part_response(retcode, true); }
virtual int flush(int64_t wait_timeout = 0);
virtual int flush(int64_t wait_timeout = 0, const ObAddr *src_addr = NULL);
void set_preserve_recv_data() { preserve_recv_data_ = true; }
void set_result_compress_type(common::ObCompressorType t) { result_compress_type_ = t; }

View File

@ -35,6 +35,7 @@ using namespace oceanbase::rpc::frame;
ObAddr ObRpcProxy::myaddr_;
Handle::Handle()
: has_more_(false),
dst_(),
@ -42,7 +43,8 @@ Handle::Handle()
opts_(),
transport_(NULL),
proxy_(),
pcode_(OB_INVALID_RPC_CODE)
pcode_(OB_INVALID_RPC_CODE),
first_pkt_id_(INVALID_RPC_PKT_ID)
{}
int ObRpcProxy::init(const ObReqTransport *transport,
@ -333,7 +335,7 @@ int ObRpcProxy::create_request(
return transport.create_request(req, addr, size, timeout, local_addr, do_ratelimit, is_bg_flow, ob_ssl_invited_nodes, cb);
}
void ObRpcProxy::set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id) {
void ObRpcProxy::set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts) {
if (handle) {
handle->pcode_ = pcode;
handle->opts_ = opts;
@ -344,5 +346,9 @@ void ObRpcProxy::set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, c
handle->do_ratelimit_ = do_ratelimit_;
handle->is_bg_flow_ = is_bg_flow_;
handle->transport_ = NULL;
if (is_stream_next) {
handle->first_pkt_id_ = pkt_id;
stream_rpc_register(pkt_id, send_ts);
}
}
}

View File

@ -153,7 +153,7 @@ public:
rcode_.warnings_.reset();
rcode_.warnings_ = retcode.warnings_;
}
void set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id);
void set_handle_attr(Handle* handle, const ObRpcPacketCode& pcode, const ObRpcOpts& opts, bool is_stream_next, int64_t session_id, int64_t pkt_id, int64_t send_ts);
bool need_increment_request_level(int pcode) const {
return ((pcode > OB_SQL_PCODE_START && pcode < OB_SQL_PCODE_END)
@ -299,7 +299,7 @@ protected:
ObRpcPacketCode pcode_;
bool do_ratelimit_;
int8_t is_bg_flow_;
int64_t first_pkt_id_;
private:
DISALLOW_COPY_AND_ASSIGN(Handle);
};

View File

@ -101,6 +101,10 @@ int SSHandle<pcodeStruct>::get_more(typename pcodeStruct::Response &result)
} else {
has_more_ = resp_pkt.is_stream_next();
}
if (OB_FAIL(ret) || !has_more_) {
stream_rpc_unregister(first_pkt_id_);
first_pkt_id_ = INVALID_RPC_PKT_ID;
}
} else if (OB_FAIL(ObRpcProxy::create_request(pcode_, *transport_,
req, dst_, PAYLOAD_SIZE, proxy_.timeout(), opts_.local_addr_, do_ratelimit_,
@ -246,6 +250,10 @@ int SSHandle<pcodeStruct>::abort()
}
has_more_ = false;
}
if (first_pkt_id_ > 0) {
stream_rpc_unregister(first_pkt_id_);
first_pkt_id_ = INVALID_RPC_PKT_ID;
}
} else if (OB_FAIL(ObRpcProxy::create_request(pcode_, *transport_,
req, dst_, PAYLOAD_SIZE, proxy_.timeout(), opts_.local_addr_, do_ratelimit_,

View File

@ -0,0 +1,70 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_STRUCT_H
#define OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_STRUCT_H
#include "lib/utility/ob_unify_serialize.h"
#include "lib/net/ob_addr.h"
#include "lib/utility/ob_print_utils.h"
namespace oceanbase
{
namespace obrpc
{
struct ObRpcReverseKeepaliveArg
{
OB_UNIS_VERSION(1);
public:
ObRpcReverseKeepaliveArg()
: dst_(), first_send_ts_(OB_INVALID_TIMESTAMP), pkt_id_(-1)
{}
ObRpcReverseKeepaliveArg(const ObAddr& dst, int64_t first_send_ts_, const int64_t pkt_id)
: dst_(dst), first_send_ts_(first_send_ts_), pkt_id_(pkt_id)
{}
~ObRpcReverseKeepaliveArg()
{}
bool is_valid() const
{
return dst_.is_valid() && pkt_id_ >= 0 && pkt_id_ <= INT32_MAX && first_send_ts_ > 0;
}
int assign(const ObRpcReverseKeepaliveArg &other);
TO_STRING_KV(K_(dst), K_(pkt_id), K_(first_send_ts));
ObAddr dst_;
int64_t first_send_ts_;
int64_t pkt_id_;
DISALLOW_COPY_AND_ASSIGN(ObRpcReverseKeepaliveArg);
};
struct ObRpcReverseKeepaliveResp
{
OB_UNIS_VERSION(1);
public:
ObRpcReverseKeepaliveResp() : ret_(OB_SUCCESS)
{}
~ObRpcReverseKeepaliveResp()
{}
int assign(const ObRpcReverseKeepaliveResp &other)
{
ret_ = other.ret_;
return OB_SUCCESS;
}
TO_STRING_KV(K_(ret));
int32_t ret_;
private:
DISALLOW_COPY_AND_ASSIGN(ObRpcReverseKeepaliveResp);
};
}; // end namespace obrpc
}; // end namespace oceanbase
#endif /* !OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_STRUCT_H */

View File

@ -18,6 +18,8 @@
#include "lib/thread_local/ob_tsi_utils.h"
#include "rpc/ob_request.h"
#include "rpc/obrpc/ob_rpc_packet.h"
#include "rpc/obrpc/ob_poc_rpc_server.h"
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
using namespace oceanbase::common;
using namespace oceanbase::rpc;
@ -178,7 +180,8 @@ int ObRpcSessionHandler::destroy_session(int64_t sessid)
int ObRpcSessionHandler::wait_for_next_request(int64_t sessid,
ObRequest *&req,
const int64_t timeout)
const int64_t timeout,
const ObRpcReverseKeepaliveArg& reverse_keepalive_arg)
{
int ret = OB_SUCCESS;
int hash_ret = 0;
@ -208,6 +211,8 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid,
K(ret), K(sessid), K(thid), K(wait_object.thid_), K(ret));
} else if (NULL == wait_object.req_) {
int64_t timeout_ms = timeout / 1000;
int64_t abs_timeout_us = ObTimeUtility::current_time() + timeout;
int64_t keepalive_timeout_us = abs_timeout_us - timeout + MAX_WAIT_TIMEOUT_MS * 1000;
if (timeout_ms < DEFAULT_WAIT_TIMEOUT_MS) {
timeout_ms = DEFAULT_WAIT_TIMEOUT_MS;
} else {
@ -215,7 +220,8 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid,
}
//Waking up by IO thread
while (timeout_ms > 0) {
int64_t current_time_us = 0;
while ((current_time_us = ObTimeUtility::current_time()) < abs_timeout_us) {
// Here we don't care about result of wait because:
//
// 1. If it return success, it may be caused by spurious wakeup.
@ -232,14 +238,22 @@ int ObRpcSessionHandler::wait_for_next_request(int64_t sessid,
break;
} else if (OB_ISNULL(wait_object.req_)) {
LOG_DEBUG("the stream request hasn't come");
// when waiting for OB_REMOTE_EXECUTE/OB_REMOTE_SYNC_EXECUTE/OB_INNER_SQL_SYNC_TRANSMIT request more than 30s,
// try to send reverse keepalive request.
if (current_time_us >= keepalive_timeout_us
&& reverse_keepalive_arg.is_valid()
&& OB_FAIL(stream_rpc_reverse_probe(reverse_keepalive_arg))) {
LOG_WARN("stream rpc sender has been aborted, unneed to wait", K(sessid), K(timeout), K(reverse_keepalive_arg));
break;
}
} else {
req = wait_object.req_;
break;
}
timeout_ms -= DEFAULT_WAIT_TIMEOUT_MS;
timeout_ms = (abs_timeout_us - current_time_us) / 1000;
}
if (timeout_ms <= 0) {
if (current_time_us >= abs_timeout_us) {
// Stop or time out
req = NULL;
ret = OB_WAIT_NEXT_TIMEOUT; //WAIT_TIMEOUT;

View File

@ -30,6 +30,7 @@ class ObRequest;
namespace obrpc
{
struct ObRpcReverseKeepaliveArg;
class ObRpcSessionHandler
{
public:
@ -53,7 +54,8 @@ public:
*/
virtual int wait_for_next_request(int64_t session_id,
rpc::ObRequest *&req,
const int64_t timeout);
const int64_t timeout,
const ObRpcReverseKeepaliveArg& reverse_keepalive_arg);
bool wakeup_next_thread(rpc::ObRequest &req);

View File

@ -16,13 +16,16 @@
#include "lib/oblog/ob_log.h"
#include "rpc/obrpc/ob_rpc_session_handler.h"
#include "rpc/obrpc/ob_poc_rpc_server.h"
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
using namespace oceanbase::common;
using namespace oceanbase::rpc;
using namespace oceanbase::obrpc;
ObRpcStreamCond::ObRpcStreamCond(ObRpcSessionHandler &handler)
: sessid_(0), handler_(handler)
: sessid_(0), handler_(handler),
first_pkt_id_(INVALID_RPC_PKT_ID), first_send_ts_(OB_INVALID_TIMESTAMP), src_addr_()
{
}
@ -31,7 +34,7 @@ ObRpcStreamCond::~ObRpcStreamCond()
destroy();
}
int ObRpcStreamCond::prepare()
int ObRpcStreamCond::prepare(const ObAddr *src_addr, const ObRpcPacket *packet)
{
int ret = OB_SUCCESS;
if (0 == sessid_) {
@ -41,6 +44,10 @@ int ObRpcStreamCond::prepare()
if (OB_FAIL(handler_.prepare_for_next_request(sessid_))) {
LOG_WARN("preapre stream rpc fail", K(ret));
} else if (src_addr != NULL) {
src_addr_ = *src_addr;
first_pkt_id_ = packet->get_packet_id();
first_send_ts_ = packet->get_timestamp();
}
}
return ret;
@ -49,10 +56,11 @@ int ObRpcStreamCond::prepare()
int ObRpcStreamCond::wait(ObRequest *&req, int64_t timeout)
{
int ret = OB_SUCCESS;
ObRpcReverseKeepaliveArg reverse_keepalive_arg(src_addr_, first_send_ts_, first_pkt_id_);
if (sessid_ <= 0) {
LOG_WARN("sessid is invalied", K_(sessid));
} else if (OB_FAIL(handler_.wait_for_next_request(
sessid_, req, timeout))) {
sessid_, req, timeout, reverse_keepalive_arg))) {
LOG_WARN("wait for next request failed", K(ret), K_(sessid));
} else {
//do nothing

View File

@ -14,6 +14,7 @@
#define OCEANBASE_RPC_OBRPC_OB_RPC_STREAM_COND_
#include <stdint.h>
#include "lib/ob_define.h"
#include "lib/net/ob_addr.h"
namespace oceanbase
{
@ -32,7 +33,7 @@ public:
explicit ObRpcStreamCond(ObRpcSessionHandler &handle);
virtual ~ObRpcStreamCond();
virtual int prepare();
virtual int prepare(const ObAddr *src_addr, const ObRpcPacket *packet);
virtual int wait(rpc::ObRequest *&req, int64_t timeout);
virtual int wakeup(rpc::ObRequest &req);
virtual int destroy();
@ -43,6 +44,9 @@ public:
private:
int64_t sessid_;
ObRpcSessionHandler &handler_;
int64_t first_pkt_id_;
int64_t first_send_ts_;
ObAddr src_addr_;
private:
DISALLOW_COPY_AND_ASSIGN(ObRpcStreamCond);
}; // end of class ObRpcStreamCond

View File

@ -667,6 +667,18 @@ PN_API int pn_get_fd(uint64_t req_id)
return fd;
}
PN_API int64_t pn_get_pkt_id(uint64_t req_id)
{
int64_t pkt_id = -1;
pn_resp_ctx_t* ctx = (typeof(ctx))req_id;
if (unlikely(NULL == ctx)) {
rk_warn("invalid arguments, req_id=%p", ctx);
} else {
pkt_id = ctx->pkt_id;
}
return pkt_id;
}
void pn_print_diag_info(pn_comm_t* pn_comm) {
pn_t* pn = (pn_t*)pn_comm;
int64_t client_cnt = 0;

View File

@ -89,6 +89,7 @@ PN_API int dispatch_accept_fd_to_certain_group(int fd, uint64_t gid);
PN_API void pn_stop(uint64_t gid);
PN_API void pn_wait(uint64_t gid);
PN_API int pn_get_fd(uint64_t req_id);
PN_API int64_t pn_get_pkt_id(uint64_t req_id);
PN_API int pn_terminate_pkt(uint64_t gtid, uint32_t pkt_id);
extern int64_t pnio_keepalive_timeout;
pn_comm_t* get_current_pnio();

View File

@ -104,6 +104,7 @@ ob_set_subtarget(ob_server net
net/ob_ingress_bw_alloc_service.cpp
net/ob_net_endpoint_ingress_rpc_proccessor.cpp
net/ob_net_endpoint_ingress_rpc_struct.cpp
net/ob_rpc_reverse_keepalive.cpp
)
ob_set_subtarget(ob_server omt

View File

@ -0,0 +1,143 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "observer/net/ob_rpc_reverse_keepalive.h"
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
#define USING_LOG_PREFIX RPC_FRAME
namespace oceanbase
{
namespace obrpc
{
int ObRpcReverseKeepaliveArg::assign(const ObRpcReverseKeepaliveArg &other)
{
int ret = OB_SUCCESS;
dst_ = other.dst_;
first_send_ts_ = other.first_send_ts_;
pkt_id_ = other.pkt_id_;
return ret;
}
OB_SERIALIZE_MEMBER(ObRpcReverseKeepaliveArg, dst_, first_send_ts_, pkt_id_);
OB_SERIALIZE_MEMBER(ObRpcReverseKeepaliveResp, ret_);
ObRpcReverseKeepAliveService rpc_reverse_keepalive_instance;
int ObRpcReverseKeepAliveService::init(ObSrvRpcProxy *srv_rpc_proxy)
{
int ret = OB_SUCCESS;
rpc_proxy_ = srv_rpc_proxy;
if (OB_FAIL(rpc_pkt_id_map_.init("RpcKeepalive"))) {
LOG_ERROR("init rpc pkt_id map failed");
} else {
init_ = true;
}
return ret;
}
int ObRpcReverseKeepAliveService::destroy()
{
if (init_) {
rpc_pkt_id_map_.destroy();
}
return OB_SUCCESS;
}
int ObRpcReverseKeepAliveService::receiver_probe(const ObRpcReverseKeepaliveArg &arg)
{
int ret = OB_SUCCESS;
if OB_ISNULL(rpc_proxy_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("rpc reverse keepalive rpc proxy is null");
} else {
const int64_t rpc_timeout = GCONF.rpc_timeout;
ObAddr addr = arg.dst_;
int64_t tenant_id = ob_get_tenant_id();
ObRpcReverseKeepaliveResp res;
ret = rpc_proxy_->to(addr).by(tenant_id).as(OB_SYS_TENANT_ID).timeout(rpc_timeout).rpc_reverse_keepalive(arg, res);
if (OB_SUCC(ret)) {
ret = res.ret_;
}
if OB_FAIL(ret) {
LOG_WARN("send reverse keepalive failed", K(tenant_id), K(arg));
}
}
return ret;
}
int ObRpcReverseKeepAliveService::sender_register(const int64_t pkt_id, int64_t send_time_us)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!init_)) {
ret = OB_NOT_INIT;
LOG_WARN("rpc reverse keepalive service is not inited");
} else {
RpcPktID key(pkt_id);
if (OB_FAIL(rpc_pkt_id_map_.insert(key, send_time_us))) {
LOG_WARN("register pkt_id failed", K(key), K(send_time_us));
}
}
return ret;
}
int ObRpcReverseKeepAliveService::sender_unregister(const int64_t pkt_id)
{
int ret = OB_SUCCESS;
RpcPktID key(pkt_id);
if (OB_UNLIKELY(!init_)) {
ret = OB_NOT_INIT;
LOG_WARN("rpc reverse keepalive service is not inited", K(pkt_id));
} else if (OB_FAIL(rpc_pkt_id_map_.erase(key))) {
LOG_WARN("unregister pkt_id failed", K(pkt_id));
}
return ret;
}
int ObRpcReverseKeepAliveService::check_status(const int64_t send_time_us, const int64_t pkt_id)
{
int ret = OB_SUCCESS;
int64_t time_us = 0;
RpcPktID key(pkt_id);
if (OB_UNLIKELY(!init_)) {
ret = OB_NOT_INIT;
LOG_WARN("rpc reverse keepalive service is not inited", K(pkt_id));
} else if OB_FAIL(rpc_pkt_id_map_.get(key, time_us)) {
LOG_INFO("orignal rpc has released", K(ret), K(pkt_id));
} else if (time_us > send_time_us) {
ret = OB_HASH_NOT_EXIST;
LOG_WARN("send_ts check failed, client has been restarted and the pkt_id is reused", K(time_us), K(send_time_us), K(pkt_id));
}
return ret;
}
void stream_rpc_register(const int64_t pkt_id, int64_t send_time_us)
{
IGNORE_RETURN rpc_reverse_keepalive_instance.sender_register(pkt_id, send_time_us);
}
void stream_rpc_unregister(const int64_t pkt_id)
{
IGNORE_RETURN rpc_reverse_keepalive_instance.sender_unregister(pkt_id);
}
int stream_rpc_reverse_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg)
{
return rpc_reverse_keepalive_instance.receiver_probe(reverse_keepalive_arg);
}
}; // end namespace obrpc
namespace observer
{
int ObRpcReverseKeepaliveP::process()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!arg_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("rpc reverse keepalive arg invalid", K(arg_));
} else {
result_.ret_ = obrpc::rpc_reverse_keepalive_instance.check_status(arg_.first_send_ts_, arg_.pkt_id_);
}
return ret;
}
}; // end of namespace observer
}; // end namespace oceanbase

View File

@ -0,0 +1,72 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_H
#define OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_H
#include "observer/ob_rpc_processor_simple.h"
#include "lib/utility/ob_unify_serialize.h"
#include "lib/net/ob_addr.h"
#include "lib/utility/ob_print_utils.h"
#include "share/ob_srv_rpc_proxy.h"
namespace oceanbase
{
namespace obrpc
{
class ObRpcReverseKeepAliveService
{
public:
ObRpcReverseKeepAliveService() : init_(false), rpc_proxy_(NULL), map_attr_(), rpc_pkt_id_map_() {}
int init(ObSrvRpcProxy *srv_rpc_proxy);
int destroy();
// called from the RPC receiver that needs to probe
int receiver_probe(const ObRpcReverseKeepaliveArg& reverse_keepalive_arg);
int sender_register(const int64_t pkt_id, int64_t pcode);
int sender_unregister(const int64_t pkt_id);
int check_status(const int64_t send_ts, const int64_t pkt_id);
bool init_;
public:
struct RpcPktID
{
int64_t rpc_pkt_id_;
RpcPktID(const int64_t id) : rpc_pkt_id_(id) {}
int64_t hash() const
{
return rpc_pkt_id_;
}
int hash(uint64_t &hash_val) const
{
hash_val = hash();
return OB_SUCCESS;
}
bool operator== (const RpcPktID &other) const
{
return rpc_pkt_id_ == other.rpc_pkt_id_;
}
TO_STRING_KV(K_(rpc_pkt_id));
};
private:
ObSrvRpcProxy *rpc_proxy_;
ObMemAttr map_attr_;
common::ObLinearHashMap<RpcPktID, int64_t> rpc_pkt_id_map_;
};
extern ObRpcReverseKeepAliveService rpc_reverse_keepalive_instance;
}; // end namespace obrpc
namespace observer
{
OB_DEFINE_PROCESSOR_S(Srv, OB_RPC_REVERSE_KEEPALIVE, ObRpcReverseKeepaliveP);
}; // end of namespace observer
}; // end namespace oceanbase
#endif /* !OCEANBASE_OBSERVER_RPC_REVERSE_KEEPALIVE_H */

View File

@ -173,7 +173,7 @@ int ObInnerSqlRpcP::process_read(
if (OB_SUCC(ret)) {
if (need_flush) { // last row is not be used
if (OB_FAIL(obrpc::ObRpcProcessor< obrpc::ObInnerSQLRpcProxy::ObRpc<
obrpc::OB_INNER_SQL_SYNC_TRANSMIT> >::flush(THIS_WORKER.get_timeout_remain()))) {
obrpc::OB_INNER_SQL_SYNC_TRANSMIT> >::flush(THIS_WORKER.get_timeout_remain(), &arg_.get_ctrl_svr()))) {
LOG_WARN("fail to flush", K(ret));
} else {
LOG_DEBUG("flush scanner successfully", K(scanner), K(scanner.get_found_rows()));

View File

@ -22,6 +22,7 @@
#include "share/ob_rpc_share.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_rpc_intrusion_detect.h"
#include "observer/net/ob_rpc_reverse_keepalive.h"
#include "storage/ob_locality_manager.h"
#include "lib/ssl/ob_ssl_config.h"
extern "C" {
@ -158,6 +159,8 @@ int ObSrvNetworkFrame::init()
LOG_ERROR("high prio rpc net register fail", K(ret));
} else if (OB_FAIL(ingress_service_.init(GCONF.cluster_id))) {
LOG_ERROR("endpoint ingress service init fail", K(ret));
} else if (OB_FAIL(rpc_reverse_keepalive_instance.init(GCTX.srv_rpc_proxy_))) {
LOG_ERROR("rpc reverse keepalive instance init fail", K(ret));
} else if (OB_FAIL(net_.add_rpc_unix_listen(rpc_unix_path, rpc_handler_))) {
LOG_ERROR("listen rpc unix path fail");
} else {
@ -179,6 +182,7 @@ void ObSrvNetworkFrame::destroy()
net_.destroy();
ObNetKeepAlive::get_instance().destroy();
ingress_service_.destroy();
rpc_reverse_keepalive_instance.destroy();
if (NULL != obmysql::global_sql_nio_server) {
obmysql::global_sql_nio_server->destroy();
}

View File

@ -45,6 +45,7 @@
#include "observer/table/ob_table_batch_execute_processor.h"
#include "observer/table/ob_table_query_processor.h"
#include "observer/table/ob_table_query_and_mutate_processor.h"
#include "observer/net/ob_rpc_reverse_keepalive.h"
#include "observer/dbms_job/ob_dbms_job_rpc_processor.h"
#include "storage/tx_storage/ob_tenant_freezer_rpc.h"
@ -118,6 +119,7 @@ void oceanbase::observer::init_srv_xlator_for_sys(ObSrvRpcXlator *xlator) {
RPC_PROCESSOR(ObRpcRunDBMSSchedJobP, gctx_);
RPC_PROCESSOR(ObRpcGetServerResourceInfoP, gctx_);
RPC_PROCESSOR(ObRpcReverseKeepaliveP, gctx_);
}
void oceanbase::observer::init_srv_xlator_for_schema_test(ObSrvRpcXlator *xlator) {

View File

@ -22,6 +22,7 @@
#include "observer/net/ob_net_endpoint_ingress_rpc_struct.h"
#include "share/ob_heartbeat_struct.h"
#include "observer/table_load/control/ob_table_load_control_rpc_struct.h"
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
namespace oceanbase
{
@ -255,6 +256,7 @@ public:
RPC_S(PR5 cancel_gather_stats, OB_CANCEL_GATHER_STATS, (ObCancelGatherStatsArg));
RPC_S(PR5 force_set_tenant_log_disk, OB_LOG_FORCE_SET_TENANT_LOG_DISK, (obrpc::ObForceSetTenantLogDiskArg));
RPC_S(PR5 dump_server_usage, OB_FORCE_DUMP_SERVER_USAGE, (obrpc::ObDumpServerUsageRequest), obrpc::ObDumpServerUsageResult);
RPC_S(PR5 rpc_reverse_keepalive, OB_RPC_REVERSE_KEEPALIVE, (obrpc::ObRpcReverseKeepaliveArg), obrpc::ObRpcReverseKeepaliveResp);
}; // end of class ObSrvRpcProxy
} // end of namespace rpc

View File

@ -308,7 +308,7 @@ int ObRemoteBaseExecuteP<T>::sync_send_result(ObExecContext &exec_ctx,
} else {
has_send_result_ = true;
// override error code
if (OB_FAIL(ObRpcProcessor<T>::flush(THIS_WORKER.get_timeout_remain()))) {
if (OB_FAIL(ObRpcProcessor<T>::flush(THIS_WORKER.get_timeout_remain(), &ObRpcProcessor<T>::arg_.get_ctrl_server()))) {
LOG_WARN("fail to flush", K(ret));
} else {
// 超过1个scanner的情况,每次发送都打印一条日志

View File

@ -1317,7 +1317,7 @@ int ObStorageStreamRpcP<RPC_CODE>::flush_and_wait()
if (OB_FAIL(this->check_timeout())) {
LOG_WARN("rpc is timeout, no need flush", K(ret));
} else if (OB_FAIL(this->flush(OB_DEFAULT_STREAM_WAIT_TIMEOUT))) {
} else if (OB_FAIL(this->flush())) {
STORAGE_LOG(WARN, "failed to flush", K(ret));
} else {
this->result_.get_position() = 0;

View File

@ -5,6 +5,7 @@ storage_unittest(test_create_executor table/test_create_executor.cpp)
storage_unittest(test_table_aggregation table/test_table_aggregation.cpp)
storage_unittest(test_table_sess_pool table/test_table_sess_pool.cpp)
storage_unittest(test_ingress_bw_alloc_manager net/test_ingress_bw_alloc_manager.cpp)
storage_unittest(test_rpc_reverse_keepalive net/test_rpc_reverse_keepalive.cpp)
ob_unittest(test_obkv_config tableapi/test_obkv_config.cpp)
ob_unittest(test_uniq_task_queue)
ob_unittest(test_table_connection tableapi/test_table_connection.cpp)

View File

@ -0,0 +1,160 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SERVER
#include <gtest/gtest.h>
#include <gmock/gmock.h>
#include "observer/net/ob_rpc_reverse_keepalive.h"
#include "rpc/obrpc/ob_rpc_reverse_keepalive_struct.h"
#include "rpc/frame/ob_req_deliver.h"
#include "lib/net/ob_net_util.h"
#define private public
using namespace oceanbase::rpc::frame;
namespace oceanbase
{
namespace obrpc
{
class TestRpcReverseKeepAliveService : public testing::Test
{
public:
TestRpcReverseKeepAliveService()
{}
virtual ~TestRpcReverseKeepAliveService()
{}
static int find_port(int start_port = 20000)
{
int sock_fd = -1;
int port = -1;
if ((sock_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
// failed to create socket
} else {
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
for (port = start_port; port <= 65535; ++port) {
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
if (bind(sock_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == 0) {
close(sock_fd);
break;
}
}
close(sock_fd);
}
return port;
}
};
class ObTestRpcQHandler
: public rpc::frame::ObiReqQHandler
{
public:
ObTestRpcQHandler() {}
int onThreadCreated(obsys::CThread *) override final { return OB_SUCCESS; };
int onThreadDestroy(obsys::CThread *) override final { return OB_SUCCESS; };
bool handlePacketQueue(rpc::ObRequest *req, void *args) override final
{
ObIAllocator &alloc = THIS_WORKER.get_sql_arena_allocator();
const observer::ObGlobalContext gctx;
ObReqProcessor *processor = NULL;
ObRpcPacketCode pcode = reinterpret_cast<const obrpc::ObRpcPacket &>(req->get_packet()).get_pcode();
if (OB_RPC_REVERSE_KEEPALIVE == pcode) {
processor = OB_NEWx(observer::ObRpcReverseKeepaliveP, &alloc, gctx);
}
if (OB_NOT_NULL(processor)) {
processor->init();
processor->set_ob_request(*req);
processor->run();
processor->~ObReqProcessor();
THIS_WORKER.get_sql_arena_allocator().free(processor);
processor = NULL;
}
return true;
}
};
class ObTestRpcDeliver
: public rpc::frame::ObReqQDeliver
{
public:
ObTestRpcDeliver() : ObReqQDeliver(qhandler_) {};
int deliver(rpc::ObRequest &req) override final
{
int ret = OB_SUCCESS;
LOG_INFO("deliver rpc request", K(req));
qhandler_.handlePacketQueue(&req, NULL);
return ret;
}
int init() {return OB_SUCCESS;}
void stop() override final
{};
ObTestRpcQHandler qhandler_;
};
bool enable_pkt_nio(bool start_as_client) {
return true;
}
TEST_F(TestRpcReverseKeepAliveService, reverse_keepalive_service)
{
// init
int ret = common::OB_SUCCESS;
obrpc::ObSrvRpcProxy srv_rpc_proxy;
ObReqTransport dummy_transport(NULL, NULL);
srv_rpc_proxy.init(&dummy_transport);
ObTestRpcDeliver deliver;
int port = find_port();
ObAddr dst;
uint32_t ip_value = 0;
if (OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("eth0", ip_value))
&& OB_FAIL(obsys::ObNetUtil::get_local_addr_ipv4("bond0", ip_value))) {
dst.set_ip_addr("127.0.0.1", port);
} else {
ip_value = ntohl(ip_value);
dst.set_ipv4_addr(ip_value, port);
LOG_INFO("get local ip", K(dst));
}
// test
ret = rpc_reverse_keepalive_instance.init(&srv_rpc_proxy);
ASSERT_EQ(ret, OB_SUCCESS);
ret = global_poc_server.start(port, 1, &deliver);
ASSERT_EQ(ret, OB_SUCCESS);
uint32_t pkt_id = 1;
stream_rpc_register(pkt_id, ObTimeUtility::current_time());
ObRpcReverseKeepaliveArg a(dst, ObTimeUtility::current_time(), pkt_id);
ret = stream_rpc_reverse_probe(a);
ASSERT_EQ(ret, OB_SUCCESS);
stream_rpc_unregister(pkt_id);
ret = stream_rpc_reverse_probe(a);
ASSERT_EQ(ret, OB_ENTRY_NOT_EXIST);
pkt_id = 2;
int64_t send_time_us = ObTimeUtility::current_time();
stream_rpc_register(pkt_id, send_time_us + 100000);
ObRpcReverseKeepaliveArg a2(dst, send_time_us, pkt_id);
ret = stream_rpc_reverse_probe(a2);
ASSERT_EQ(ret, OB_HASH_NOT_EXIST);
rpc_reverse_keepalive_instance.destroy();
}
} // end namespace obrpc
} // end namespace oceanbase
int main(int argc, char **argv)
{
OB_LOGGER.set_file_name("test_reverse_keepalive.log", true);
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}