diff --git a/deps/easy/src/io/easy_connection.c b/deps/easy/src/io/easy_connection.c index b4ed51bc4b..9b6a49ee93 100644 --- a/deps/easy/src/io/easy_connection.c +++ b/deps/easy/src/io/easy_connection.c @@ -2096,6 +2096,7 @@ error_exit: EASY_CONNECTION_DESTROY(c, "on_writable"); } +extern const char* trace_id_to_str_c(const uint64_t *uval); /** * 对timeout的处理message */ @@ -2111,9 +2112,9 @@ static void easy_connection_on_timeout_session(struct ev_loop *loop, ev_timer *w EASY_TIME_GUARD(); if ((now != (int)ev_now(loop)) && (s->error == 0)) { easy_info_log("Session has timed out, session(%p), time(%fs), packet_id(%" PRIu64 ")," - " pcode(%d), trace_id(" OB_TRACE_ID_FORMAT "), conn(%s).", + " pcode(%d), trace_id(%s), conn(%s).", s, ev_now(loop) - s->now, s->packet_id, - s->r.pcode, s->r.trace_id[0], s->r.trace_id[1], easy_connection_str(c)); + s->r.pcode, trace_id_to_str_c(&s->r.trace_id[0]), easy_connection_str(c)); now = (int)ev_now(loop); } @@ -2169,9 +2170,9 @@ int easy_connection_dump_request(easy_connection_t *conn, easy_request_t *r, dou if (r->protocol == EASY_REQQUEST_TYPE_RPC) { easy_warn_log("easy_reqeust hold by upper-layer for too much time. " "req(%p), timeout_warn_count(%lu), protocol(RPC), pcode(%d), time(%lf), packet_id(%lu), " - "trace_id(" OB_TRACE_ID_FORMAT "), trace_point(%d), bt(%s).", + "trace_id(%s), trace_point(%d), bt(%s).", r, r->timeout_warn_count, r->pcode, hold_time, r->packet_id, - r->trace_id[0], r->trace_id[1], r->trace_point, r->trace_bt); + trace_id_to_str_c(&r->trace_id[0]), r->trace_point, r->trace_bt); } else if (r->protocol == EASY_REQQUEST_TYPE_SQL) { easy_warn_log("easy_reqeust hold by upper-layer for too much time. " "req(%p), timeout_warn_count(%lu), protocol(SQL), time(%lf), session_id(%ld), " diff --git a/deps/easy/src/io/easy_io_struct.h b/deps/easy/src/io/easy_io_struct.h index 66208625d9..c430ada21a 100644 --- a/deps/easy/src/io/easy_io_struct.h +++ b/deps/easy/src/io/easy_io_struct.h @@ -131,8 +131,6 @@ enum { EASY_REQQUEST_TYPE_SQL = 1, }; -#define OB_TRACE_ID_FORMAT "Y%lX-%016lX" - // async + spinlock #define EASY_BASETH_DEFINE \ easy_baseth_on_start_pt *on_start; \ diff --git a/deps/easy/src/io/easy_socket.c b/deps/easy/src/io/easy_socket.c index db9574b546..05d41fd918 100644 --- a/deps/easy/src/io/easy_socket.c +++ b/deps/easy/src/io/easy_socket.c @@ -169,7 +169,7 @@ char* easy_socket_err_reason(int error_no) case ENETUNREACH: /* 101 - Network is unreachable */ case EHOSTUNREACH: - /* 113 - No route to host */ + /* 113 - No route to host */ default: err_reason = ""; break; @@ -184,7 +184,7 @@ char* easy_socket_err_reason(int error_no) */ ssize_t easy_socket_read(easy_connection_t *conn, char *buf, size_t size, int *pending) { - ssize_t n; + ssize_t n; EASY_SOCKET_IO_TIME_GUARD(ev_read_count, ev_read_time, size); do { diff --git a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp index b18165ea85..c24e85c94f 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_mysql_connection.cpp @@ -106,7 +106,7 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d bool read_write_no_timeout /*false*/, int64_t sql_req_level /*0*/) { int ret = OB_SUCCESS; - const static int MAX_IP_BUFFER_LEN = 32; + const static int MAX_IP_BUFFER_LEN = common::OB_IP_STR_BUFF; char host[MAX_IP_BUFFER_LEN]; host[0] = '\0'; // if db is NULL, the default database is used. @@ -180,7 +180,7 @@ int ObMySQLConnection::connect(const char *user, const char *pass, const char *d bool read_write_no_timeout /*false*/, int64_t sql_req_level /*0*/) { int ret = OB_SUCCESS; - const static int MAX_IP_BUFFER_LEN = 32; + const static int MAX_IP_BUFFER_LEN = common::OB_IP_STR_BUFF; char host[MAX_IP_BUFFER_LEN]; host[0] = '\0'; // if db is NULL, the default database is used. diff --git a/deps/oblib/src/lib/net/ob_addr.cpp b/deps/oblib/src/lib/net/ob_addr.cpp index f07bf2485b..fd4aabfe3e 100644 --- a/deps/oblib/src/lib/net/ob_addr.cpp +++ b/deps/oblib/src/lib/net/ob_addr.cpp @@ -45,48 +45,31 @@ ObAddr::ObAddr(const easy_addr_t& addr) port_ = addr.port; } -ObAddr::ObAddr(const sockaddr &addr) +void ObAddr::from_sockaddr(struct sockaddr_storage *sock_addr) { - if (AF_INET == addr.sa_family) { - version_ = IPV4; - const sockaddr_in &addr_in = *static_cast(static_cast(&addr)); - ip_.v4_ = ntohl(addr_in.sin_addr.s_addr); - port_ = ntohs(addr_in.sin_port); - } else if (AF_INET6 == addr.sa_family) { - version_ = IPV6; - const sockaddr_in6 &addr_in6 = *static_cast(static_cast(&addr)); - MEMCPY(ip_.v6_, &addr_in6.sin6_addr, sizeof(ip_.v6_)); - port_ = ntohs(addr_in6.sin6_port); - } else if (AF_UNIX == addr.sa_family) { - version_ = UNIX; - const sockaddr_un &addr_un = *static_cast(static_cast(&addr)); - MEMCPY(ip_.unix_path_, addr_un.sun_path, sizeof(ip_.unix_path_)); - port_ = 0; - } else { - port_ = 0; + reset(); + bool is_ipv6 = false; + oceanbase::obsys::ObNetUtil::sockaddr_to_addr(sock_addr, is_ipv6, &ip_, port_); + version_ = !is_ipv6 ? IPV4 : IPV6; +} + +struct sockaddr_storage *ObAddr::to_sockaddr(struct sockaddr_storage *sock_addr) const +{ + sockaddr_storage *sret = NULL; + if (IPV4 == version_ || IPV6 == version_) { + sret = oceanbase::obsys::ObNetUtil::make_unix_sockaddr(IPV6 == version_, &ip_, port_, sock_addr); } + return sret; } int ObAddr::convert_ipv4_addr(const char *ip) { int ret = OB_SUCCESS; - in_addr in; - if (OB_ISNULL(ip)) { - // null ptr + bool is_ipv6 = false; + if (!oceanbase::obsys::ObNetUtil::straddr_to_addr(ip, is_ipv6, &ip_)) { ret = OB_INVALID_ARGUMENT; - } else if('\0' == *ip) { - // empty ip - ip_.v4_ = 0; } else { - MEMSET(&in, 0, sizeof (in)); - int rt = inet_pton(AF_INET, ip, &in); - if (rt != 1) { // wrong ip or error - in.s_addr = 0; - ret = OB_ERR_UNEXPECTED; - } else { - ret = OB_SUCCESS; - } - ip_.v4_ = ntohl(in.s_addr); + reset_v4_extraneous(); } return ret; } @@ -94,18 +77,9 @@ int ObAddr::convert_ipv4_addr(const char *ip) int ObAddr::convert_ipv6_addr(const char *ip) { int ret = OB_SUCCESS; - in6_addr in6; - - if (!OB_ISNULL(ip)) { - MEMSET(&in6, 0, sizeof (in6)); - ret = inet_pton(AF_INET6, ip, &in6); - if (ret != 1) { // wrong ip or error - ret = OB_ERR_UNEXPECTED; - memset(&in6, 0, sizeof (in6)); - } else { - ret = OB_SUCCESS; - } - MEMCPY(ip_.v6_, in6.s6_addr, sizeof(ip_.v6_)); + bool is_ipv6 = false; + if (!oceanbase::obsys::ObNetUtil::straddr_to_addr(ip, is_ipv6, &ip_)) { + ret = OB_INVALID_ARGUMENT; } return ret; } @@ -113,7 +87,7 @@ int ObAddr::convert_ipv6_addr(const char *ip) int ObAddr::parse_from_string(const ObString &str) { int ret = OB_SUCCESS; - char buf[MAX_IP_ADDR_LENGTH] = ""; + char buf[MAX_IP_PORT_LENGTH] = ""; int port = 0; if (str.ptr() != NULL) { @@ -136,19 +110,12 @@ int ObAddr::parse_from_string(const ObString &str) } if (OB_SUCC(ret)) { - if ('[' != buf[0]) { // IPV4 format - if (false == set_ipv4_addr(buf, port)) - { - ret = OB_INVALID_ARGUMENT; - } - } else { // IPV6 format - const char *ipv6 = buf + 1; - if (']' == buf[strlen(buf) - 1]) { - buf[strlen(buf) - 1] = '\0'; - IGNORE_RETURN set_ipv6_addr(ipv6, port); - } else { - ret = OB_INVALID_ARGUMENT; - } + char *p = buf; + if ('[' == buf[0]) p = buf + 1; + if (']' == buf[strlen(buf) - 1]) buf[strlen(buf) - 1] = '\0'; + if (NULL == strchr(p, ':') ? + !set_ipv4_addr(p, port) : !set_ipv6_addr(p, port)) { + ret = OB_INVALID_ARGUMENT; } } return ret; @@ -348,7 +315,7 @@ int ObAddr::to_yson(char *buf, const int64_t buf_len, int64_t &pos) const OB_ID(ip), ip_.v4_, OB_ID(port), port_); } else if (IPV6 == version_) { - char ip[MAX_IP_ADDR_LENGTH + 1] = { '\0' }; + char ip[MAX_IP_PORT_LENGTH + 1] = { '\0' }; if (!ip_to_string(ip, sizeof(ip))) { ret = OB_ERR_UNEXPECTED; } else { @@ -407,8 +374,8 @@ bool ObAddr::set_ipv4_addr(const char *ip, const int32_t port) bool ObAddr::set_ip_addr(const ObString &ip, const int32_t port) { bool ret = true; - char ip_buf[MAX_IP_ADDR_LENGTH] = ""; - if (ip.length() >= MAX_IP_ADDR_LENGTH) { + char ip_buf[MAX_IP_PORT_LENGTH] = ""; + if (ip.length() >= MAX_IP_PORT_LENGTH) { ret = false; } else { // ObString may be not terminated by '\0' @@ -435,6 +402,7 @@ void ObAddr::set_ipv4_server_id(const int64_t ipv4_server_id) { version_ = IPV4; ip_.v4_ = static_cast(0x00000000ffffffff & (ipv4_server_id >> 32)); + reset_v4_extraneous(); port_ = static_cast(0x00000000ffffffff & ipv4_server_id); } @@ -474,34 +442,12 @@ ObAddr &ObAddr::as_subnet(const ObAddr &mask) bool ObAddr::operator <(const ObAddr &rv) const { - int64_t ipcmp = 0; - if (version_ != rv.version_) { - LOG_ERROR_RET(common::OB_NOT_SUPPORTED, "comparision between different IP versions hasn't supported!"); - } else if (IPV4 == version_) { - ipcmp = static_cast(ip_.v4_) - static_cast(rv.ip_.v4_); - } else if (IPV6 == version_) { - int pos = 0; - for (; ipcmp == 0 && pos < IPV6_LEN; pos++) { - ipcmp = ip_.v6_[pos] - rv.ip_.v6_[pos]; - } - } - return (ipcmp < 0) || (0 == ipcmp && port_ < rv.port_); + return compare_refactored(rv) < 0; } bool ObAddr::operator >(const ObAddr &rv) const { - int64_t ipcmp = 0; - if (version_ != rv.version_) { - LOG_ERROR_RET(common::OB_NOT_SUPPORTED, "comparision between different IP versions hasn't supported!"); - } else if (IPV4 == version_) { - ipcmp = static_cast(ip_.v4_) - static_cast(rv.ip_.v4_); - } else if (IPV6 == version_) { - int pos = 0; - for (; ipcmp == 0 && pos < IPV6_LEN; pos++) { - ipcmp = ip_.v6_[pos] - rv.ip_.v6_[pos]; - } - } - return (ipcmp > 0) || (0 == ipcmp && port_ > rv.port_); + return compare_refactored(rv) > 0; } bool ObAddr::is_equal_except_port(const ObAddr &rv) const diff --git a/deps/oblib/src/lib/net/ob_addr.h b/deps/oblib/src/lib/net/ob_addr.h index 171481f647..2861269956 100644 --- a/deps/oblib/src/lib/net/ob_addr.h +++ b/deps/oblib/src/lib/net/ob_addr.h @@ -74,10 +74,12 @@ public: explicit ObAddr(const easy_addr_t& addr); - explicit ObAddr(const sockaddr &addr); + void from_sockaddr(struct sockaddr_storage *sock_addr); + struct sockaddr_storage *to_sockaddr(struct sockaddr_storage *sock_addr) const; void reset() { + reset_v4_extraneous(); port_ = 0; //memset(&ip_, 0, sizeof (ip_)); } @@ -129,8 +131,16 @@ private: friend class oceanbase::obrpc::ObBatchP; int64_t get_ipv4_server_id() const; void set_ipv4_server_id(const int64_t ipv4_server_id); + int compare_refactored(const ObAddr &rv) const; private: + void reset_v4_extraneous() + { + uint32_t *p = &ip_.v4_; + *++p = 0; + *++p = 0; + *++p = 0; + } int convert_ipv4_addr(const char *ip); int convert_ipv6_addr(const char *ip); bool set_ipv4_addr(const char *ip, const int32_t port); @@ -212,9 +222,31 @@ inline bool ObAddr::operator !=(const ObAddr &rv) const inline bool ObAddr::operator ==(const ObAddr &rv) const { - return version_ == rv.version_ && port_ == rv.port_ && (0 == memcmp(&ip_, &rv.ip_, sizeof(ip_))); + return 0 == compare_refactored(rv); } +inline int ObAddr::compare_refactored(const ObAddr &rv) const +{ + int64_t ipcmp = 0; + if (this == &rv) { + ipcmp = 0; + } else if (version_ != rv.version_) { + ipcmp = version_ - rv.version_; + } else if (IPV4 == version_) { + ipcmp = static_cast(ip_.v4_) - static_cast(rv.ip_.v4_); + } else if (IPV6 == version_) { + int pos = 0; + for (; 0 == ipcmp && pos < IPV6_LEN; pos++) { + ipcmp = ip_.v6_[pos] - rv.ip_.v6_[pos]; + } + } + if (0 == ipcmp) { + ipcmp = port_ - rv.port_; + } + return 0 == ipcmp ? 0 : (ipcmp > 0 ? 1 : -1);; +} + +// forward compatible inline int ObAddr::compare(const ObAddr &rv) const { int compare_ret = 0; diff --git a/deps/oblib/src/lib/net/ob_net_util.cpp b/deps/oblib/src/lib/net/ob_net_util.cpp index b8112cd64c..1170a8e4fa 100644 --- a/deps/oblib/src/lib/net/ob_net_util.cpp +++ b/deps/oblib/src/lib/net/ob_net_util.cpp @@ -156,6 +156,155 @@ uint64_t ObNetUtil::ip_to_addr(uint32_t ip, int port) return ipport; } +struct sockaddr_storage* ObNetUtil::make_unix_sockaddr_any(bool is_ipv6, + int port, + struct sockaddr_storage *sock_addr) +{ + uint32_t addr_v4 = INADDR_ANY; + in6_addr addr_v6 = in6addr_any; + return make_unix_sockaddr(is_ipv6, !is_ipv6 ? (void*)&addr_v4 : (void*)&addr_v6, port, sock_addr); +} + +struct sockaddr_storage* ObNetUtil::make_unix_sockaddr(bool is_ipv6, const void *ip, int port, + struct sockaddr_storage *sock_addr) +{ + if (!is_ipv6) { + struct sockaddr_in *sin = (struct sockaddr_in *)sock_addr; + sin->sin_port = (uint16_t)htons((uint16_t)port); + sin->sin_addr.s_addr = htonl(*(uint32_t*)ip); + sin->sin_family = AF_INET; + } else { + struct sockaddr_in6 *sin = (struct sockaddr_in6 *)sock_addr; + sin->sin6_port = (uint16_t)htons((uint16_t)port); + memcpy(&sin->sin6_addr.s6_addr, ip, sizeof(in6_addr::s6_addr)); + sin->sin6_family = AF_INET6; + } + return sock_addr; +} + +void ObNetUtil::sockaddr_to_addr(struct sockaddr_storage *sock_addr_s, bool &is_ipv6, void *ip, int &port) +{ + struct sockaddr *sock_addr = (typeof(sock_addr))sock_addr_s; + is_ipv6 = AF_INET6 == sock_addr->sa_family; + if (!is_ipv6) { + sockaddr_in *addr_in = (sockaddr_in*)sock_addr; + *(uint32_t*)ip = ntohl(addr_in->sin_addr.s_addr); + port = ntohs(addr_in->sin_port); + } else { + sockaddr_in6 *addr_in6 = (sockaddr_in6 *)sock_addr; + MEMCPY(ip, &addr_in6->sin6_addr, sizeof(in6_addr::s6_addr)); + port = ntohs(addr_in6->sin6_port); + } +} + +bool ObNetUtil::straddr_to_addr(const char *ip_str, bool &is_ipv6, void *ip) +{ + bool bret = true; + if (!ip_str) { + bret = false; + } else if ('\0' == *ip_str) { + // empty ip + *(uint32_t*)ip = 0; + } else { + const char *colonp = strchr(ip_str, ':'); + is_ipv6 = colonp != NULL; + if (!is_ipv6) { + in_addr in; + int rc = inet_pton(AF_INET, ip_str, &in); + if (rc != 1) { + // wrong ip or error + bret = false; + } else { + *(uint32_t*)ip = ntohl(in.s_addr); + } + } else { + in6_addr in6; + int rc = inet_pton(AF_INET6, ip_str, &in6); + if (rc != 1) { + // wrong ip or error + bret = false; + } else { + memcpy(ip, in6.s6_addr, sizeof(in6_addr::s6_addr)); + } + } + } + return bret; +} + +bool ObNetUtil::is_support_ipv6() +{ + bool support_ipv6 = false; + int fd = socket(AF_INET6, SOCK_STREAM, 0); + if (fd >= 0) { + support_ipv6 = true; + close(fd); + } + return support_ipv6; +} + +char* ObNetUtil::sockaddr_to_str(struct sockaddr_storage *addr, char *buf, int len) +{ + ObAddr ob_addr; + ob_addr.from_sockaddr(addr); + ob_addr.to_string(buf, len); + return buf; +} + +char *ObNetUtil::sockfd_to_str(int fd, char *buf, int len) +{ + char *cret = NULL; + struct sockaddr_storage addr; + socklen_t sock_len = sizeof(addr); + if (0 != getsockname(fd, (struct sockaddr *)&addr, &sock_len)) { + LOG_WARN_RET(OB_ERR_SYS, "getsockname failed", K(fd), K(errno)); + } else { + cret = sockaddr_to_str(&addr, buf, len); + } + return cret; +} + +int ObNetUtil::sockaddr_compare(struct sockaddr_storage *left, struct sockaddr_storage *right) +{ + int cmp = 0; + struct sockaddr *l = (typeof(l))left; + struct sockaddr *r = (typeof(r))right; + cmp = l->sa_family - r->sa_family; + if (0 == cmp) { + if (AF_INET == l->sa_family) { + struct sockaddr_in *l_sin = (struct sockaddr_in *)l; + struct sockaddr_in *r_sin = (struct sockaddr_in *)r; + cmp = l_sin->sin_port - r_sin->sin_port; + if (0 == cmp) { + cmp = memcmp(&l_sin->sin_addr, &r_sin->sin_addr, sizeof(l_sin->sin_addr)); + } + } else if (AF_INET6 == l->sa_family) { + struct sockaddr_in6 *l_sin = (struct sockaddr_in6 *)l; + struct sockaddr_in6 *r_sin = (struct sockaddr_in6 *)r; + cmp = l_sin->sin6_port - r_sin->sin6_port; + if (0 == cmp) { + cmp = memcmp(&l_sin->sin6_addr, &r_sin->sin6_addr, sizeof(l_sin->sin6_addr)); + } + } + } + return cmp; +} + +bool ObNetUtil::is_valid_sockaddr(struct sockaddr_storage *sock_addr_s) +{ + bool valid = true; + struct sockaddr *sock_addr = (struct sockaddr *)sock_addr_s; + if (sock_addr->sa_family == AF_INET) { + sockaddr_in *addr_in = (sockaddr_in*)sock_addr; + valid = addr_in->sin_addr.s_addr != 0 && addr_in->sin_port != 0; + } else if (sock_addr->sa_family == AF_INET6) { + sockaddr_in6 *addr_in6 = (sockaddr_in6 *)sock_addr; + valid = addr_in6->sin6_port != 0; + } else { + valid = false; + } + return valid; +} + char *ObNetUtil::get_addr_by_hostname(const char *hostname) { char *addr = nullptr; @@ -333,3 +482,42 @@ bool ObNetUtil::is_in_white_list(const ObString &client_ip, ObString &orig_ip_wh } // namespace obsys } // namespace oceanbase + +extern "C" { + struct sockaddr_storage* make_unix_sockaddr_any_c(bool is_ipv6, int port, struct sockaddr_storage *sock_addr) + { + return oceanbase::obsys::ObNetUtil::make_unix_sockaddr_any(is_ipv6, port, sock_addr); + } + struct sockaddr_storage* make_unix_sockaddr_c(bool is_ipv6, void *ip, int port, struct sockaddr_storage *sock_addr) + { + return oceanbase::obsys::ObNetUtil::make_unix_sockaddr(is_ipv6, ip, port, sock_addr); + } + void sockaddr_to_addr_c(struct sockaddr_storage *sock_addr, bool *is_ipv6, void *ip, int *port) + { + return oceanbase::obsys::ObNetUtil::sockaddr_to_addr(sock_addr, *(bool*)is_ipv6, ip, *(int*)port); + } + bool straddr_to_addr_c(const char *ip_str, bool *is_ipv6, void *ip) + { + return oceanbase::obsys::ObNetUtil::straddr_to_addr(ip_str, *(bool*)is_ipv6, ip); + } + bool is_support_ipv6_c() + { + return oceanbase::obsys::ObNetUtil::is_support_ipv6(); + } + char *sockaddr_to_str_c(struct sockaddr_storage *sock_addr, char *buf, int len) + { + return oceanbase::obsys::ObNetUtil::sockaddr_to_str(sock_addr, buf, len); + } + char *sockfd_to_str_c(int fd, char *buf, int len) + { + return oceanbase::obsys::ObNetUtil::sockfd_to_str(fd, buf, len); + } + int sockaddr_compare_c(struct sockaddr_storage *left, struct sockaddr_storage *right) + { + return oceanbase::obsys::ObNetUtil::sockaddr_compare(left, right); + } + bool is_valid_sockaddr_c(struct sockaddr_storage *sock_addr) + { + return oceanbase::obsys::ObNetUtil::is_valid_sockaddr(sock_addr); + } +} /* extern "C" */ diff --git a/deps/oblib/src/lib/net/ob_net_util.h b/deps/oblib/src/lib/net/ob_net_util.h index 229ef841b5..85b5aee3d6 100644 --- a/deps/oblib/src/lib/net/ob_net_util.h +++ b/deps/oblib/src/lib/net/ob_net_util.h @@ -55,10 +55,31 @@ public: // get ipv4 by hostname, no need free the returned value static char *get_addr_by_hostname(const char *hostname); static int get_ifname_by_addr(const char *local_ip, char *if_name, uint64_t if_name_len, bool& has_found); + static struct sockaddr_storage* make_unix_sockaddr_any(bool is_ipv6, int port, struct sockaddr_storage *sock_addr); + static struct sockaddr_storage* make_unix_sockaddr(bool is_ipv6, const void *ip, int port, struct sockaddr_storage *sock_addr); + static void sockaddr_to_addr(struct sockaddr_storage *sock_addr, bool &is_ipv6, void *ip, int &port); + static bool straddr_to_addr(const char *ip_str, bool &is_ipv6, void *ip); + static bool is_support_ipv6(); + static char *sockaddr_to_str(struct sockaddr_storage *sock_addr, char *buf, int len); + static char *sockfd_to_str(int fd, char *buf, int len); + static int sockaddr_compare(struct sockaddr_storage *left, struct sockaddr_storage *right); + static bool is_valid_sockaddr(struct sockaddr_storage *sock_addr); static bool is_match(const common::ObString &client_ip, const common::ObString &host_name); static bool is_in_white_list(const common::ObString &client_ip, common::ObString &orig_ip_white_list); }; } // namespace obsys } // namespace oceanbase + +extern "C" { + struct sockaddr_storage* make_unix_sockaddr_any_c(bool is_ipv6, int port, struct sockaddr_storage *a); + struct sockaddr_storage* make_unix_sockaddr_c(bool is_ipv6, void *ip, int port, struct sockaddr_storage *sock_addr); + void sockaddr_to_addr_c(struct sockaddr_storage *sock_addr, bool *is_ipv6, void *ip, int *port); + bool straddr_to_addr_c(const char *ip_str, bool *is_ipv6, void *ip); + bool is_support_ipv6_c(); + char *sockaddr_to_str_c(struct sockaddr_storage *sock_addr, char *buf, int len); + char *sockfd_to_str_c(int fd, char *buf, int len); + int sockaddr_compare_c(struct sockaddr_storage *left, struct sockaddr_storage *right); + bool is_valid_sockaddr_c(struct sockaddr_storage *sock_addr); +} /* extern "C" */ #endif diff --git a/deps/oblib/src/lib/ob_running_mode.cpp b/deps/oblib/src/lib/ob_running_mode.cpp index ee36d63339..d064dc1dfe 100644 --- a/deps/oblib/src/lib/ob_running_mode.cpp +++ b/deps/oblib/src/lib/ob_running_mode.cpp @@ -21,6 +21,12 @@ const int64_t ObRunningModeConfig::MINI_MEM_UPPER = 12L << 30; const int64_t ObRunningModeConfig::MINI_CPU_UPPER = 8; bool __attribute__((weak)) mtl_is_mini_mode() { return false; } - } //end of namespace lib } //end of namespace oceanbase + +extern "C" { + bool use_ipv6_c() + { + return oceanbase::lib::use_ipv6(); + } +} /* extern "C" */ diff --git a/deps/oblib/src/lib/ob_running_mode.h b/deps/oblib/src/lib/ob_running_mode.h index 289304f1d3..f0aca78b12 100644 --- a/deps/oblib/src/lib/ob_running_mode.h +++ b/deps/oblib/src/lib/ob_running_mode.h @@ -29,6 +29,7 @@ struct ObRunningModeConfig bool mini_mode_ = false; bool mini_cpu_mode_ = false; int64_t memory_limit_ = 0; + bool use_ipv6_ = false; static ObRunningModeConfig &instance(); private: ObRunningModeConfig() = default; @@ -70,6 +71,21 @@ inline void update_mini_mode(int64_t memory_limit, int64_t cpu_cnt) ObRunningModeConfig::instance().mini_cpu_mode_ = (cpu_cnt <= lib::ObRunningModeConfig::MINI_CPU_UPPER); } +inline bool use_ipv6() +{ + return ObRunningModeConfig::instance().use_ipv6_; +} + +inline void enable_use_ipv6() +{ + ObRunningModeConfig::instance().use_ipv6_ = true; +} + } //lib } //oceanbase + +extern "C" { + bool use_ipv6_c(); +} /* extern "C" */ + #endif // OB_RUNNING_MODE_H_ diff --git a/deps/oblib/src/lib/oblog/ob_log.cpp b/deps/oblib/src/lib/oblog/ob_log.cpp index dbad007174..a4c43b22a6 100644 --- a/deps/oblib/src/lib/oblog/ob_log.cpp +++ b/deps/oblib/src/lib/oblog/ob_log.cpp @@ -688,7 +688,6 @@ int ObLogger::log_head(const int64_t ts, ts_to_tv(ts, tv); struct tm tm; ob_fast_localtime(last_unix_sec_, last_localtime_, static_cast(tv.tv_sec), &tm); - const uint64_t *trace_id = ObCurTraceId::get(); const int32_t errcode_buf_size = 32; char errcode_buf[errcode_buf_size]; errcode_buf[0] = '\0'; @@ -702,20 +701,20 @@ int ObLogger::log_head(const int64_t ts, //forbid modify the format of logdata_printf ret = logdata_printf(buf, buf_len, pos, "[%04d-%02d-%02d %02d:%02d:%02d.%06ld] " - "[%ld][%s][T%lu][" TRACE_ID_FORMAT_V2 "] ", + "[%ld][%s][T%lu][%s] ", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, - tm.tm_sec, tv.tv_usec, GETTID(), GETTNAME(), GET_TENANT_ID(), TRACE_ID_FORMAT_PARAM(trace_id)); + tm.tm_sec, tv.tv_usec, GETTID(), GETTNAME(), GET_TENANT_ID(), ObCurTraceId::get_trace_id_str()); } else { constexpr int cluster_id_buf_len = 8; char cluster_id_buf[cluster_id_buf_len] = {'\0'}; (void)snprintf(cluster_id_buf, cluster_id_buf_len, "[C%lu]", GET_CLUSTER_ID()); ret = logdata_printf(buf, buf_len, pos, "[%04d-%02d-%02d %02d:%02d:%02d.%06ld] " - "%-5s %s%s (%s:%d) [%ld][%s]%s[T%lu][" TRACE_ID_FORMAT_V2 "] [lt=%ld]%s ", + "%-5s %s%s (%s:%d) [%ld][%s]%s[T%lu][%s] [lt=%ld]%s ", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, tv.tv_usec, errstr_[level], mod_name, function, base_file_name, line, GETTID(), GETTNAME(), is_arb_replica_ ? cluster_id_buf : "", - is_arb_replica_ ? GET_ARB_TENANT_ID() : GET_TENANT_ID(), TRACE_ID_FORMAT_PARAM(trace_id), + is_arb_replica_ ? GET_ARB_TENANT_ID() : GET_TENANT_ID(), ObCurTraceId::get_trace_id_str(), last_logging_cost_time_us_, errcode_buf); } } diff --git a/deps/oblib/src/lib/profile/ob_trace_id.cpp b/deps/oblib/src/lib/profile/ob_trace_id.cpp index 68025d5d39..a6e9065f6b 100644 --- a/deps/oblib/src/lib/profile/ob_trace_id.cpp +++ b/deps/oblib/src/lib/profile/ob_trace_id.cpp @@ -30,3 +30,12 @@ OB_SERIALIZE_MEMBER(ObCurTraceId::TraceId, uval_[0], uval_[1], uval_[2], uval_[3 } // end namespace common } // end namespace oceanbase + +extern "C" { + const char* trace_id_to_str_c(const uint64_t *uval) + { + ObCurTraceId::TraceId trace_id; + trace_id.set(uval); + return to_cstring(trace_id); + } +} /* extern "C" */ diff --git a/deps/oblib/src/lib/profile/ob_trace_id.h b/deps/oblib/src/lib/profile/ob_trace_id.h index da7e919d13..104ba378a8 100644 --- a/deps/oblib/src/lib/profile/ob_trace_id.h +++ b/deps/oblib/src/lib/profile/ob_trace_id.h @@ -20,9 +20,8 @@ namespace oceanbase { namespace common { -#define TRACE_ID_FORMAT "Y%lX-%016lX" -#define TRACE_ID_FORMAT_V2 "Y%lX-%016lX-%lx-%lx" -#define TRACE_ID_FORMAT_PARAM(x) x[0], x[1], x[2], x[3] +#define TRACE_ID_FORMAT_V4 "Y%lX-%016lX-%lx-%lx" +#define TRACE_ID_FORMAT_V6 "Z%X-%016lX-%lx-%lx" struct ObCurTraceId { class SeqGenerator @@ -113,20 +112,19 @@ struct ObCurTraceId } return ret; } - inline int64_t to_string(char *buf, const int64_t buf_len) const { int64_t pos = 0; - common::databuff_printf(buf, buf_len, pos, TRACE_ID_FORMAT_V2, uval_[0], uval_[1], uval_[2], uval_[3]); + if (!id_.is_ipv6_) { + common::databuff_printf(buf, buf_len, pos, TRACE_ID_FORMAT_V4, uval_[0], uval_[1], uval_[2], uval_[3]); + } else { + common::databuff_printf(buf, buf_len, pos, TRACE_ID_FORMAT_V6, id_.bytes_no_ip_, uval_[1], uval_[2], uval_[3]); + } return pos; } int parse_from_buf(char* buf) { - int ret = OB_SUCCESS; - if (4 != sscanf(buf, TRACE_ID_FORMAT_V2, &uval_[0], &uval_[1], &uval_[2], &uval_[3])) { - ret = OB_INVALID_ARGUMENT; - } - return ret; + return set(buf); } inline bool equals(const TraceId &trace_id) const { @@ -147,13 +145,14 @@ struct ObCurTraceId inline int set(const char *buf) { int ret = OB_SUCCESS; - if (OB_ISNULL(buf)) { - ret = OB_ERR_UNEXPECTED; - } else { - int32_t return_value = sscanf(buf, TRACE_ID_FORMAT, &uval_[0], &uval_[1]); - if (0 != return_value && 2 != return_value) { - ret = OB_ERR_UNEXPECTED; - } + int n_match = 0; + if (TRACE_ID_FORMAT_V4[0] == buf[0]) { + n_match = sscanf(buf, TRACE_ID_FORMAT_V4, &uval_[0], &uval_[1], &uval_[2], &uval_[3]); + } else if (TRACE_ID_FORMAT_V6[0] == buf[0]) { + n_match = sscanf(buf, TRACE_ID_FORMAT_V6, &id_.bytes_no_ip_, &uval_[1], &uval_[2], &uval_[3]); + } + if (n_match != 0 && n_match != 4) { + ret = OB_INVALID_ARGUMENT; } return ret; } @@ -185,11 +184,16 @@ struct ObCurTraceId struct { uint32_t ip_: 32; - uint16_t port_: 16; - uint8_t is_user_request_: 1; - uint8_t is_ipv6_: 1; - uint16_t reserved_: 2; - uint16_t sub_task_: 12; + union { + struct { + uint16_t port_: 16; + uint8_t is_user_request_: 1; + uint8_t is_ipv6_: 1; + uint16_t reserved_: 2; + uint16_t sub_task_: 12; + }; + uint32_t bytes_no_ip_; + }; uint64_t seq_: 64; uint64_t ipv6_[2]; } id_; @@ -316,5 +320,9 @@ private: }// namespace common }// namespace oceanbase +extern "C" { + const char* trace_id_to_str_c(const uint64_t *uval); +} /* extern "C" */ + #endif diff --git a/deps/oblib/src/lib/signal/ob_signal_handlers.cpp b/deps/oblib/src/lib/signal/ob_signal_handlers.cpp index 12cc162a8a..f0b3ca7325 100644 --- a/deps/oblib/src/lib/signal/ob_signal_handlers.cpp +++ b/deps/oblib/src/lib/signal/ob_signal_handlers.cpp @@ -96,7 +96,7 @@ signal_handler_t &get_signal_handler() bool g_redirect_handler = false; static __thread int g_coredump_num = 0; -#define COMMON_FMT "timestamp=%ld, tid=%ld, tname=%s, trace_id="TRACE_ID_FORMAT_V2", extra_info=(%s), lbt=%s" +#define COMMON_FMT "timestamp=%ld, tid=%ld, tname=%s, trace_id=%s, extra_info=(%s), lbt=%s" void coredump_cb(int, int, void*, void*); void ob_signal_handler(int sig, siginfo_t *si, void *context) @@ -193,10 +193,13 @@ void coredump_cb(volatile int sig, volatile int sig_code, void* volatile sig_add bt[len++] = '\0'; // extra const ObFatalErrExtraInfoGuard *extra_info = nullptr; // TODO: May deadlock, ObFatalErrExtraInfoGuard::get_thd_local_val_ptr(); - uint64_t uval[4] = {0}; auto *trace_id = ObCurTraceId::get_trace_id(); + char trace_id_buf[128] = {'\0'}; if (trace_id != nullptr) { - trace_id->get_uval(uval); + int64_t pos = trace_id->to_string(trace_id_buf, sizeof(trace_id_buf)); + if (pos < sizeof(trace_id_buf)) { + trace_id_buf[pos]= '\0'; + } } char print_buf[1024]; const ucontext_t *con = (ucontext_t *)context; @@ -222,7 +225,7 @@ void coredump_cb(volatile int sig, volatile int sig_code, void* volatile sig_add ssize_t print_len = lnprintf(print_buf, sizeof(print_buf), "%s IP=%lx, RBP=%lx, sig=%d, sig_code=%d, sig_addr=%p, RLIMIT_CORE=%s, "COMMON_FMT", ", crash_info, ip, bp, sig, sig_code, sig_addr, rlimit_core, - ts, GETTID(), tname, TRACE_ID_FORMAT_PARAM(uval), + ts, GETTID(), tname, trace_id_buf, (NULL == extra_info) ? NULL : to_cstring(*extra_info), bt); ObSqlInfo sql_info = ObSqlInfoGuard::get_tl_sql_info(); char sql_id[] = "SQL_ID="; diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp index 7a0cf5574e..bb35c1637a 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp @@ -15,6 +15,7 @@ #include "rpc/obmysql/ob_sql_sock_session.h" #include "rpc/obmysql/ob_i_sql_sock_handler.h" #include "rpc/obmysql/ob_sql_sock_session.h" +#include "lib/ob_running_mode.h" #include "lib/oblog/ob_log.h" #include "lib/allocator/ob_malloc.h" #include "lib/queue/ob_link_queue.h" @@ -22,6 +23,7 @@ #include "lib/thread/ob_thread_name.h" #include "lib/utility/ob_macro_utils.h" #include "lib/profile/ob_trace_id.h" +#include "lib/net/ob_net_util.h" #include "common/ob_clock_generator.h" #include #include @@ -613,16 +615,47 @@ int ObSqlSock::write_handshake_packet(const char* buf, int64_t sz) { return ret; } -static struct epoll_event *__make_epoll_event(struct epoll_event *event, uint32_t event_flag, void* val) { +// Why wrapping? The purpose is to distinguish the fd from the pointer. +// See ObSqlNioImpl::handle_epoll_event() in this file for details. +static uint64_t wrap_fd(int fd) +{ + return 0x8000000000000000 | fd; +} + +static int unwrap_fd(uint64_t wrapped_fd) +{ + return (int)(wrapped_fd & 0xffffffff); +} + +static bool is_wrapped_fd(uint64_t num64) +{ + bool wrapped = false; + if (0 != (0x8000000000000000 & num64)) { + wrapped = true; + } + return wrapped; +} + +static struct epoll_event *__make_epoll_event( + struct epoll_event *event, + uint32_t event_flag, + int fd, + void* val) +{ event->events = event_flag; - event->data.ptr = val; + if (NULL == val) { + event->data.u64 = wrap_fd(fd); + } else { + event->data.ptr = val; + } + return event; } static int epoll_regist(int epfd, int fd, uint32_t eflag, void* s) { int err = 0; struct epoll_event event; - if (0 != epoll_ctl(epfd, EPOLL_CTL_ADD, fd, __make_epoll_event(&event, eflag, s))) { + if (0 != epoll_ctl(epfd, EPOLL_CTL_ADD, fd, __make_epoll_event(&event, eflag, fd, s))) { err = -EIO; LOG_ERROR_RET(common::OB_ERR_SYS, "add fd to epoll failed", K(fd), K(epfd), K(errno)); } @@ -637,21 +670,28 @@ static int socket_set_opt(int fd, int option, int value) // need_monopolize is true means the first bind on mysql port should // detect whether the port has been used or not to prevent the same mysql port // been used by different observer processes -static int listen_create(int port, bool need_monopolize) +static int listen_create(int family, int port, bool need_monopolize) { int err = 0; int fd = 0; - struct sockaddr_in sin; - if ((fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0)) < 0) { + struct sockaddr_storage addr; + memset(&addr, 0, sizeof(addr)); + int ipv6_only_on = 1; /* Disable IPv4-mapped IPv6 addresses */ + if ((fd = socket(family, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0)) < 0) { LOG_ERROR_RET(common::OB_ERR_SYS, "sql nio create socket for listen failed", K(errno)); err = errno; } else if (socket_set_opt(fd, SO_REUSEADDR, 1) < 0) { LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEADDR failed", K(errno), K(fd)); err = errno; + } else if (AF_INET6 == family && + setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &ipv6_only_on, sizeof(ipv6_only_on)) < 0) { + LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt IPV6_V6ONLY failed", K(errno), K(fd)); + err = errno; } else if ((false == need_monopolize) && (socket_set_opt(fd, SO_REUSEPORT, 1) < 0)) { LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEPORT failed", K(errno), K(fd)); err = errno; - } else if (bind(fd, (sockaddr*)obrpc::make_unix_sockaddr(&sin, 0, port), sizeof(sin))) { + } else if (bind(fd, (sockaddr*)obsys::ObNetUtil::make_unix_sockaddr_any(AF_INET6 == family, port, &addr), + sizeof(addr)) < 0) { LOG_ERROR_RET(OB_ERR_SYS, "sql nio bind listen fd failed", K(errno), K(fd)); err = errno; } else if (listen(fd, 1024) < 0) { @@ -774,16 +814,32 @@ public: if ((epfd_ = epoll_create1(EPOLL_CLOEXEC)) < 0) { ret = OB_IO_ERROR; LOG_WARN("epoll_create fail", K(ret), K(errno)); - } else if ((lfd_ = listen_create(port, need_monopolize)) < 0) { - ret = OB_SERVER_LISTEN_ERROR; - LOG_WARN("listen create fail", K(ret), K(port), K(errno), KERRNOMSG(errno)); - } else if (0 != epoll_regist(epfd_, lfd_, epflag, NULL)) { - ret = OB_IO_ERROR; - LOG_WARN("regist listen fd fail", K(ret)); - } else if (OB_FAIL(evfd_.create(epfd_))) { - LOG_WARN("evfd create fail", K(ret)); } else { - LOG_INFO("sql_nio init listen succ", K(port)); + int ret = OB_SUCCESS; + if ((lfd_ = listen_create(!oceanbase::lib::use_ipv6() ? AF_INET : AF_INET6, port, need_monopolize)) < 0) { + ret = OB_SERVER_LISTEN_ERROR; + LOG_WARN("listen create fail", K(ret), K(port), K(errno), KERRNOMSG(errno)); + } else if (0 != epoll_regist(epfd_, lfd_, epflag, NULL)) { + ret = OB_IO_ERROR; + LOG_WARN("regist listen fd fail", K(ret)); + } else { + LOG_INFO("sql_nio init listen succ", K(port), "fd", lfd_); + } + if (OB_SUCCESS != ret && lfd_ >= 0) { + close(lfd_); + lfd_ = -1; + } + if (OB_SUCCESS != ret) + { + ret = OB_SERVER_LISTEN_ERROR; + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(evfd_.create(epfd_))) { + LOG_WARN("evfd create fail", K(ret)); + } else { + LOG_INFO("sql_nio init listen succ", K(port)); + } } return ret; } @@ -877,13 +933,23 @@ private: struct epoll_event events[maxevents]; int cnt = ob_epoll_wait(epfd_, events, maxevents, 1000); for(int i = 0; i < cnt; i++) { - ObSqlSock* s = (ObSqlSock*)events[i].data.ptr; - if (OB_UNLIKELY(NULL == s)) { - do_accept_loop(); - } else if (OB_UNLIKELY((void*)&evfd_ == (void*)s)) { - evfd_.consume(); + uint64_t num64 = events[i].data.u64; + if (is_wrapped_fd(num64)) { + int lfd = unwrap_fd(num64); + if (lfd == lfd_) { + do_accept_loop(lfd); + } else { + // skip + } } else { - handle_sock_event(s, events[i].events); + ObSqlSock* s = (ObSqlSock*)events[i].data.ptr; + if (OB_UNLIKELY(NULL == s)) { + // skip + } else if (OB_UNLIKELY((void*)&evfd_ == (void*)s)) { + evfd_.consume(); + } else { + handle_sock_event(s, events[i].events); + } } } } @@ -985,14 +1051,14 @@ private: } } - void do_accept_loop() { + void do_accept_loop(int lfd) { while(1){ int fd = -1; - if ((fd = accept4(lfd_, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC)) < 0) { + if ((fd = accept4(lfd, NULL, NULL, SOCK_NONBLOCK|SOCK_CLOEXEC)) < 0) { if (EAGAIN == errno || EWOULDBLOCK == errno) { break; } else { - LOG_ERROR_RET(OB_ERR_SYS, "accept4 fail", K(lfd_), K(errno)); + LOG_ERROR_RET(OB_ERR_SYS, "accept4 fail", K(lfd), K(errno)); break; } } else { diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp index 8b9eee21e0..07e6782134 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_sock_handler.cpp @@ -39,19 +39,7 @@ static int get_client_addr_for_sql_sock_session(int fd, ObAddr& client_addr) ret = OB_ERR_UNEXPECTED; LOG_WARN("sql nio getpeername failed", K(errno), K(ret)); } else { - if (AF_INET == addr.ss_family) { - struct sockaddr_in *s = (struct sockaddr_in *)&addr; - if (false == client_addr.set_ipv4_addr(ntohl(s->sin_addr.s_addr), ntohs(s->sin_port))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("sql nio set_ipv4_addr failed", K(ret)); - } - } else if (AF_INET6 == addr.ss_family) { - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; - if (false == client_addr.set_ipv6_addr((void *)&s->sin6_addr, ntohs(s->sin6_port))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("sql nio set_ipv6_addr failed", K(ret)); - } - } + client_addr.from_sockaddr(&addr); } return ret; diff --git a/deps/oblib/src/rpc/obrpc/ob_listener.cpp b/deps/oblib/src/rpc/obrpc/ob_listener.cpp index 992140157c..36f1c9c26a 100644 --- a/deps/oblib/src/rpc/obrpc/ob_listener.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_listener.cpp @@ -11,9 +11,11 @@ */ #include "rpc/obrpc/ob_listener.h" +#include "lib/ob_running_mode.h" #include "lib/oblog/ob_log.h" #include "lib/oblog/ob_log_module.h" #include "lib/utility/serialization.h" +#include "lib/net/ob_net_util.h" #include #include #include @@ -33,6 +35,10 @@ using namespace oceanbase::common; using namespace oceanbase::obrpc; using namespace oceanbase::common::serialization; +#ifndef SO_REUSEPORT +#define SO_REUSEPORT 15 +#endif + ObListener::ObListener() { listen_fd_ = -1; @@ -57,58 +63,57 @@ int ObListener::ob_listener_set_opt(int fd, int option, int value) return setsockopt(fd, SOL_SOCKET, option, (void *)&value, sizeof(value)); } -int ObListener::listen_create(int port) { +int ObListener::listen_special(int family, int port) +{ int ret = OB_SUCCESS; int fd = -1; - struct sockaddr_in sin; + struct sockaddr_storage addr; int no_block_flag = 1; - - memset(&sin, 0, sizeof(sin)); - socklen_t ob_listener_gid_len = sizeof(OB_LISTENER_GID); - + memset(&addr, 0, sizeof(addr)); if (port <= 0) { ret = OB_INVALID_ARGUMENT; RPC_LOG(ERROR, "invalid port", K(ret), K(port)); - } else if ((fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) { + } else if (AF_INET != family && AF_INET6 != family) { + ret = OB_INVALID_ARGUMENT; + RPC_LOG(ERROR, "invalid port", K(ret), K(port)); + } else if ((fd = socket(family, SOCK_STREAM|SOCK_CLOEXEC, 0)) < 0) { ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "create socket failed!", K(ret), K(fd), K(port), K(errno)); + RPC_LOG(ERROR, "create socket failed!", K(ret), K(fd), K(family), K(port), K(errno)); } else if (ioctl(fd, FIONBIO, &no_block_flag) < 0) { ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "set non block failed!", K(ret), K(fd), K(port), K(errno)); + RPC_LOG(ERROR, "set non block failed!", K(ret), K(fd), K(family), K(port), K(errno)); } else if (ob_listener_set_tcp_opt(fd, TCP_DEFER_ACCEPT, 1) < 0) { ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "set tcp defer accept failed!", K(ret), K(fd), K(port), K(errno)); + RPC_LOG(ERROR, "set tcp defer accept failed!", K(ret), K(fd), K(family), K(port), K(errno)); } else if (ob_listener_set_opt(fd, SO_REUSEADDR, 1) < 0) { ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "set reuse_addr fail!", K(ret), K(fd), K(port), K(errno)); + RPC_LOG(ERROR, "set reuse addr fail!", K(ret), K(fd), K(family), K(port), K(errno)); + } else if (ob_listener_set_opt(fd, SO_REUSEPORT, 1) < 0) { + ret = OB_SERVER_LISTEN_ERROR; + RPC_LOG(ERROR, "set reuse port fail!", K(fd), K(family), K(port), K(errno)); + } else if (bind(fd, (sockaddr*)obsys::ObNetUtil::make_unix_sockaddr_any(AF_INET6 == family, port, &addr), + sizeof(addr)) < 0) { + ret = OB_SERVER_LISTEN_ERROR; + RPC_LOG(ERROR, "bind failed!", K(fd), K(family), K(port), K(errno)); + } else if (listen(fd, 1024) < 0) { + ret = OB_SERVER_LISTEN_ERROR; + RPC_LOG(ERROR, "listen failed", K(fd), K(family), K(port), K(errno)); } -#ifdef SO_REUSEPORT - else if (ob_listener_set_opt(fd, SO_REUSEPORT, 1) < 0) { - ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "set reuse port fail!", K(fd), K(port), K(errno)); - } -#endif - else if (ussl_setsockopt(fd, SOL_OB_SOCKET, SO_OB_SET_SERVER_GID, &OB_LISTENER_GID, ob_listener_gid_len) < 0) { - ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "set ObListener gid failed", K(OB_LISTENER_GID)); - } else if (bind(fd, (sockaddr*)make_unix_sockaddr(&sin, 0, port), sizeof(sin)) < 0) { - ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "bind failed!", K(errno)); - } else if (ussl_listen(fd, 1024) < 0) { - ret = OB_SERVER_LISTEN_ERROR; - RPC_LOG(ERROR, "listen failed", K(errno)); + if (OB_FAIL(ret) && fd >= 0) { + close(fd); + fd = -1; } + return fd; +} - if (OB_FAIL(ret)) { - if (fd >= 0) { - close(fd); - fd = -1; - } - } else { - listen_fd_ = fd; - RPC_LOG(INFO, "create listen success!", K(port)); +int ObListener::listen_create(int port) { + int ret = OB_SUCCESS; + if ((listen_fd_ = listen_special(AF_INET, port)) < 0) { + RPC_LOG(ERROR, "create listen for IPv4 fail!", K(errno)); + } + if (listen_fd_ < 0) { + ret = OB_SERVER_LISTEN_ERROR; } - return ret; } @@ -132,9 +137,11 @@ void ObListener::destroy() struct linger so_linger; so_linger.l_onoff = 1; so_linger.l_linger = 0; - setsockopt(listen_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); - close(listen_fd_); - listen_fd_ = -1; + if (listen_fd_ >= 0) { + setsockopt(listen_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger)); + close(listen_fd_); + listen_fd_ = -1; + } memset(&io_wrpipefd_map_, 0, sizeof(io_wrpipefd_map_)); } } @@ -278,28 +285,35 @@ static void trace_connection_info(int fd) addr_len = sizeof(addr); if (getpeername(fd, (struct sockaddr *)&addr, &addr_len) == 0) { - char *src_addr = NULL; - uint16_t src_port = 0; - src_addr = inet_ntoa(((struct sockaddr_in *)&addr)->sin_addr); - src_port = ntohs(((struct sockaddr_in *)&addr)->sin_port); - RPC_LOG(INFO, "oblistener receive connection from", KCSTRING(src_addr), K(src_port)); + ObAddr peer; + peer.from_sockaddr(&addr); + RPC_LOG(INFO, "oblistener receive connection from", K(peer)); } } void ObListener::do_work() { - struct epoll_event listen_ev; - listen_ev.events = EPOLLIN; - listen_ev.data.fd = listen_fd_; int ret = OB_SUCCESS; - int epoll_fd = epoll_create(256); + int epoll_fd = -1; - if (epoll_fd < 0) { + if (listen_fd_ < 0) { + ret = OB_IO_ERROR; + RPC_LOG(ERROR, "listen_fd_ is less than 0"); + } else if ((epoll_fd = epoll_create(256)) < 0) { ret = OB_IO_ERROR; RPC_LOG(ERROR, "epoll create failed", K(errno)); - } else if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd_, &listen_ev) < 0) { - ret = OB_IO_ERROR; - RPC_LOG(ERROR, "epoll add listen fd failed!", K(errno)); + } + + if (OB_SUCC(ret)) { + if (listen_fd_ >= 0) { + struct epoll_event listen_ev4; + listen_ev4.events = EPOLLIN; + listen_ev4.data.fd = listen_fd_; + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd_, &listen_ev4) < 0) { + ret = OB_IO_ERROR; + RPC_LOG(ERROR, "epoll add listen fd for IPv4 failed!", K(errno)); + } + } } if (OB_FAIL(ret)) { diff --git a/deps/oblib/src/rpc/obrpc/ob_listener.h b/deps/oblib/src/rpc/obrpc/ob_listener.h index e4db960328..6fe96136e4 100644 --- a/deps/oblib/src/rpc/obrpc/ob_listener.h +++ b/deps/oblib/src/rpc/obrpc/ob_listener.h @@ -29,15 +29,6 @@ namespace obrpc #define MAX_PROTOCOL_TYPE_SIZE (5) #define OB_LISTENER_MAX_THREAD_CNT 64 -inline struct sockaddr_in* make_unix_sockaddr(struct sockaddr_in *sin, in_addr_t ip, int port) { - if (NULL != sin) { - sin->sin_port = (uint16_t)htons((uint16_t)port); - sin->sin_addr.s_addr = ip; - sin->sin_family = AF_INET; - } - return sin; -} - typedef struct io_threads_pipefd_pool_t{ int count; int pipefd[OB_LISTENER_MAX_THREAD_CNT]; @@ -67,6 +58,7 @@ public: private: void do_work(); + int listen_special(int family, int port); private: int listen_fd_; int port_; diff --git a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp index 53611e810d..554875fbac 100644 --- a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp @@ -37,7 +37,9 @@ extern "C" { extern int ob_epoll_wait(int __epfd, struct epoll_event *__events, - int __maxevents, int __timeout); + int __maxevents, int __timeout); +extern int sockaddr_compare_c(struct sockaddr_storage *left, struct sockaddr_storage *right); +extern char *sockaddr_to_str_c(struct sockaddr_storage *sock_addr, char *buf, int len); }; using namespace oceanbase::common; @@ -368,22 +370,20 @@ client* create_client(DestKeepAliveState *rs) if (OB_SUCC(ret)) { c->status_ = CONNECTING; update_write_ts(rs); - if (AF_INET == addr.ss_family) { - struct sockaddr_in self_addr; - socklen_t len = sizeof(self_addr); - if (0 == getsockname(c->fd_, (struct sockaddr *)&self_addr, &len)) { - struct sockaddr_in *dst_addr = (struct sockaddr_in *)(&addr); - if (self_addr.sin_port == dst_addr->sin_port && self_addr.sin_addr.s_addr == dst_addr->sin_addr.s_addr) { - char str[INET_ADDRSTRLEN]; - ret = OB_IO_ERROR; - _LOG_WARN("connection to %s failed, self connect self", inet_ntop(AF_INET, (const void*)(&addr), str, sizeof(str))); - } else { - _LOG_DEBUG("connection local_port: %d, fd: %d", ntohs(self_addr.sin_port), c->fd_); - } - } else { + struct sockaddr_storage self_addr; + socklen_t len = sizeof(self_addr); + if (0 == getsockname(c->fd_, (struct sockaddr *)&self_addr, &len)) { + char str[128]; + const char *addr_str = sockaddr_to_str_c(&self_addr, str, sizeof(str)); + if (0 == sockaddr_compare_c(&self_addr, &addr)) { ret = OB_IO_ERROR; - _LOG_WARN("getsockname failed: fd:%d, errno:%d", c->fd_, errno); + _LOG_WARN("connection to %s failed, self connect self", addr_str); + } else { + _LOG_DEBUG("connection local_addr: %s, fd: %d", addr_str, c->fd_); } + } else { + ret = OB_IO_ERROR; + _LOG_WARN("getsockname failed: fd:%d, errno:%d", c->fd_, errno); } } diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h index 27dcff1603..b1501f4719 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_proxy.h @@ -132,7 +132,7 @@ public: int64_t resp_sz = 0; ObRpcPacket resp_pkt; ObRpcResultCode rcode; - sockaddr_in sock_addr; + sockaddr_storage sock_addr; uint8_t thread_id = balance_assign_tidx(); uint64_t pnio_group_id = ObPocRpcServer::DEFAULT_PNIO_GROUP; // TODO:@fangwu.lcc map proxy.group_id_ to pnio_group_id @@ -155,7 +155,7 @@ public: &cb }; cb.gtid_ = (pnio_group_id<<32) + thread_id; - if (0 != (sys_err = pn_send((pnio_group_id<<32) + thread_id, obaddr2sockaddr(&sock_addr, addr), &pkt, &cb.pkt_id_))) { + if (0 != (sys_err = pn_send((pnio_group_id<<32) + thread_id, addr.to_sockaddr(&sock_addr), &pkt, &cb.pkt_id_))) { ret = translate_io_error(sys_err); RPC_LOG(WARN, "pn_send fail", K(sys_err), K(addr), K(pcode)); } @@ -244,7 +244,7 @@ public: IGNORE_RETURN snprintf(rpc_timeguard_str, sizeof(rpc_timeguard_str), "sz=%ld,pcode=%x,id=%ld", req_sz, pcode, src_tenant_id); timeguard.click(rpc_timeguard_str); if (OB_SUCC(ret)) { - sockaddr_in sock_addr; + sockaddr_storage sock_addr; const pn_pkt_t pkt = { req, req_sz, @@ -255,7 +255,7 @@ public: }; if (0 != (sys_err = pn_send( (pnio_group_id<<32) + thread_id, - obaddr2sockaddr(&sock_addr, addr), + addr.to_sockaddr(&sock_addr), &pkt, pkt_id_ptr))) { ret = translate_io_error(sys_err); @@ -275,15 +275,6 @@ public: return ret; } - static struct sockaddr_in* obaddr2sockaddr(struct sockaddr_in *sin, const ObAddr& addr) - { - if (NULL != sin) { - sin->sin_port = (uint16_t)htons((uint16_t)(addr.get_port())); - sin->sin_addr.s_addr = htonl(addr.get_ipv4()); - sin->sin_family = AF_INET; - } - return sin; - } int log_user_error_and_warn(const ObRpcResultCode &rcode) const; }; diff --git a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp index cb82ff7b17..69e17cf658 100644 --- a/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_poc_rpc_server.cpp @@ -197,13 +197,7 @@ void ObPocServerHandleContext::set_peer_unsafe() { struct sockaddr_storage sock_addr; if (0 == pn_get_peer(resp_id_, &sock_addr)) { - if (AF_INET == sock_addr.ss_family) { - struct sockaddr_in *sin = reinterpret_cast(&sock_addr); - peer_.set_ipv4_addr(ntohl(sin->sin_addr.s_addr), ntohs(sin->sin_port)); - } else if (AF_INET6 == sock_addr.ss_family) { - struct sockaddr_in6 *sin6 = reinterpret_cast(&sock_addr); - peer_.set_ipv6_addr(&sin6->sin6_addr.s6_addr, ntohs(sin6->sin6_port)); - } + peer_.from_sockaddr(&sock_addr); } } diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h b/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h index add5cab62b..18617b1ffc 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_endec.h @@ -16,6 +16,7 @@ #include "rpc/obrpc/ob_rpc_mem_pool.h" #include "rpc/obrpc/ob_rpc_packet.h" #include "rpc/obrpc/ob_rpc_result_code.h" +#include "lib/compress/ob_compressor_pool.h" namespace oceanbase { @@ -84,7 +85,7 @@ template K(extra_payload_size), K(pcode)); } else { const common::ObCompressorType &compressor_type = get_proxy_compressor_type(proxy); - bool need_compressed = ObCompressorPool::get_instance().need_common_compress(compressor_type); + bool need_compressed = common::ObCompressorPool::get_instance().need_common_compress(compressor_type); if (need_compressed) { // compress EVENT_INC(RPC_COMPRESS_ORIGINAL_PACKET_CNT); @@ -94,7 +95,7 @@ template char *compressed_buf = NULL; int64_t dst_data_size = 0; int64_t max_overflow_size = 0; - if (OB_FAIL(ObCompressorPool::get_instance().get_compressor(compressor_type, compressor))) { + if (OB_FAIL(common::ObCompressorPool::get_instance().get_compressor(compressor_type, compressor))) { RPC_OBRPC_LOG(WARN, "get_compressor failed", K(ret), K(compressor_type)); } else if (OB_FAIL(compressor->get_max_overflow_size(payload_sz, max_overflow_size))) { RPC_OBRPC_LOG(WARN, "get_max_overflow_size failed", K(ret), K(payload_sz), K(max_overflow_size)); diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp index 68bd07c484..3307062130 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_proxy.ipp @@ -60,7 +60,7 @@ int SSHandle::get_more(typename pcodeStruct::Response &result) int64_t pnio_req_sz = 0, resp_sz = 0; const char* resp = NULL; ObRpcPacket resp_pkt; - sockaddr_in sock_addr; + sockaddr_storage sock_addr; uint8_t thread_id = ObPocClientStub::balance_assign_tidx(); uint64_t pnio_group_id = ObPocRpcServer::DEFAULT_PNIO_GROUP; int pn_err = 0; @@ -82,7 +82,7 @@ int SSHandle::get_more(typename pcodeStruct::Response &result) &cb }; cb.gtid_ = (pnio_group_id<<32) + thread_id; - if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), &pkt, &cb.pkt_id_))) { + if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, dst_.to_sockaddr(&sock_addr), &pkt, &cb.pkt_id_))) { ret = ObPocClientStub::translate_io_error(pn_err); RPC_LOG(WARN, "pnio post fail", K(pn_err)); } @@ -201,7 +201,7 @@ int SSHandle::abort() int64_t pnio_req_sz = 0, resp_sz = 0; const char* resp = NULL; ObRpcPacket resp_pkt; - sockaddr_in sock_addr; + sockaddr_storage sock_addr; uint8_t thread_id = ObPocClientStub::balance_assign_tidx(); uint64_t pnio_group_id = ObPocRpcServer::DEFAULT_PNIO_GROUP; int pn_err = 0; @@ -223,7 +223,7 @@ int SSHandle::abort() &cb }; cb.gtid_ = (pnio_group_id<<32) + thread_id; - if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, ObPocClientStub::obaddr2sockaddr(&sock_addr, dst_), &pkt, &cb.pkt_id_))) { + if (0 != (pn_err = pn_send((pnio_group_id<<32) + thread_id, dst_.to_sockaddr(&sock_addr), &pkt, &cb.pkt_id_))) { ret = ObPocClientStub::translate_io_error(pn_err); RPC_LOG(WARN, "pnio post fail", K(pn_err)); } diff --git a/deps/oblib/src/rpc/pnio/ds/hash_map.c b/deps/oblib/src/rpc/pnio/ds/hash_map.c index a1c103e7cb..518da197bb 100644 --- a/deps/oblib/src/rpc/pnio/ds/hash_map.c +++ b/deps/oblib/src/rpc/pnio/ds/hash_map.c @@ -10,16 +10,15 @@ * See the Mulan PubL v2 for more details. */ -hash_t* hash_create(int64_t capacity) { - int64_t alloc_size = sizeof(hash_t) + capacity * sizeof(link_t); - hash_t* p = (hash_t*)malloc(alloc_size); - hash_init(p, capacity); - return p; -} - -void hash_init(hash_t* h, int64_t capacity) { +void hash_init(hash_t* h, int64_t capacity, + _key_func key_func, + _hash_func hash_func, + _equal_func equal_func) { h->capacity = capacity; memset(&h->table, 0, sizeof(link_t) * capacity); + h->key_func = key_func; + h->hash_func = hash_func; + h->equal_func = equal_func; } static uint64_t __hash_calc(link_t* k) { return str_hash((str_t*)(k + 1)); } diff --git a/deps/oblib/src/rpc/pnio/ds/hash_map.h b/deps/oblib/src/rpc/pnio/ds/hash_map.h index 8ddb7c3ffa..05c4ed0647 100644 --- a/deps/oblib/src/rpc/pnio/ds/hash_map.h +++ b/deps/oblib/src/rpc/pnio/ds/hash_map.h @@ -10,13 +10,22 @@ * See the Mulan PubL v2 for more details. */ +typedef void* (*_key_func)(link_t* link); +typedef uint64_t (*_hash_func)(void* key); +typedef bool (*_equal_func)(void* l_key, void *r_key); + typedef struct hash_t { + _key_func key_func; + _hash_func hash_func; + _equal_func equal_func; int64_t capacity; link_t table[0]; } hash_t; -extern hash_t* hash_create(int64_t capacity); -extern void hash_init(hash_t* h, int64_t capacity); +extern void hash_init(hash_t* h, int64_t capacity, + _key_func key_func, + _hash_func hash_func, + _equal_func equal_func); extern link_t* hash_insert(hash_t* map, link_t* k); extern link_t* hash_del(hash_t* map, str_t* k); extern link_t* hash_get(hash_t* map, str_t* k); diff --git a/deps/oblib/src/rpc/pnio/ds/ihash_map.c b/deps/oblib/src/rpc/pnio/ds/ihash_map.c index af6036ee6f..6eb97a1bdc 100644 --- a/deps/oblib/src/rpc/pnio/ds/ihash_map.c +++ b/deps/oblib/src/rpc/pnio/ds/ihash_map.c @@ -10,12 +10,15 @@ * See the Mulan PubL v2 for more details. */ -static uint64_t __ihash_calc(uint64_t k) { return fasthash64(&k, sizeof(k), 0); } -static link_t* __ihash_locate(hash_t* map, uint64_t k) { return &map->table[__ihash_calc(k) % map->capacity]; } -static uint64_t __ihash_key(link_t* l) { return *(uint64_t*)(l + 1); } -static link_t* __ihash_list_search(link_t* start, uint64_t k, link_t** prev) { +static link_t* __ihash_locate(hash_t* map, void* key) +{ + return &map->table[map->hash_func(key) % map->capacity]; +} + +static link_t* __ihash_list_search(hash_t* map, void* key, link_t** prev) { + link_t* start = __ihash_locate(map, key); link_t* p = start; - while(p->next != NULL && __ihash_key(p->next) != k) { + while(p->next != NULL && !map->equal_func(map->key_func(p->next), key)) { p = p->next; } if (NULL != prev) { @@ -26,8 +29,8 @@ static link_t* __ihash_list_search(link_t* start, uint64_t k, link_t** prev) { link_t* ihash_insert(hash_t* map, link_t* klink) { link_t* prev = NULL; - uint64_t k = __ihash_key(klink); - if(!__ihash_list_search(__ihash_locate(map, k), k, &prev)) { + void* key = map->key_func(klink); + if(!__ihash_list_search(map, key, &prev)) { link_insert(prev, klink); } else { klink = NULL; @@ -35,15 +38,15 @@ link_t* ihash_insert(hash_t* map, link_t* klink) { return klink; } -link_t* ihash_del(hash_t* map, uint64_t k) { +link_t* ihash_del(hash_t* map, void* key) { link_t* ret = NULL; link_t* prev = NULL; - if((ret = __ihash_list_search(__ihash_locate(map, k), k, &prev))) { + if((ret = __ihash_list_search(map, key, &prev))) { link_delete(prev); } return ret; } -link_t* ihash_get(hash_t* map, uint64_t k) { - return __ihash_list_search(__ihash_locate(map, k), k, NULL); +link_t* ihash_get(hash_t* map, void* key) { + return __ihash_list_search(map, key, NULL); } diff --git a/deps/oblib/src/rpc/pnio/ds/ihash_map.h b/deps/oblib/src/rpc/pnio/ds/ihash_map.h index 5768a274cb..6e5f525b37 100644 --- a/deps/oblib/src/rpc/pnio/ds/ihash_map.h +++ b/deps/oblib/src/rpc/pnio/ds/ihash_map.h @@ -10,6 +10,6 @@ * See the Mulan PubL v2 for more details. */ -extern link_t* ihash_insert(hash_t* map, link_t* k); -extern link_t* ihash_del(hash_t* map, uint64_t k); -extern link_t* ihash_get(hash_t* map, uint64_t k); +extern link_t* ihash_insert(hash_t* map, link_t* klink); +extern link_t* ihash_del(hash_t* map, void* key); +extern link_t* ihash_get(hash_t* map, void* key); diff --git a/deps/oblib/src/rpc/pnio/interface/group.c b/deps/oblib/src/rpc/pnio/interface/group.c index 0d92c09ed5..7a3dc88e16 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.c +++ b/deps/oblib/src/rpc/pnio/interface/group.c @@ -49,6 +49,7 @@ typedef struct pn_t static int next_pn_listen_idx; static pn_listen_t pn_listen_array[MAX_PN_LISTEN]; static pn_grp_t* pn_grp_array[MAX_PN_GRP]; +static int pn_has_listened = 0; int64_t pnio_keepalive_timeout; PN_API int64_t pn_set_keepalive_timeout(int64_t user_timeout) { if (user_timeout >= 0) { @@ -88,19 +89,27 @@ static void* pn_thread_func(void* arg) } static int pnl_dispatch_accept(int fd, const void* b, int sz); +extern bool is_support_ipv6_c(); PN_API int pn_listen(int port, serve_cb_t cb) { int idx = FAA(&next_pn_listen_idx, 1); pn_listen_t* pnl = locate_listen(idx); addr_t addr; + addr_t addr6; addr_init(&addr, "0.0.0.0", port); - - if (listen_create(addr) <= 0) { - idx = -1; - } else { - pnl->serve_cb = cb; + if (is_support_ipv6_c()) { + addr_init(&addr6, "::", port); } + if (ATOMIC_BCAS(&pn_has_listened, 0, 1)) { + if (listen_create(addr) <= 0 || + (is_support_ipv6_c() && listen_create(addr6) <= 0)) { + idx = -1; + ATOMIC_STORE(&pn_has_listened, 0); + } else { + pnl->serve_cb = cb; + } + } return idx; } @@ -379,7 +388,8 @@ static pn_t* get_pn_for_send(pn_grp_t* pgrp, int tid) return pgrp->pn_array[tid % pgrp->count]; } -PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret) +extern bool is_valid_sockaddr_c(struct sockaddr_storage *sock_addr); +PN_API int pn_send(uint64_t gtid, struct sockaddr_storage* sock_addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret) { int err = 0; const char* buf = pkt->buf; @@ -390,11 +400,12 @@ PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const pn_pkt_t* pkt, pn_grp_t* pgrp = locate_grp(gtid>>32); pn_t* pn = get_pn_for_send(pgrp, gtid & 0xffffffff); - addr_t dest = {.ip=addr->sin_addr.s_addr, .port=htons(addr->sin_port), .tid=0}; + addr_t dest; + sockaddr_to_addr(sock_addr, &dest); uint32_t pkt_id = gen_pkt_id(); - if (addr->sin_addr.s_addr == 0 || htons(addr->sin_port) == 0) { + if (!is_valid_sockaddr_c(sock_addr)) { err = -EINVAL; - rk_warn("invalid sin_addr: %x:%d", addr->sin_addr.s_addr, addr->sin_port); + rk_warn("invalid sin_addr"); } else if (expire_us < 0) { err = -EINVAL; rk_error("invalid rpc timeout: %ld, it might be that the up-layer rpc timeout is too large, categ_id=%d", expire_us, categ_id); @@ -571,10 +582,7 @@ PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr) { err = -EINVAL; rk_warn("idm_get sock failed, sock_id=%lx", ctx->sock_id); } else { - struct sockaddr_in* sin = (typeof(sin))addr; - sin->sin_family = AF_INET; - sin->sin_addr.s_addr = sock->peer.ip; - sin->sin_port = htons(sock->peer.port); + make_sockaddr(addr, sock->peer); } } return err; diff --git a/deps/oblib/src/rpc/pnio/interface/group.h b/deps/oblib/src/rpc/pnio/interface/group.h index cee719f5c0..be5f5ad2d3 100644 --- a/deps/oblib/src/rpc/pnio/interface/group.h +++ b/deps/oblib/src/rpc/pnio/interface/group.h @@ -79,7 +79,7 @@ PN_API int pn_listen(int port, serve_cb_t cb); // make sure grp != 0 PN_API int pn_provision(int listen_id, int grp, int thread_count); // gid_tid = (gid<<8) | tid -PN_API int pn_send(uint64_t gtid, struct sockaddr_in* addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret); +PN_API int pn_send(uint64_t gtid, struct sockaddr_storage* sock_addr, const pn_pkt_t* pkt, uint32_t* pkt_id_ret); PN_API int pn_resp(uint64_t req_id, const char* buf, int64_t sz, int64_t resp_expired_abs_us); PN_API int pn_get_peer(uint64_t req_id, struct sockaddr_storage* addr); PN_API int pn_ratelimit(int grp_id, int64_t value); diff --git a/deps/oblib/src/rpc/pnio/io/time_wheel.c b/deps/oblib/src/rpc/pnio/io/time_wheel.c index b1217d11c7..7e8304afd3 100644 --- a/deps/oblib/src/rpc/pnio/io/time_wheel.c +++ b/deps/oblib/src/rpc/pnio/io/time_wheel.c @@ -65,8 +65,8 @@ void keepalive_check(pktc_t* io) { // walks through pktc_t skmap, refresh tcp keepalive params and check server keepalive dlink_for(&io->sk_list, p) { pktc_sk_t *sk = structof(p, pktc_sk_t, list_link); - struct sockaddr_in sin; - if (sk->conn_ok && SERVER_IN_BLACK((struct sockaddr*)make_sockaddr(&sin, sk->dest))) { + struct sockaddr_storage sock_addr; + if (sk->conn_ok && SERVER_IN_BLACK((struct sockaddr*)make_sockaddr(&sock_addr, sk->dest))) { // mark the socket as waiting for destroy rk_info("socket dest server in blacklist, it will be destroyed, sock=(ptr=%p,dest=%d:%d)", sk, sk->dest.ip, sk->dest.port); sk->mask |= EPOLLERR; diff --git a/deps/oblib/src/rpc/pnio/nio/addr.c b/deps/oblib/src/rpc/pnio/nio/addr.c index ccdb8afd06..a729088b8f 100644 --- a/deps/oblib/src/rpc/pnio/nio/addr.c +++ b/deps/oblib/src/rpc/pnio/nio/addr.c @@ -9,43 +9,35 @@ * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ +extern bool straddr_to_addr_c(const char *ip_str, bool *is_ipv6, void *ip); +extern void sockaddr_to_addr_c(struct sockaddr_storage *sock_addr, bool *is_ipv6, void *ip, int *port); +extern char *sockaddr_to_str_c(struct sockaddr_storage *sock_addr, char *buf, int len); +extern struct sockaddr_storage* make_unix_sockaddr_c(bool is_ipv6, void *ip, int port, struct sockaddr_storage *sock_addr); const char* addr_str(format_t* f, addr_t addr) { - char buf[18]; - return format_sf(f, "%s:%hu", - inet_ntop(AF_INET, (struct in_addr*)(&addr.ip), buf, sizeof(buf)), addr.port); -} - -addr_t addr_build(const char* ip, int port) { - addr_t addr; - return *addr_init(&addr, ip, port); + char buf[INET6_ADDRSTRLEN + 6]; + struct sockaddr_storage sock_addr; + return format_sf(f, "%s", sockaddr_to_str_c(make_sockaddr(&sock_addr, addr), buf, sizeof(buf))); } addr_t* addr_init(addr_t* addr, const char* ip, int port) { - *addr = (addr_t){inet_addr(ip), (uint16_t)port, 0}; - return addr; -} - -addr_t* addr_set(addr_t* addr, uint32_t ip, uint16_t port, uint16_t id) { - addr->ip = ip; + memset(addr, 0, sizeof(*addr)); + straddr_to_addr_c(ip, &addr->is_ipv6, &addr->ip); addr->port = port; - addr->tid = id; return addr; } void addr_reset(addr_t* addr) { - addr_set(addr, 0, 0, 0); + memset(addr, 0, sizeof(*addr)); } addr_t get_remote_addr(int fd) { addr_t addr; - struct sockaddr_in sa; - socklen_t sa_len = sizeof(sa); - if (0 == getpeername(fd, (struct sockaddr*)&sa, &sa_len)) { - int ip = sa.sin_addr.s_addr; - int port = (int)ntohs(sa.sin_port); - addr_set(&addr, ip, (uint16_t)port, 0); + struct sockaddr_storage sock_addr; + socklen_t addr_len = sizeof(sock_addr); + if (0 == getpeername(fd, (struct sockaddr*)&sock_addr, &addr_len)) { + sockaddr_to_addr_c(&sock_addr, &addr.is_ipv6, &addr.ip, (int*)&addr.port); } else { addr_reset(&addr); } @@ -54,27 +46,28 @@ addr_t get_remote_addr(int fd) { addr_t get_local_addr(int fd) { addr_t addr; - struct sockaddr_in sa; - socklen_t sa_len = sizeof(sa); - if (0 == getsockname(fd, (struct sockaddr*)&sa, &sa_len)) { - int ip = sa.sin_addr.s_addr; - int port = (int)ntohs(sa.sin_port); - addr_set(&addr, ip, (uint16_t)port, 0); + struct sockaddr_storage sock_addr; + socklen_t addr_len = sizeof(sock_addr); + if (0 == getpeername(fd, (struct sockaddr*)&sock_addr, &addr_len)) { + sockaddr_to_addr_c(&sock_addr, &addr.is_ipv6, &addr.ip, (int*)&addr.port); } else { addr_reset(&addr); } return addr; } -static struct sockaddr_in* rk_make_unix_sockaddr(struct sockaddr_in *sin, in_addr_t ip, int port) { - if (NULL != sin) { - sin->sin_port = (uint16_t)htons((uint16_t)port); - sin->sin_addr.s_addr = ip; - sin->sin_family = AF_INET; - } - return sin; +static struct sockaddr_storage* rk_make_unix_sockaddr(struct sockaddr_storage *sock_addr, addr_t addr) { + make_unix_sockaddr_c(addr.is_ipv6, &addr.ip, addr.port, sock_addr); + return sock_addr; } -struct sockaddr_in* make_sockaddr(struct sockaddr_in* sin, addr_t addr) { - return rk_make_unix_sockaddr(sin, addr.ip, addr.port); +struct sockaddr_storage* make_sockaddr(struct sockaddr_storage* sock_addr, addr_t addr) { + return rk_make_unix_sockaddr(sock_addr, addr); +} + +addr_t *sockaddr_to_addr(struct sockaddr_storage *sock_addr, addr_t *addr) +{ + memset(addr, 0, sizeof(*addr)); + sockaddr_to_addr_c(sock_addr, &addr->is_ipv6, &addr->ip, (int*)&addr->port); + return addr; } diff --git a/deps/oblib/src/rpc/pnio/nio/addr.h b/deps/oblib/src/rpc/pnio/nio/addr.h index 65e9860e97..b0f10be1af 100644 --- a/deps/oblib/src/rpc/pnio/nio/addr.h +++ b/deps/oblib/src/rpc/pnio/nio/addr.h @@ -15,15 +15,18 @@ #include typedef struct addr_t { - uint32_t ip; + bool is_ipv6; + union { + uint32_t ip; + uint8_t ipv6[16]; + }; uint16_t port; uint16_t tid; } addr_t; extern const char* addr_str(format_t* f, addr_t addr); -extern addr_t addr_build(const char* ip, int port); extern addr_t* addr_init(addr_t* addr, const char* ip, int port); -extern addr_t* addr_set(addr_t* addr, uint32_t ip, uint16_t port, uint16_t tid); -extern struct sockaddr_in* make_sockaddr(struct sockaddr_in *sin, addr_t addr); +extern struct sockaddr_storage* make_sockaddr(struct sockaddr_storage *sock_addr, addr_t addr); extern addr_t get_remote_addr(int fd); extern addr_t get_local_addr(int fd); +extern addr_t* sockaddr_to_addr(struct sockaddr_storage *sock_addr, addr_t *addr); diff --git a/deps/oblib/src/rpc/pnio/nio/inet.c b/deps/oblib/src/rpc/pnio/nio/inet.c index 3a4abab8ea..40af58215c 100644 --- a/deps/oblib/src/rpc/pnio/nio/inet.c +++ b/deps/oblib/src/rpc/pnio/nio/inet.c @@ -28,19 +28,19 @@ int check_connect_result(int fd) { int async_connect(addr_t dest, uint64_t dispatch_id) { int fd = -1; - struct sockaddr_in sin; + struct sockaddr_storage sock_addr; const int ssl_ctx_id = 0; socklen_t ssl_ctx_id_len = sizeof(ssl_ctx_id); socklen_t dispatch_id_len = sizeof(dispatch_id); int send_negotiation_flag = 1; socklen_t send_negotiation_len = sizeof(send_negotiation_flag); - ef((fd = socket(AF_INET, SOCK_STREAM, 0)) < 0); + ef((fd = socket(!dest.is_ipv6 ? AF_INET : AF_INET6, SOCK_STREAM, 0)) < 0); ef(make_fd_nonblocking(fd)); set_tcpopt(fd, TCP_SYNCNT, PNIO_TCP_SYNCNT); ef(ussl_setsockopt(fd, SOL_OB_SOCKET, SO_OB_SET_CLIENT_GID, &dispatch_id, dispatch_id_len)); ef(ussl_setsockopt(fd, SOL_OB_SOCKET, SO_OB_SET_CLIENT_SSL_CTX_ID, &ssl_ctx_id, ssl_ctx_id_len)); ef(ussl_setsockopt(fd, SOL_OB_SOCKET, SO_OB_SET_SEND_NEGOTIATION_FLAG, &send_negotiation_flag, send_negotiation_len)); - ef(ussl_connect(fd, (struct sockaddr*)make_sockaddr(&sin, dest), sizeof(sin)) < 0 && EINPROGRESS != errno); + ef(ussl_connect(fd, make_sockaddr(&sock_addr, dest), sizeof(sock_addr)) < 0 && EINPROGRESS != errno); set_tcp_nodelay(fd); return fd; el(); @@ -53,14 +53,22 @@ int async_connect(addr_t dest, uint64_t dispatch_id) { int listen_create(addr_t src) { int fd = -1; int err = 0; - struct sockaddr_in sin; - if ((fd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0)) < 0) { + struct sockaddr_storage sock_addr; + int ipv6_only_on = 1; /* Disable IPv4-mapped IPv6 addresses */ + if ((fd = socket(!src.is_ipv6 ? AF_INET : AF_INET6, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0)) < 0) { rk_warn("create socket failed, src=%s, errno=%d", T2S(addr, src), errno); err = PNIO_LISTEN_ERROR; } else if (set_tcp_reuse_addr(fd) != 0) { err = PNIO_LISTEN_ERROR; rk_warn("reuse_addr failed, src=%s, fd=%d, errno=%d", T2S(addr, src), fd, errno); - } else if (bind(fd, (const struct sockaddr*)make_sockaddr(&sin, src), sizeof(sin)) != 0) { + } else if (set_tcp_reuse_port(fd) != 0) { + err = PNIO_LISTEN_ERROR; + rk_warn("reuse_port failed, src=%s, fd=%d, errno=%d", T2S(addr, src), fd, errno); + } else if (src.is_ipv6 && + ussl_setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &ipv6_only_on, sizeof(ipv6_only_on)) != 0) { + err = PNIO_LISTEN_ERROR; + rk_warn("set sock opt IPV6_V6ONLY failed, src=%s, fd=%d, errno=%d", T2S(addr, src), fd, errno); + } else if (bind(fd, (const struct sockaddr*)make_sockaddr(&sock_addr, src), sizeof(sock_addr)) != 0) { err = PNIO_LISTEN_ERROR; rk_warn("bind failed, src=%s, fd=%d, errno=%d", T2S(addr, src), fd, errno); } else if (ussl_listen(fd, 1024) != 0) { diff --git a/deps/oblib/src/rpc/pnio/nio/packet_client.c b/deps/oblib/src/rpc/pnio/nio/packet_client.c index a568c7d420..0b8f1fa02e 100644 --- a/deps/oblib/src/rpc/pnio/nio/packet_client.c +++ b/deps/oblib/src/rpc/pnio/nio/packet_client.c @@ -44,6 +44,60 @@ static int pktc_wq_flush(sock_t* s, write_queue_t* wq, dlink_t** old_head) { #include "pktc_sk_factory.h" #include "pktc_post.h" +void* sk_key_func(link_t* link) +{ + pktc_sk_t *sk = structof(link, pktc_sk_t, hash); + return &sk->dest; +} + +uint64_t sk_hash_func(void* key) +{ + addr_t* addr = (addr_t*)key; + uint64_t hash = addr->port; + if (!addr->is_ipv6) { + hash += addr->ip; + } else { + for (int i = 0; i < sizeof(addr->ipv6)/sizeof(addr->ipv6[0]); i++) { + hash += addr->ipv6[i]; + } + } + return hash; +} + +bool sk_equal_func(void* l_key, void* l_right) +{ + addr_t *l = (addr_t*)l_key; + addr_t *r = (addr_t*)l_right; + bool eq = true; + if (l->is_ipv6 != r->is_ipv6) { + eq = false; + } else if (!l->is_ipv6) { + eq = l->ip == r->ip; + } else { + for (int i = 0; eq && i < sizeof(l->ipv6)/sizeof(l->ipv6[0]); i++) { + eq = l->ipv6[i] == r->ipv6[i]; + } + } + return eq; +} + +void* cb_key_func(link_t* link) +{ + pktc_cb_t *cb = structof(link, pktc_cb_t, hash_link); + return &cb->id; +} + +uint64_t cb_hash_func(void* key) +{ + uint64_t id = *(uint64_t*)key; + return fasthash64(&id, sizeof(id), 0); +} + +bool cb_equal_func(void* l_key, void* l_right) +{ + return *(uint64_t*)l_key == *(uint64_t*)l_right; +} + int64_t pktc_init(pktc_t* io, eloop_t* ep, uint64_t dispatch_id) { int err = 0; io->ep = ep; @@ -53,8 +107,8 @@ int64_t pktc_init(pktc_t* io, eloop_t* ep, uint64_t dispatch_id) { sc_queue_init(&io->req_queue); ef(err = timerfd_init_tw(io->ep, &io->cb_timerfd)); tw_init(&io->cb_tw, pktc_resp_cb_on_timeout); - hash_init(&io->sk_map, arrlen(io->sk_table)); - hash_init(&io->cb_map, arrlen(io->cb_table)); + hash_init(&io->sk_map, arrlen(io->sk_table), sk_key_func, sk_hash_func, sk_equal_func); + hash_init(&io->cb_map, arrlen(io->cb_table), cb_key_func, cb_hash_func, cb_equal_func); dlink_init(&io->sk_list); rk_info("pktc init succ"); el(); diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_post.h b/deps/oblib/src/rpc/pnio/nio/pktc_post.h index 09bb62ec47..085ad91e1a 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_post.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_post.h @@ -29,7 +29,7 @@ static pktc_sk_t* pktc_do_connect(pktc_t* cl, addr_t dest) { static pktc_sk_t* pktc_try_connect(pktc_t* cl, addr_t dest) { pktc_sk_t* sk = NULL; - link_t* sk_link = ihash_get(&cl->sk_map, *(uint64_t*)&dest); + link_t* sk_link = ihash_get(&cl->sk_map, &dest); if (sk_link) { sk = structof(sk_link, pktc_sk_t, hash); } else { diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_resp.h b/deps/oblib/src/rpc/pnio/nio/pktc_resp.h index 4854ab5582..6729558727 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_resp.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_resp.h @@ -36,7 +36,7 @@ static void pktc_do_cb_exception(pktc_t* io, pktc_cb_t* cb) { static void pktc_resp_cb_on_sk_destroy(pktc_t* io, pktc_sk_t* s) { dlink_for(&s->cb_head, p) { pktc_cb_t* cb = structof(p, pktc_cb_t, sk_dlink); - ihash_del(&io->cb_map, cb->id); + ihash_del(&io->cb_map, &cb->id); dlink_delete(&cb->timer_dlink); rk_info("resp_cb on sk_destroy: packet_id=%lu s=%p", cb->id, s); cb->errcode = PNIO_DISCONNECT; @@ -47,7 +47,7 @@ static void pktc_resp_cb_on_sk_destroy(pktc_t* io, pktc_sk_t* s) { static void pktc_resp_cb_on_timeout(time_wheel_t* tw, dlink_t* l) { pktc_cb_t* cb = structof(l, pktc_cb_t, timer_dlink); pktc_t* io = structof(tw, pktc_t, cb_tw); - ihash_del(&io->cb_map, cb->id); + ihash_del(&io->cb_map, &cb->id); dlink_delete(&cb->sk_dlink); rk_debug("resp_cb on timeout: packet_id=%lu expire_us=%ld", cb->id, cb->expire_us); cb->errcode = PNIO_TIMEOUT; @@ -60,7 +60,7 @@ static void pktc_resp_cb_on_post_fail(pktc_t* io, pktc_cb_t* cb) { static void pktc_resp_cb_on_msg(pktc_t* io, pktc_msg_t* msg) { uint64_t id = pktc_get_id(msg); - link_t* hlink = ihash_del(&io->cb_map, id); + link_t* hlink = ihash_del(&io->cb_map, &id); if (hlink) { pktc_cb_t* cb = structof(hlink, pktc_cb_t, hash_link); dlink_delete(&cb->timer_dlink); @@ -72,7 +72,7 @@ static void pktc_resp_cb_on_msg(pktc_t* io, pktc_msg_t* msg) { } static void pktc_resp_cb_on_terminate(pktc_t* io, uint32_t id) { - link_t* hlink = ihash_del(&io->cb_map, id); + link_t* hlink = ihash_del(&io->cb_map, &id); if (hlink) { pktc_cb_t* cb = structof(hlink, pktc_cb_t, hash_link); dlink_delete(&cb->timer_dlink); diff --git a/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h b/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h index 43871a4cea..7bd4c70352 100644 --- a/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h +++ b/deps/oblib/src/rpc/pnio/nio/pktc_sk_factory.h @@ -60,7 +60,7 @@ static int pktc_sk_init(pktc_sf_t* sf, pktc_sk_t* s) { static void pktc_sk_destroy(pktc_sf_t* sf, pktc_sk_t* s) { pktc_t* pc = structof(sf, pktc_t, sf); if (s) { - ihash_del(&pc->sk_map, *(uint64_t*)&s->dest); + ihash_del(&pc->sk_map, &s->dest); dlink_delete(&s->list_link); } } diff --git a/deps/oblib/unittest/lib/net/test_ob_addr.cpp b/deps/oblib/unittest/lib/net/test_ob_addr.cpp index c5b718dd30..823389db46 100644 --- a/deps/oblib/unittest/lib/net/test_ob_addr.cpp +++ b/deps/oblib/unittest/lib/net/test_ob_addr.cpp @@ -35,16 +35,10 @@ TEST(OB_ADDR, TEST1) EXPECT_EQ(addr.get_port(), 1234); EXPECT_EQ(addr.parse_from_cstring("1.0.0.1234:1234"), OB_INVALID_ARGUMENT); - EXPECT_FALSE(addr.is_valid()); - - ObAddr addr2; - EXPECT_LT(addr2, addr); - - addr2.set_port(1234); - EXPECT_EQ(addr2, addr); addr.set_ip_addr("0.0.0.1", 1); EXPECT_EQ(addr.get_ipv4(), 1U); + ObAddr addr2; addr2.set_ip_addr("1.0.0.0", 1); EXPECT_EQ(addr2.get_ipv4(), 1U << 24); EXPECT_LT(addr, addr2); @@ -72,7 +66,7 @@ TEST(OB_ADDR, TEST_UNIX_PATH) EXPECT_FALSE(addr.set_unix_addr(NULL)); EXPECT_FALSE(addr.is_valid()); - + char path0[] = ""; EXPECT_TRUE(addr.set_unix_addr(path0)); EXPECT_FALSE(addr.is_valid()); diff --git a/deps/ussl-hook/loop/handle-event.c b/deps/ussl-hook/loop/handle-event.c index fc7a7359ba..db98845e58 100644 --- a/deps/ussl-hook/loop/handle-event.c +++ b/deps/ussl-hook/loop/handle-event.c @@ -68,35 +68,7 @@ static void auth_type_to_str(int auth_type, char *buf, size_t len) } } -static void get_client_addr(int fd, char *buf, int len) -{ - struct sockaddr_storage addr; - socklen_t sock_len = sizeof(addr); - if (0 != getsockname(fd, (struct sockaddr *)&addr, &sock_len)) { - ussl_log_warn("getsockname failed, fd:%d, errno:%d", fd, errno); - } else { - char src_addr[INET6_ADDRSTRLEN]; - if (AF_INET == addr.ss_family) { - struct sockaddr_in *s = (struct sockaddr_in *)&addr; - if (NULL != inet_ntop(AF_INET, &s->sin_addr, src_addr, INET_ADDRSTRLEN)) { - if (snprintf(buf, len, "%s:%d", src_addr, ntohs(s->sin_port)) < 0) { - ussl_log_warn("snprintf failed, errno:%d", errno); - } - } else { - ussl_log_warn("call inet_ntop for AF_INET failed, errno:%d", errno); - } - } else { - struct sockaddr_in6 *s = (struct sockaddr_in6 *)&addr; - if (NULL != inet_ntop(AF_INET6, &s->sin6_addr, src_addr, INET6_ADDRSTRLEN)) { - if (snprintf(buf, len, "[%s]:%d", src_addr, ntohs(s->sin6_port)) < 0) { - ussl_log_warn("snprintf failed, errno:%d", errno); - } - } else { - ussl_log_warn("call inet_ntop for AF_INET6 failed, errno:%d", errno); - } - } - } -} +extern char *sockfd_to_str_c(int fd, char *buf, int len); static int is_local_ip_address(const char *addr) { @@ -150,7 +122,7 @@ static int handle_client_writable_event(ussl_sock_t *s) } else { // 4.add to timeout list (if needed) // succ log char client_addr[IP_STRING_MAX_LEN] = {0}; - get_client_addr(cs->fd, client_addr, IP_STRING_MAX_LEN); + sockfd_to_str_c(cs->fd, client_addr, IP_STRING_MAX_LEN); char auth_type[AUTH_TYPE_STRING_MAX_LEN] = {0}; auth_type_to_str(nego_msg.type, auth_type, AUTH_TYPE_STRING_MAX_LEN); ussl_log_info("client send negotiation message succ, fd:%d, addr:%s, auth_method:%s, gid:0x%lx", @@ -202,7 +174,7 @@ static int handle_client_readable_event(ussl_sock_t *s) int ret = EAGAIN; clientfd_sk_t *cs = (clientfd_sk_t *)s; char client_addr[IP_STRING_MAX_LEN] = {0}; - get_client_addr(cs->fd, client_addr, IP_STRING_MAX_LEN); + sockfd_to_str_c(cs->fd, client_addr, IP_STRING_MAX_LEN); char auth_type[AUTH_TYPE_STRING_MAX_LEN] = {0}; auth_type_to_str(cs->fd_info.auth_methods, auth_type, AUTH_TYPE_STRING_MAX_LEN); if (SEND_FIRST_NEGO_MESSAGE == cs->fd_info.stage) { @@ -514,4 +486,4 @@ int ussl_check_pcode_mismatch_connection(int fd, uint32_t pcode) !ob_is_bypass_pcode(pcode); } return ret; -} \ No newline at end of file +} diff --git a/deps/ussl-hook/loop/handle-event.h b/deps/ussl-hook/loop/handle-event.h index 1bad81d0d1..fb92a56b05 100644 --- a/deps/ussl-hook/loop/handle-event.h +++ b/deps/ussl-hook/loop/handle-event.h @@ -22,4 +22,4 @@ extern int is_net_keepalive_connection(ssize_t rbytes, char *buf); extern int ob_judge_is_tableapi_pcode_from_raw_packet(const char *buf, ssize_t data_len); extern int ob_is_bypass_pcode(uint32_t pcode); extern void ussl_reset_rpc_connection_type(int fd); -#endif // USSL_HOOK_LOOP_HANDLE_EVENT_ \ No newline at end of file +#endif // USSL_HOOK_LOOP_HANDLE_EVENT_ diff --git a/deps/ussl-hook/loop/ussl_listenfd.c b/deps/ussl-hook/loop/ussl_listenfd.c index e0f9ca20ac..87bb5ae690 100644 --- a/deps/ussl-hook/loop/ussl_listenfd.c +++ b/deps/ussl-hook/loop/ussl_listenfd.c @@ -105,4 +105,4 @@ int ussl_listenfd_init(ussl_eloop_t *ep, ussl_listenfd_t *s, ussl_sf_t *sf, int } } return ret; -} \ No newline at end of file +} diff --git a/deps/ussl-hook/ussl-hook.c b/deps/ussl-hook/ussl-hook.c index 98d8c9889f..712f605f3b 100644 --- a/deps/ussl-hook/ussl-hook.c +++ b/deps/ussl-hook/ussl-hook.c @@ -41,6 +41,9 @@ static int global_send_negotiation_arr[USSL_MAX_FD_NUM]; int is_ussl_bg_thread_started = 0; static int ussl_is_stopped = 0; +extern int sockaddr_compare_c(struct sockaddr_storage *left, struct sockaddr_storage *right); +extern char *sockaddr_to_str_c(struct sockaddr_storage *sock_addr, char *buf, int len); + static __attribute__((constructor(102))) void init_global_array() { for (int i = 0; i < USSL_MAX_FD_NUM; ++i) { @@ -65,29 +68,26 @@ static int ussl_is_pipe(int fd) return 0 == fstat(fd, &st) && S_ISFIFO(st.st_mode); } -int ussl_connect(int sockfd, const struct sockaddr *address, socklen_t address_len) +int ussl_connect(int sockfd, struct sockaddr_storage *address, socklen_t address_len) { int ret = 0; - if ((ret = libc_connect(sockfd, address, address_len)) < 0) { + if ((ret = libc_connect(sockfd, (struct sockaddr*)address, address_len)) < 0) { if (EINPROGRESS != errno) { ussl_clear_client_opt(sockfd); } } else { - if (AF_INET == address->sa_family) { - struct sockaddr_in self_addr; - socklen_t len = sizeof(self_addr); - if (0 == getsockname(sockfd, (struct sockaddr *)&self_addr, &len)) { - struct sockaddr_in *dst_addr = (struct sockaddr_in *)address; - if (self_addr.sin_port == dst_addr->sin_port && self_addr.sin_addr.s_addr == dst_addr->sin_addr.s_addr) { - ret = -1; - errno = EIO; - char str[INET_ADDRSTRLEN]; - ussl_log_warn("connection to %s failed, self connect self", inet_ntop(AF_INET, (const void*)(address), str, sizeof(str))); - } - } else { + struct sockaddr_storage self_addr; + socklen_t len = sizeof(self_addr); + if (0 == getsockname(sockfd, (struct sockaddr *)&self_addr, &len)) { + if (sockaddr_compare_c(address, &self_addr) != 0) { ret = -1; - ussl_log_warn("getsockname failed, fd:%d, error:%s", sockfd, strerror(errno)); + errno = EIO; + char str[128]; + ussl_log_warn("connection to %s failed, self connect self", sockaddr_to_str_c(address, str, sizeof(str))); } + } else { + ret = -1; + ussl_log_warn("getsockname failed, fd:%d, error:%s", sockfd, strerror(errno)); } /* if gid has been set and connect return success, we also let the ret to be -1 and errno to be * EINPROGRESS */ @@ -216,7 +216,7 @@ int ussl_listen(int socket, int backlog) "fd is expected to be a non-negative integer and less than %d, but currently fd is %d", USSL_MAX_FD_NUM, socket); } else { - if (0 != ussl_loop_add_listen_once(socket, backlog)) { + if (0 != ussl_loop_add_listen(socket, backlog)) { ret = -1; ussl_log_error("add listen filed, fd:%d", socket); } @@ -301,4 +301,4 @@ ssize_t ussl_writev(int fildes, const struct iovec *iov, int iovcnt) #include "loop/ussl_ihash_map.c" #include "ssl/ssl_config.c" #include "ussl-loop.c" -#include "message.c" \ No newline at end of file +#include "message.c" diff --git a/deps/ussl-hook/ussl-hook.h b/deps/ussl-hook/ussl-hook.h index fe3eb25832..a6a7c14fa0 100644 --- a/deps/ussl-hook/ussl-hook.h +++ b/deps/ussl-hook/ussl-hook.h @@ -56,7 +56,7 @@ void ussl_wait(); int ussl_setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen); int ussl_listen(int fd, int n); -int ussl_connect(int fd, const struct sockaddr *addr, socklen_t len); +int ussl_connect(int fd, struct sockaddr_storage *addr, socklen_t len); int ussl_accept(int fd, struct sockaddr *addr, socklen_t *addr_len); int ussl_accept4(int fd, struct sockaddr *addr, socklen_t *addr_len, int flags); int ussl_epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); diff --git a/deps/ussl-hook/ussl-loop.c b/deps/ussl-hook/ussl-loop.c index 0a020b618a..6cb5edb78f 100644 --- a/deps/ussl-hook/ussl-loop.c +++ b/deps/ussl-hook/ussl-loop.c @@ -21,13 +21,13 @@ typedef struct uloop_t { ussl_eloop_t ep; - ussl_listenfd_t listenfd; + ussl_listenfd_t listenfd4; + ussl_listenfd_t listenfd6; pipefd_t pipefd; ussl_dlink_t timeout_list; } uloop_t; uloop_t global_ussl_loop_struct; -static int ussl_has_listened = 0; static ussl_sf_t acceptfd_fty; static ussl_sf_t clientfd_fty; static void *ussl_bg_thread_id; @@ -59,19 +59,37 @@ static int uloop_run(uloop_t *loop) static int uloop_add_listen(uloop_t *l, int listen_fd, int backlog) { int ret = 0; - if (libc_listen(listen_fd, backlog) < 0) { - ret = -EIO; - ussl_log_error("listen failed, fd:%d, errno:%d", listen_fd, errno); - } else if (0 != acceptfd_sf_init(&acceptfd_fty)) { + in_port_t net_port = 0; + struct sockaddr_storage ussl_listened_addr; + socklen_t ussl_listened_addrlen = sizeof(ussl_listened_addr); + if (0 != acceptfd_sf_init(&acceptfd_fty)) { ret = -1; ussl_log_error("acceptfd_sf_init failed, fd:%d", listen_fd); - } else if (0 != ussl_listenfd_init(&l->ep, &l->listenfd, (ussl_sf_t *)&acceptfd_fty, listen_fd)) { + } else if (0 != getsockname(listen_fd, (struct sockaddr *)&ussl_listened_addr, + &ussl_listened_addrlen)) { ret = -1; - ussl_log_error("listenfd_init failed, fd:%d", listen_fd); - } - if (0 != ret) { - if (listen_fd >= 0) { - close(listen_fd); + ussl_log_error("getsockname failed, fd:%d, errno:%d", listen_fd, errno); + } else if (AF_INET != ussl_listened_addr.ss_family && AF_INET6 != ussl_listened_addr.ss_family) { + ret = -1; + ussl_log_error("the protocol is not supported, fd:%d, family:%hu", listen_fd, + ussl_listened_addr.ss_family); + } else { + int fd = listen_fd; + int ret = 0; + if (AF_INET == ussl_listened_addr.ss_family) { + net_port = ((struct sockaddr_in *)(&ussl_listened_addr))->sin_port; + } else { + net_port = ((struct sockaddr_in6 *)(&ussl_listened_addr))->sin6_port; + } + if (libc_listen(fd, backlog) < 0) { + ret = -1; + ussl_log_error("listen failed, fd:%d, port:%hu", fd, ntohs(net_port)); + } else if (0 != ussl_listenfd_init(&l->ep, + AF_INET == ussl_listened_addr.ss_family ? &l->listenfd4 : &l->listenfd6, (ussl_sf_t *)&acceptfd_fty, fd)) { + ret = -1; + ussl_log_error("listenfd_init failed, fd:%d, port:%hu", fd, ntohs(net_port)); + } else { + ussl_log_info("listen success, fd:%d, port:%hu", fd, ntohs(net_port)); } } return ret; @@ -189,38 +207,33 @@ void check_and_handle_timeout_event() } } -int ussl_loop_add_listen_once(int listen_fd, int backlog) +int ussl_loop_add_listen(int listen_fd, int backlog) { int ret = 0; - if (ATOMIC_BCAS(&ussl_has_listened, 0, 1)) { - // get addr from fd - struct sockaddr_storage ussl_listened_addr; - socklen_t ussl_listened_addrlen = sizeof(ussl_listened_addr); - if (0 != - getsockname(listen_fd, (struct sockaddr *)&ussl_listened_addr, &ussl_listened_addrlen)) { - ret = -1; - ussl_log_error("getsockname failed, fd:%d, errno:%d", listen_fd, errno); - ATOMIC_STORE(&ussl_has_listened, 0); - } else if (AF_INET != ussl_listened_addr.ss_family && - AF_INET6 != ussl_listened_addr.ss_family) { - ret = -1; - ussl_log_info("the protocol family is not IPv4 or IPv6, fd:%d", listen_fd); - ATOMIC_STORE(&ussl_has_listened, 0); - } else if (0 != uloop_add_listen(&global_ussl_loop_struct, listen_fd, backlog)) { - ret = -1; - ussl_log_error("uloop_add_listen failed, fd:%d errno:%d", listen_fd, errno); - ATOMIC_STORE(&ussl_has_listened, 0); - } else { - int port = 0; - if (AF_INET == ussl_listened_addr.ss_family) { - struct sockaddr_in *s = (struct sockaddr_in *)(&ussl_listened_addr); - port = s->sin_port; - } else if (AF_INET6 == ussl_listened_addr.ss_family) { - struct sockaddr_in6 * s = (struct sockaddr_in6 *)(&ussl_listened_addr); - port = s->sin6_port; - } - ussl_log_info("uloop add listen success! port:%d", ntohs(port)); + // get addr from fd + struct sockaddr_storage ussl_listened_addr; + socklen_t ussl_listened_addrlen = sizeof(ussl_listened_addr); + if (0 != + getsockname(listen_fd, (struct sockaddr *)&ussl_listened_addr, &ussl_listened_addrlen)) { + ret = -1; + ussl_log_error("getsockname failed, fd:%d, errno:%d", listen_fd, errno); + } else if (AF_INET != ussl_listened_addr.ss_family && + AF_INET6 != ussl_listened_addr.ss_family) { + ret = -1; + ussl_log_info("the protocol family is not IPv4 or IPv6, fd:%d", listen_fd); + } else if (0 != uloop_add_listen(&global_ussl_loop_struct, listen_fd, backlog)) { + ret = -1; + ussl_log_error("uloop_add_listen failed, fd:%d errno:%d", listen_fd, errno); + } else { + int port = 0; + if (AF_INET == ussl_listened_addr.ss_family) { + struct sockaddr_in *s = (struct sockaddr_in *)(&ussl_listened_addr); + port = s->sin_port; + } else if (AF_INET6 == ussl_listened_addr.ss_family) { + struct sockaddr_in6 * s = (struct sockaddr_in6 *)(&ussl_listened_addr); + port = s->sin6_port; } + ussl_log_info("uloop add listen success! port:%d", ntohs(port)); } return ret; } diff --git a/deps/ussl-hook/ussl-loop.h b/deps/ussl-hook/ussl-loop.h index 3ccc0bd526..cd6c995f0b 100644 --- a/deps/ussl-hook/ussl-loop.h +++ b/deps/ussl-hook/ussl-loop.h @@ -16,7 +16,7 @@ // the client communicates with the background thread through this pipe int client_comm_pipe[2]; -extern int ussl_loop_add_listen_once(int listen_fd, int backlog); +extern int ussl_loop_add_listen(int listen_fd, int backlog); extern int ussl_loop_add_clientfd(int client_fd, uint64_t gid, int ctx_id, int send_negotiation, int auth_methods, int epfd, struct epoll_event *event); int __attribute__((weak)) dispatch_accept_fd_to_certain_group(int fd, uint64_t gid); diff --git a/mittest/mtlenv/storage/test_trans.cpp b/mittest/mtlenv/storage/test_trans.cpp index cc2bf84f17..2c6a9ce69c 100644 --- a/mittest/mtlenv/storage/test_trans.cpp +++ b/mittest/mtlenv/storage/test_trans.cpp @@ -216,7 +216,7 @@ void TestTrans::prepare_tx_desc(ObTxDesc *&tx_desc, ObTxReadSnapshot &snapshot) TEST_F(TestTrans, create_ls_and_tablet) { int ret = OB_SUCCESS; - ASSERT_EQ(OB_SUCCESS, ObCurTraceId::get_trace_id()->set("Y1-1111111111111111")); + ASSERT_EQ(OB_SUCCESS, ObCurTraceId::get_trace_id()->set("Y1-1111111111111111-0-0")); ObLSID ls_id(100); ObLS *ls = nullptr; uint64_t tenant_id = MTL_ID(); @@ -312,7 +312,7 @@ TEST_F(TestTrans, basic) TEST_F(TestTrans, dist_trans) { - ASSERT_EQ(OB_SUCCESS, ObCurTraceId::get_trace_id()->set("Y2-2222222222222222222222")); + ASSERT_EQ(OB_SUCCESS, ObCurTraceId::get_trace_id()->set("Y2-2222222222222222-0-0")); uint64_t tenant_id = MTL_ID(); ObLSID ls_id(100); ObTabletID tablet_id(1001); diff --git a/src/observer/ob_server.cpp b/src/observer/ob_server.cpp index d47f336de7..68f15edfd8 100644 --- a/src/observer/ob_server.cpp +++ b/src/observer/ob_server.cpp @@ -2102,6 +2102,9 @@ int ObServer::init_pre_setting() LOG_INFO("set stack_size", K(stack_size)); global_thread_stack_size = stack_size - SIG_STACK_SIZE - ACHUNK_PRESERVE_SIZE; } + if (OB_SUCC(ret) && GCONF.use_ipv6) { + enable_use_ipv6(); + } return ret; } diff --git a/src/share/inner_table/sys_package/dbms_xplan_mysql.sql b/src/share/inner_table/sys_package/dbms_xplan_mysql.sql index 29f34d0616..9a5ec94770 100644 --- a/src/share/inner_table/sys_package/dbms_xplan_mysql.sql +++ b/src/share/inner_table/sys_package/dbms_xplan_mysql.sql @@ -34,7 +34,7 @@ CREATE OR REPLACE PACKAGE dbms_xplan AUTHID CURRENT_USER -- display sql plan table`s plan function display_cursor(plan_id DECIMAL default 0, -- default value: last plan format VARCHAR(32) default 'TYPICAL', - svr_ip VARCHAR(32) default null, -- default value: server connected by client + svr_ip VARCHAR(64) default null, -- default value: server connected by client svr_port DECIMAL default 0, -- default value: server connected by client tenant_id DECIMAL default 0 -- default value: current tenant ) @@ -44,7 +44,7 @@ CREATE OR REPLACE PACKAGE dbms_xplan AUTHID CURRENT_USER function display_sql_plan_baseline(sql_handle VARCHAR(32) default NULL, plan_name VARCHAR(32) default NULL, format VARCHAR(32) default 'TYPICAL', - svr_ip VARCHAR(32) default null, -- default value: server connected by client + svr_ip VARCHAR(64) default null, -- default value: server connected by client svr_port DECIMAL default 0, -- default value: server connected by client tenant_id DECIMAL default 0 -- default value: current tenant ) @@ -54,9 +54,9 @@ CREATE OR REPLACE PACKAGE dbms_xplan AUTHID CURRENT_USER function display_active_session_plan( session_id DECIMAL default 0, format VARCHAR(32) default 'TYPICAL', - svr_ip VARCHAR(32) default null, -- default value: server connected by client + svr_ip VARCHAR(64) default null, -- default value: server connected by client svr_port DECIMAL default 0 -- default value: server connected by client ) return text; -END dbms_xplan; \ No newline at end of file +END dbms_xplan; diff --git a/src/share/inner_table/sys_package/dbms_xplan_mysql_body.sql b/src/share/inner_table/sys_package/dbms_xplan_mysql_body.sql index 306c2c5d5d..36b4329644 100644 --- a/src/share/inner_table/sys_package/dbms_xplan_mysql_body.sql +++ b/src/share/inner_table/sys_package/dbms_xplan_mysql_body.sql @@ -31,7 +31,7 @@ CREATE OR REPLACE PACKAGE BODY dbms_xplan -- display sql plan table`s plan function display_cursor(plan_id DECIMAL default 0, -- default value: last plan format VARCHAR(32) default 'TYPICAL', - svr_ip VARCHAR(32) default null, -- default value: server connected by client + svr_ip VARCHAR(64) default null, -- default value: server connected by client svr_port DECIMAL default 0, -- default value: server connected by client tenant_id DECIMAL default 0 -- default value: current tenant ) @@ -42,7 +42,7 @@ CREATE OR REPLACE PACKAGE BODY dbms_xplan function display_sql_plan_baseline(sql_handle VARCHAR(32) default NULL, plan_name VARCHAR(32) default NULL, format VARCHAR(32) default 'TYPICAL', - svr_ip VARCHAR(32) default null, -- default value: server connected by client + svr_ip VARCHAR(64) default null, -- default value: server connected by client svr_port DECIMAL default 0, -- default value: server connected by client tenant_id DECIMAL default 0 -- default value: current tenant ) @@ -53,10 +53,10 @@ CREATE OR REPLACE PACKAGE BODY dbms_xplan function display_active_session_plan( session_id DECIMAL default 0, format VARCHAR(32) default 'TYPICAL', - svr_ip VARCHAR(32) default null, -- default value: server connected by client + svr_ip VARCHAR(64) default null, -- default value: server connected by client svr_port DECIMAL default 0 -- default value: server connected by client ) return text; PRAGMA INTERFACE(C, DISPLAY_ACTIVE_SESSION_PLAN); -END dbms_xplan; \ No newline at end of file +END dbms_xplan; diff --git a/src/share/ob_web_service_root_addr.cpp b/src/share/ob_web_service_root_addr.cpp index d2cb8b57df..c75012cdd5 100644 --- a/src/share/ob_web_service_root_addr.cpp +++ b/src/share/ob_web_service_root_addr.cpp @@ -698,9 +698,13 @@ int ObWebServiceRootAddr::to_json( } else if (!is_strong_leader(role) && !is_follower(role)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid role type", K(ret), K(role)); - } else if (OB_FAIL(json.append_fmt("{\"%s\":\"%s:%d\",\"%s\":\"%s\",\"%s\":%ld}", + } else if (OB_FAIL(json.append_fmt("{\"%s\":\"%s%s%s:%d\",\"%s\":\"%s\",\"%s\":%ld}", - JSON_ADDRESS, ip_buf, addr_list.at(i).get_server().get_port(), + JSON_ADDRESS, + addr_list.at(i).get_server().using_ipv4() ? "" : "[", + ip_buf, + addr_list.at(i).get_server().using_ipv4() ? "" : "]", + addr_list.at(i).get_server().get_port(), JSON_ROLE, is_strong_leader(role) ? "LEADER" : "FOLLOWER", JSON_SQL_PORT, addr_list.at(i).get_sql_port()))) { LOG_WARN("append string failed", K(ret)); diff --git a/src/storage/ob_locality_manager.cpp b/src/storage/ob_locality_manager.cpp index 2fcc263dd7..d08bf8e396 100644 --- a/src/storage/ob_locality_manager.cpp +++ b/src/storage/ob_locality_manager.cpp @@ -179,7 +179,7 @@ int ObLocalityManager::check_ssl_invited_nodes(easy_connection_t &c) char ip_buffer[MAX_IP_ADDR_LENGTH] = {}; easy_addr_t tmp_addr = c.addr; tmp_addr.port = 0;//mark it invalied, not care it - char *clinet_ip = easy_inet_addr_to_str(&tmp_addr, ip_buffer, 32); + char *clinet_ip = easy_inet_addr_to_str(&tmp_addr, ip_buffer, sizeof(ip_buffer)); if (NULL != strstr(ssl_invited_nodes.ptr(), clinet_ip) && self_.ip_to_string(ip_buffer, MAX_IP_ADDR_LENGTH) && NULL != strstr(ssl_invited_nodes.ptr(), ip_buffer))