[server net keepalive] sync serialization size
This commit is contained in:
80
deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp
vendored
80
deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp
vendored
@ -28,10 +28,13 @@
|
||||
#include "lib/utility/ob_defer.h"
|
||||
#include "lib/thread/ob_thread_name.h"
|
||||
#include "lib/time/ob_time_utility.h"
|
||||
#include "lib/utility/serialization.h"
|
||||
#include "lib/utility/utility.h"
|
||||
#include "rpc/frame/ob_net_easy.h"
|
||||
#include "io/easy_negotiation.h"
|
||||
|
||||
using namespace oceanbase::common;
|
||||
using namespace oceanbase::common::serialization;
|
||||
using namespace oceanbase::lib;
|
||||
using namespace oceanbase::rpc::frame;
|
||||
namespace oceanbase
|
||||
@ -44,6 +47,43 @@ namespace obrpc
|
||||
#define WINDOW_MAX_FAILS 4 // 4 times
|
||||
#define MAX_CREDIBLE_WINDOW 10 * 1000 * 1000 // 10s
|
||||
|
||||
constexpr int32_t KP_MAGIC = 0x2c15c364;
|
||||
struct Header
|
||||
{
|
||||
public:
|
||||
Header(int32_t data_len = 0)
|
||||
: magic_(KP_MAGIC), data_len_(data_len) {}
|
||||
int encode(char *buf, const int64_t buf_len, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(encode_i32(buf, buf_len, pos, magic_))) {
|
||||
_LOG_WARN("encode magic failed, ret: %d, pos: %ld", ret, pos);
|
||||
} else if (OB_FAIL(encode_i32(buf, buf_len, pos, data_len_))) {
|
||||
_LOG_WARN("encode data len failed, ret: %d, pos: %ld", ret, pos);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int decode(const char *buf, const int64_t buf_len, int64_t &pos)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_FAIL(decode_i32(buf, buf_len, pos, &magic_))) {
|
||||
_LOG_WARN("decode magic failed, ret: %d, pos: %ld", ret, pos);
|
||||
} else if (magic_ != KP_MAGIC) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
_LOG_WARN("unexpected magic, magic: %d", magic_);
|
||||
} else if (OB_FAIL(decode_i32(buf, buf_len, pos, &data_len_))) {
|
||||
_LOG_WARN("decode data len failed, ret: %d, pos: %ld", ret, pos);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int32_t get_encoded_size() const
|
||||
{
|
||||
return encoded_length_i32(magic_) + encoded_length_i32(data_len_);
|
||||
}
|
||||
int32_t magic_;
|
||||
int32_t data_len_;
|
||||
};
|
||||
|
||||
enum {
|
||||
UNCONNECT = 0,
|
||||
CONNECTING,
|
||||
@ -333,6 +373,7 @@ void ObNetKeepAlive::do_server_loop()
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
struct server *s = (struct server *)events[i].data.ptr;
|
||||
int ev_fd = NULL == s? pipefd_ : s->fd_;
|
||||
bool need_disconn = false;
|
||||
if (NULL == s) {
|
||||
struct server *s = (struct server *)ob_malloc(sizeof(struct server), "KeepAliveServer");
|
||||
if (NULL == s) {
|
||||
@ -383,10 +424,26 @@ void ObNetKeepAlive::do_server_loop()
|
||||
char data = PROTOCOL_DATA;
|
||||
while ((n = read(ev_fd, &data, sizeof data)) < 0 && errno == EINTR);
|
||||
if (n <= 0) break;
|
||||
while ((n = write(ev_fd, &PROTOCOL_DATA, sizeof PROTOCOL_DATA)) < 0 && errno == EINTR);
|
||||
char buf[128];
|
||||
const int64_t buf_len = sizeof buf;
|
||||
int32_t data_len = 1;
|
||||
Header header(data_len);
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t pos = 0;
|
||||
if (OB_SUCCESS != (tmp_ret = header.encode(buf, buf_len, pos))) {
|
||||
_LOG_WARN("encode failed, ret: %d, pos: %ld", tmp_ret, pos);
|
||||
} else if (FALSE_IT(pos += data_len)/*TODO: encode data*/) {
|
||||
_LOG_WARN("encode data failed: %d", data_len);
|
||||
} else {
|
||||
while ((n = write(ev_fd, buf, pos)) < 0 && errno == EINTR);
|
||||
need_disconn = n < pos;
|
||||
}
|
||||
}
|
||||
if (events[i].events & (EPOLLRDHUP | EPOLLHUP)) {
|
||||
}
|
||||
if (!need_disconn) {
|
||||
need_disconn = events[i].events & (EPOLLRDHUP | EPOLLHUP);
|
||||
}
|
||||
if (need_disconn) {
|
||||
_LOG_INFO("server connection closed, fd: %d, addr: %s", ev_fd, NULL == s? "" : addr_to_string(s->cli_addr_));
|
||||
epoll_ctl(epfd, EPOLL_CTL_DEL, ev_fd, NULL);
|
||||
close(ev_fd);
|
||||
@ -464,9 +521,24 @@ void ObNetKeepAlive::do_client_loop()
|
||||
_LOG_DEBUG("update read ts, addr: %s, fd: %d, ts: %ld", addr_to_string(rs->svr_addr_), ev_fd, rs->last_read_ts_);
|
||||
for (;;) {
|
||||
ssize_t n = -1;
|
||||
char data = PROTOCOL_DATA;
|
||||
while ((n = read(ev_fd, &data, sizeof data)) < 0 && errno == EINTR);
|
||||
char buf[128];
|
||||
Header header;
|
||||
int32_t read_len = header.get_encoded_size();
|
||||
while ((n = read(ev_fd, buf, read_len)) < 0 && errno == EINTR);
|
||||
if (n <= 0) break;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
int64_t pos = 0;
|
||||
if (OB_SUCCESS != (tmp_ret = header.decode(buf, read_len, pos))) {
|
||||
_LOG_WARN("decode failed, ret: %d, pos: %ld", tmp_ret, pos);
|
||||
} else {
|
||||
char data[512]; // TODO
|
||||
if (header.data_len_ > sizeof data) {
|
||||
tmp_ret = OB_BUF_NOT_ENOUGH;
|
||||
_LOG_WARN("data buf not enough: %d", header.data_len_);
|
||||
} else {
|
||||
while ((n = read(ev_fd, data, header.data_len_)) < 0 && errno == EINTR);
|
||||
}
|
||||
}
|
||||
do_rpin(client2rs(c));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user