prevent the problem of SO_REUSEPORT that two observer can use same rpc/mysql port for listen
This commit is contained in:

committed by
ob-robot

parent
0e941a7b37
commit
9ea66e3409
30
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
30
deps/oblib/src/rpc/obmysql/ob_sql_nio.cpp
vendored
@ -527,25 +527,32 @@ static int socket_set_opt(int fd, int option, int value)
|
|||||||
return setsockopt(fd, SOL_SOCKET, option, (void *)&value, sizeof(value));
|
return setsockopt(fd, SOL_SOCKET, option, (void *)&value, sizeof(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int listen_create(int port) {
|
// need_monopolize is true means the first bind on mysql port should
|
||||||
|
// detect whether the port has been used or not to prevent the same mysql port
|
||||||
|
// been used by different observer processes
|
||||||
|
static int listen_create(int port, bool need_monopolize)
|
||||||
|
{
|
||||||
int err = 0;
|
int err = 0;
|
||||||
int fd = 0;
|
int fd = 0;
|
||||||
struct sockaddr_in sin;
|
struct sockaddr_in sin;
|
||||||
if ((fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0)) < 0) {
|
if ((fd = socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0)) < 0) {
|
||||||
LOG_ERROR_RET(common::OB_ERR_SYS, "sql nio create socket for listen failed", K(errno));
|
LOG_ERROR_RET(common::OB_ERR_SYS, "sql nio create socket for listen failed", K(errno));
|
||||||
err = errno;
|
err = errno;
|
||||||
} else if (socket_set_opt(fd, SO_REUSEPORT, 1) < 0) {
|
|
||||||
LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEPORT failed", K(errno), K(fd));
|
|
||||||
err = errno;
|
|
||||||
} else if (socket_set_opt(fd, SO_REUSEADDR, 1) < 0) {
|
} else if (socket_set_opt(fd, SO_REUSEADDR, 1) < 0) {
|
||||||
LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEADDR failed", K(errno), K(fd));
|
LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEADDR failed", K(errno), K(fd));
|
||||||
err = errno;
|
err = errno;
|
||||||
|
} else if ((false == need_monopolize) && (socket_set_opt(fd, SO_REUSEPORT, 1) < 0)) {
|
||||||
|
LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEPORT failed", K(errno), K(fd));
|
||||||
|
err = errno;
|
||||||
} else if (bind(fd, (sockaddr*)obrpc::make_unix_sockaddr(&sin, 0, port), sizeof(sin))) {
|
} else if (bind(fd, (sockaddr*)obrpc::make_unix_sockaddr(&sin, 0, port), sizeof(sin))) {
|
||||||
LOG_ERROR_RET(OB_ERR_SYS, "sql nio bind listen fd failed", K(errno), K(fd));
|
LOG_ERROR_RET(OB_ERR_SYS, "sql nio bind listen fd failed", K(errno), K(fd));
|
||||||
err = errno;
|
err = errno;
|
||||||
} else if (listen(fd, 1024) < 0) {
|
} else if (listen(fd, 1024) < 0) {
|
||||||
LOG_ERROR_RET(OB_ERR_SYS, "sql nio listen failed", K(errno), K(fd));
|
LOG_ERROR_RET(OB_ERR_SYS, "sql nio listen failed", K(errno), K(fd));
|
||||||
err = errno;
|
err = errno;
|
||||||
|
} else if ((true == need_monopolize) && (socket_set_opt(fd, SO_REUSEPORT, 1) < 0)) {
|
||||||
|
LOG_ERROR_RET(OB_ERR_SYS, "sql nio set sock opt SO_REUSEPORT failed", K(errno), K(fd));
|
||||||
|
err = errno;
|
||||||
}
|
}
|
||||||
if (0 != err) {
|
if (0 != err) {
|
||||||
if (fd >= 0) {
|
if (fd >= 0) {
|
||||||
@ -631,12 +638,13 @@ public:
|
|||||||
free_sql_sock(s);
|
free_sql_sock(s);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int init(int port) {
|
//use need_monopolize to prevent two observer processes use the same mysql port
|
||||||
|
int init(int port, bool need_monopolize) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (port == -1) {
|
if (port == -1) {
|
||||||
ret = init_io();
|
ret = init_io();
|
||||||
} else {
|
} else {
|
||||||
ret = init_listen(port);
|
ret = init_listen(port, need_monopolize);
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -653,13 +661,13 @@ public:
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
int init_listen(int port) {
|
int init_listen(int port, bool need_monopolize) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
uint32_t epflag = EPOLLIN;
|
uint32_t epflag = EPOLLIN;
|
||||||
if ((epfd_ = epoll_create1(EPOLL_CLOEXEC)) < 0) {
|
if ((epfd_ = epoll_create1(EPOLL_CLOEXEC)) < 0) {
|
||||||
ret = OB_IO_ERROR;
|
ret = OB_IO_ERROR;
|
||||||
LOG_WARN("epoll_create fail", K(ret), K(errno));
|
LOG_WARN("epoll_create fail", K(ret), K(errno));
|
||||||
} else if ((lfd_ = listen_create(port)) < 0) {
|
} else if ((lfd_ = listen_create(port, need_monopolize)) < 0) {
|
||||||
ret = OB_SERVER_LISTEN_ERROR;
|
ret = OB_SERVER_LISTEN_ERROR;
|
||||||
LOG_WARN("listen create fail", K(ret), K(port), K(errno), KERRNOMSG(errno));
|
LOG_WARN("listen create fail", K(ret), K(port), K(errno), KERRNOMSG(errno));
|
||||||
} else if (0 != epoll_regist(epfd_, lfd_, epflag, NULL)) {
|
} else if (0 != epoll_regist(epfd_, lfd_, epflag, NULL)) {
|
||||||
@ -1024,8 +1032,9 @@ int ObSqlNio::start(int port, ObISqlSockHandler *handler, int n_thread,
|
|||||||
LOG_WARN("alloc sql nio fail", K(ret));
|
LOG_WARN("alloc sql nio fail", K(ret));
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; OB_SUCCESS == ret && i < n_thread; i++) {
|
for (int i = 0; OB_SUCCESS == ret && i < n_thread; i++) {
|
||||||
|
bool need_monopolize = ((i == 0) ? true : false);
|
||||||
new (impl_ + i) ObSqlNioImpl(*handler);
|
new (impl_ + i) ObSqlNioImpl(*handler);
|
||||||
if (OB_FAIL(impl_[i].init(port))) {
|
if (OB_FAIL(impl_[i].init(port, need_monopolize))) {
|
||||||
LOG_WARN("impl init fail", K(ret));
|
LOG_WARN("impl init fail", K(ret));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1158,6 +1167,7 @@ int ObSqlNio::set_thread_count(const int n_thread)
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int cur_thread = get_thread_count();
|
int cur_thread = get_thread_count();
|
||||||
|
bool need_monopolize = false;
|
||||||
if (n_thread > MAX_THREAD_CNT) {
|
if (n_thread > MAX_THREAD_CNT) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
} else if (n_thread == cur_thread) {
|
} else if (n_thread == cur_thread) {
|
||||||
@ -1166,7 +1176,7 @@ int ObSqlNio::set_thread_count(const int n_thread)
|
|||||||
if (n_thread > cur_thread) {
|
if (n_thread > cur_thread) {
|
||||||
for (int i = cur_thread; OB_SUCCESS == ret && i < n_thread; i++) {
|
for (int i = cur_thread; OB_SUCCESS == ret && i < n_thread; i++) {
|
||||||
new (impl_ + i) ObSqlNioImpl(*handler_);
|
new (impl_ + i) ObSqlNioImpl(*handler_);
|
||||||
if (OB_FAIL(impl_[i].init(port_))) {
|
if (OB_FAIL(impl_[i].init(port_, need_monopolize))) {
|
||||||
LOG_WARN("impl init fail");
|
LOG_WARN("impl init fail");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
1
deps/oblib/src/rpc/pnio/nio/inet.c
vendored
1
deps/oblib/src/rpc/pnio/nio/inet.c
vendored
@ -43,7 +43,6 @@ int listen_create(addr_t src) {
|
|||||||
struct sockaddr_in sin;
|
struct sockaddr_in sin;
|
||||||
ef((fd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0)) < 0);
|
ef((fd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK|SOCK_CLOEXEC, 0)) < 0);
|
||||||
ef(set_tcp_reuse_addr(fd));
|
ef(set_tcp_reuse_addr(fd));
|
||||||
ef(set_tcp_reuse_port(fd));
|
|
||||||
ef(bind(fd, (const struct sockaddr*)make_sockaddr(&sin, src), sizeof(sin)));
|
ef(bind(fd, (const struct sockaddr*)make_sockaddr(&sin, src), sizeof(sin)));
|
||||||
ef(ussl_listen(fd, 1024));
|
ef(ussl_listen(fd, 1024));
|
||||||
return fd;
|
return fd;
|
||||||
|
Reference in New Issue
Block a user