diff --git a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp index 61c0982947..f882f4a5d7 100644 --- a/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp +++ b/deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp @@ -527,25 +527,32 @@ static int socket_set_opt(int fd, int option, int value) return setsockopt(fd, SOL_SOCKET, option, (void *)&value, sizeof(value)); } -static int listen_create(int port) { +// need_monopolize is true means the first bind on mysql port should +// detect whether the port has been used or not to prevent the same mysql port +// been used by different observer processes +static int listen_create(int port, bool need_monopolize) +{ int err = 0; int fd = 0; struct sockaddr_in sin; if ((fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0)) < 0) { LOG_ERROR_RET(common::OB_ERR_SYS, "sql nio create socket for listen failed", K(errno)); err = errno; - } else if (socket_set_opt(fd, SO_REUSEPORT, 1) < 0) { - LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEPORT failed", K(errno), K(fd)); - err = errno; } else if (socket_set_opt(fd, SO_REUSEADDR, 1) < 0) { LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEADDR failed", K(errno), K(fd)); err = errno; + } else if ((false == need_monopolize) && (socket_set_opt(fd, SO_REUSEPORT, 1) < 0)) { + LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEPORT failed", K(errno), K(fd)); + err = errno; } else if (bind(fd, (sockaddr*)obrpc::make_unix_sockaddr(&sin, 0, port), sizeof(sin))) { LOG_ERROR_RET(OB_ERR_SYS, "sql nio bind listen fd failed", K(errno), K(fd)); err = errno; } else if (listen(fd, 1024) < 0) { LOG_ERROR_RET(OB_ERR_SYS, "sql nio listen failed", K(errno), K(fd)); err = errno; + } else if ((true == need_monopolize) && (socket_set_opt(fd, SO_REUSEPORT, 1) < 0)) { + LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEPORT failed", K(errno), K(fd)); + err = errno; } if (0 != err) { if (fd >= 0) { @@ -631,12 +638,13 @@ public: free_sql_sock(s); } } - int init(int port) { + //use need_monopolize to prevent two observer processes use the same mysql port + int init(int port, bool need_monopolize) { int ret = OB_SUCCESS; if (port == -1) { ret = init_io(); } else { - ret = init_listen(port); + ret = init_listen(port, need_monopolize); } return ret; } @@ -653,13 +661,13 @@ public: } return ret; } - int init_listen(int port) { + int init_listen(int port, bool need_monopolize) { int ret = OB_SUCCESS; uint32_t epflag = EPOLLIN; if ((epfd_ = epoll_create1(EPOLL_CLOEXEC)) < 0) { ret = OB_IO_ERROR; LOG_WARN("epoll_create fail", K(ret), K(errno)); - } else if ((lfd_ = listen_create(port)) < 0) { + } else if ((lfd_ = listen_create(port, need_monopolize)) < 0) { ret = OB_SERVER_LISTEN_ERROR; LOG_WARN("listen create fail", K(ret), K(port), K(errno), KERRNOMSG(errno)); } else if (0 != epoll_regist(epfd_, lfd_, epflag, NULL)) { @@ -1024,8 +1032,9 @@ int ObSqlNio::start(int port, ObISqlSockHandler *handler, int n_thread, LOG_WARN("alloc sql nio fail", K(ret)); } else { for (int i = 0; OB_SUCCESS == ret && i < n_thread; i++) { + bool need_monopolize = ((i == 0) ? true : false); new (impl_ + i) ObSqlNioImpl(*handler); - if (OB_FAIL(impl_[i].init(port))) { + if (OB_FAIL(impl_[i].init(port, need_monopolize))) { LOG_WARN("impl init fail", K(ret)); } } @@ -1158,6 +1167,7 @@ int ObSqlNio::set_thread_count(const int n_thread) { int ret = OB_SUCCESS; int cur_thread = get_thread_count(); + bool need_monopolize = false; if (n_thread > MAX_THREAD_CNT) { ret = OB_INVALID_ARGUMENT; } else if (n_thread == cur_thread) { @@ -1166,7 +1176,7 @@ int ObSqlNio::set_thread_count(const int n_thread) if (n_thread > cur_thread) { for (int i = cur_thread; OB_SUCCESS == ret && i < n_thread; i++) { new (impl_ + i) ObSqlNioImpl(*handler_); - if (OB_FAIL(impl_[i].init(port_))) { + if (OB_FAIL(impl_[i].init(port_, need_monopolize))) { LOG_WARN("impl init fail"); } } diff --git a/deps/oblib/src/rpc/pnio/nio/inet.c b/deps/oblib/src/rpc/pnio/nio/inet.c index 9d1c883da2..97f7ace382 100644 --- a/deps/oblib/src/rpc/pnio/nio/inet.c +++ b/deps/oblib/src/rpc/pnio/nio/inet.c @@ -43,7 +43,6 @@ int listen_create(addr_t src) { struct sockaddr_in sin; ef((fd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0)) < 0); ef(set_tcp_reuse_addr(fd)); - ef(set_tcp_reuse_port(fd)); ef(bind(fd, (const struct sockaddr*)make_sockaddr(&sin, src), sizeof(sin))); ef(ussl_listen(fd, 1024)); return fd;