269 lines
9.3 KiB
C
269 lines
9.3 KiB
C
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
|
|
#include "ussl-loop.h"
|
|
#include <errno.h>
|
|
#include <netinet/in.h>
|
|
#include <pthread.h>
|
|
#include <sys/prctl.h>
|
|
|
|
/*
 * Age limit, in seconds, for an entry on the loop's timeout list:
 * check_and_handle_timeout_event() expires a socket once
 * time(NULL) - start_time exceeds this value.
 */
#define TIMEOUT_THRESHOLD_SEC 3
|
|
|
|
typedef struct uloop_t
|
|
{
|
|
ussl_eloop_t ep;
|
|
ussl_listenfd_t listenfd4;
|
|
ussl_listenfd_t listenfd6;
|
|
pipefd_t pipefd;
|
|
ussl_dlink_t timeout_list;
|
|
} uloop_t;
|
|
|
|
uloop_t global_ussl_loop_struct;
|
|
static ussl_sf_t acceptfd_fty;
|
|
static ussl_sf_t clientfd_fty;
|
|
static void *ussl_bg_thread_id;
|
|
|
|
/*
 * Thread helpers that are only declared here; their definitions are not
 * part of this file.
 */
int ob_pthread_create(void **ptr, void *(*start_routine)(void *), void *arg);
void ob_pthread_join(void *ptr);
void ob_set_thread_name(const char *type);
|
|
|
|
/**
 * Prepare a loop object for use.
 *
 * Steps, in order: reset the timeout list, initialise the event loop,
 * initialise the client-fd socket factory, then initialise the loop's pipefd
 * from client_comm_pipe[0]. The first failing step stops the sequence.
 *
 * @param l  loop object to initialise.
 * @return 0 on success, otherwise the non-zero result of the step that failed.
 */
static int uloop_init(uloop_t *l)
{
  int err = 0;
  ussl_dlink_init(&l->timeout_list);
  if (0 != (err = ussl_eloop_init(&l->ep))) {
    ussl_log_error("eloop init fail: %d", err);
  } else if (0 != (err = clientfd_sf_init(&clientfd_fty))) {
    /* Keep the failure code in err so the caller does not see a false success. */
    ussl_log_error("initialize clientfd factory fail.");
  } else if (0 != (err = pipefd_init(&l->ep, &l->pipefd, (ussl_sf_t *)&clientfd_fty,
                                     client_comm_pipe[0]))) {
    ussl_log_error("pipefd init fail: %d", err);
  }
  return err;
}
|
|
|
|
/**
 * Run the event loop embedded in `loop`.
 *
 * @param loop  loop object previously set up by uloop_init().
 * @return whatever ussl_eloop_run() returns.
 */
static int uloop_run(uloop_t *loop)
{
  ussl_eloop_t *event_loop = &loop->ep;
  return ussl_eloop_run(event_loop);
}
|
|
|
|
/**
 * Start listening on `listen_fd` and attach it to the loop.
 *
 * The accept-fd factory is (re)initialised on every call. The bound address is
 * then read with getsockname() to pick the IPv4 or IPv6 listen slot of the
 * loop. Finally the socket is put into listening state and handed to
 * ussl_listenfd_init().
 *
 * @param l          loop that will own the listen socket.
 * @param listen_fd  socket whose local address getsockname() can report.
 * @param backlog    backlog passed to libc_listen().
 * @return 0 on success, -1 if any step fails (including listen and
 *         listenfd initialisation failures).
 */
static int uloop_add_listen(uloop_t *l, int listen_fd, int backlog)
{
  int ret = 0;
  in_port_t net_port = 0;
  struct sockaddr_storage ussl_listened_addr;
  socklen_t ussl_listened_addrlen = sizeof(ussl_listened_addr);
  if (0 != acceptfd_sf_init(&acceptfd_fty)) {
    ret = -1;
    ussl_log_error("acceptfd_sf_init failed, fd:%d", listen_fd);
  } else if (0 != getsockname(listen_fd, (struct sockaddr *)&ussl_listened_addr,
                              &ussl_listened_addrlen)) {
    ret = -1;
    ussl_log_error("getsockname failed, fd:%d, errno:%d", listen_fd, errno);
  } else if (AF_INET != ussl_listened_addr.ss_family && AF_INET6 != ussl_listened_addr.ss_family) {
    ret = -1;
    ussl_log_error("the protocol is not supported, fd:%d, family:%hu", listen_fd,
                   ussl_listened_addr.ss_family);
  } else {
    /*
     * No second `ret` is declared in this scope: a shadowing local here would
     * swallow the failures below and make the function return 0.
     */
    const int is_ipv4 = (AF_INET == ussl_listened_addr.ss_family);
    ussl_listenfd_t *listen_slot = is_ipv4 ? &l->listenfd4 : &l->listenfd6;
    if (is_ipv4) {
      net_port = ((struct sockaddr_in *)(&ussl_listened_addr))->sin_port;
    } else {
      net_port = ((struct sockaddr_in6 *)(&ussl_listened_addr))->sin6_port;
    }
    if (libc_listen(listen_fd, backlog) < 0) {
      ret = -1;
      ussl_log_error("listen failed, fd:%d, port:%hu", listen_fd, ntohs(net_port));
    } else if (0 != ussl_listenfd_init(&l->ep, listen_slot, (ussl_sf_t *)&acceptfd_fty, listen_fd)) {
      ret = -1;
      ussl_log_error("listenfd_init failed, fd:%d, port:%hu", listen_fd, ntohs(net_port));
    } else {
      ussl_log_info("listen success, fd:%d, port:%hu", listen_fd, ntohs(net_port));
    }
  }
  return ret;
}
|
|
|
|
static void *bg_thread_func(void *arg)
|
|
{
|
|
ob_set_thread_name("ussl_loop");
|
|
uloop_run(&global_ussl_loop_struct);
|
|
return NULL;
|
|
}
|
|
|
|
/* Fallback for build environments whose <fcntl.h> does not expose F_SETPIPE_SZ. */
#if !defined(F_SETPIPE_SZ)
#define F_SETPIPE_SZ 1031
#endif
|
|
|
|
int ussl_init_bg_thread()
|
|
{
|
|
int ret = 0;
|
|
static const int pipe_resize = 128 * 1024;
|
|
if (0 != pipe2(client_comm_pipe, O_NONBLOCK | O_CLOEXEC)) {
|
|
ret = EIO;
|
|
ussl_log_error("create client_communicate_pipe failed, errno:%d", errno);
|
|
} else if (fcntl(client_comm_pipe[1], F_SETPIPE_SZ, pipe_resize) < 0) {
|
|
int pipe_size = fcntl(client_comm_pipe[1], F_GETPIPE_SZ);
|
|
ussl_log_warn("resize pipe failed, pipefd:%d, errno:%d, pipe_size:%d", client_comm_pipe[1], errno, pipe_size);
|
|
}
|
|
if (0 == ret) {
|
|
if (0 != uloop_init(&global_ussl_loop_struct)) {
|
|
ussl_log_error("initialize uloop failed.")
|
|
} else if (0 != ob_pthread_create(&ussl_bg_thread_id, bg_thread_func, NULL)) {
|
|
ret = EIO;
|
|
ussl_log_error("create background thread failed, errno:%d", errno);
|
|
} else {
|
|
ussl_log_info("create background thread success!");
|
|
}
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
extern int is_ussl_bg_thread_started;
|
|
void ussl_wait_bg_thread()
|
|
{
|
|
if (ATOMIC_LOAD(&is_ussl_bg_thread_started)) {
|
|
ob_pthread_join(ussl_bg_thread_id);
|
|
ussl_bg_thread_id = NULL;
|
|
ATOMIC_STORE(&is_ussl_bg_thread_started, 0);
|
|
}
|
|
}
|
|
|
|
/**
 * Insert link node `l` into the global loop's timeout list.
 *
 * @param l  link node to insert.
 */
void add_to_timeout_list(ussl_dlink_t *l)
{
  ussl_dlink_t *list_head = &global_ussl_loop_struct.timeout_list;
  ussl_dlink_insert(list_head, l);
}
|
|
|
|
/**
 * Unlink node `l` from whatever list it is currently on (the list head is not
 * needed by ussl_dlink_delete()).
 *
 * @param l  link node to remove.
 */
void remove_from_timeout_list(ussl_dlink_t *l)
{
  ussl_dlink_delete(l);
}
|
|
|
|
/**
 * Expire a client-side socket.
 *
 * The write side of the socket is shut down, the fd is removed from the loop's
 * epoll instance and, if that removal succeeded, re-added to the epoll instance
 * recorded in fd_info.org_epfd with the saved event. The fd is not closed here;
 * the socket object's fd field is set to -1 before the factory's destroy()
 * callback is invoked.
 *
 * @param client_sk  socket object to expire; destroyed on return.
 */
static void handle_client_timeout_event(clientfd_sk_t *client_sk)
{
  ussl_sf_t *factory = client_sk->fty;
  const int sock_fd = client_sk->fd;
  const int loop_epfd = client_sk->ep->fd;
  const int origin_epfd = client_sk->fd_info.org_epfd;

  shutdown(sock_fd, SHUT_WR);
  if (libc_epoll_ctl(loop_epfd, EPOLL_CTL_DEL, sock_fd, NULL) < 0) {
    ussl_log_warn("remove fd from current epoll failed, fd:%d, cur_epollfd:%d, errno:%d",
                  sock_fd, loop_epfd, errno);
  } else if (libc_epoll_ctl(origin_epfd, EPOLL_CTL_ADD, sock_fd, &client_sk->fd_info.event) < 0) {
    ussl_log_warn("give back fd to original epoll failed, fd:%d, origin_epfd:%d, errno:%d",
                  sock_fd, origin_epfd, errno);
  }

  /* Detach the fd from the object before destroying it. */
  client_sk->fd = -1;
  factory->destroy(factory, (ussl_sock_t *)client_sk);
}
|
|
|
|
/**
 * Expire an accepted (server-side) socket.
 *
 * The fd is removed from the loop's epoll instance (a failure is only logged),
 * closed, and the socket object is handed to its factory's destroy() callback
 * with its fd field already set to -1.
 *
 * @param accept_sk  socket object to expire; destroyed on return.
 */
static void handle_server_timeout_event(acceptfd_sk_t *accept_sk)
{
  ussl_sf_t *factory = accept_sk->fty;
  const int sock_fd = accept_sk->fd;
  const int loop_epfd = accept_sk->ep->fd;

  if (libc_epoll_ctl(loop_epfd, EPOLL_CTL_DEL, sock_fd, NULL) < 0) {
    ussl_log_warn("remove fd from current epoll failed, fd:%d, cur_epollfd:%d, errno:%d",
                  sock_fd, loop_epfd, errno);
  }
  close(sock_fd);
  accept_sk->fd = -1;
  factory->destroy(factory, (ussl_sock_t *)accept_sk);
}
|
|
|
|
void check_and_handle_timeout_event()
|
|
{
|
|
time_t cur_time = time(NULL);
|
|
ussl_dlink_for(&global_ussl_loop_struct.timeout_list, p)
|
|
{
|
|
int type = *(int *)(p + 1);
|
|
if (CLIENT_SOCK == type) {
|
|
clientfd_sk_t *client_sk = ussl_structof(p, clientfd_sk_t, timeout_link);
|
|
if (cur_time - client_sk->start_time > TIMEOUT_THRESHOLD_SEC) {
|
|
char dst_addr[IP_STRING_MAX_LEN] = {0};
|
|
ussl_get_peer_addr(client_sk->fd, dst_addr, IP_STRING_MAX_LEN);
|
|
ussl_log_warn("clientfd timeout, fd:%d, dst_addr:%s", client_sk->fd, dst_addr);
|
|
ussl_dlink_delete(&client_sk->ready_link);
|
|
ussl_dlink_delete(p);
|
|
handle_client_timeout_event(client_sk);
|
|
}
|
|
} else {
|
|
acceptfd_sk_t *accept_sk = ussl_structof(p, acceptfd_sk_t, timeout_link);
|
|
if (cur_time - accept_sk->start_time > TIMEOUT_THRESHOLD_SEC) {
|
|
char src_addr[IP_STRING_MAX_LEN] = {0};
|
|
ussl_get_peer_addr(accept_sk->fd, src_addr, IP_STRING_MAX_LEN);
|
|
ussl_log_warn("acceptfd timeout, fd:%d, src_addr:%s", accept_sk->fd, src_addr);
|
|
ussl_dlink_delete(&accept_sk->ready_link);
|
|
ussl_dlink_delete(p);
|
|
handle_server_timeout_event(accept_sk);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
int ussl_loop_add_listen(int listen_fd, int backlog)
|
|
{
|
|
int ret = 0;
|
|
// get addr from fd
|
|
struct sockaddr_storage ussl_listened_addr;
|
|
socklen_t ussl_listened_addrlen = sizeof(ussl_listened_addr);
|
|
if (0 !=
|
|
getsockname(listen_fd, (struct sockaddr *)&ussl_listened_addr, &ussl_listened_addrlen)) {
|
|
ret = -1;
|
|
ussl_log_error("getsockname failed, fd:%d, errno:%d", listen_fd, errno);
|
|
} else if (AF_INET != ussl_listened_addr.ss_family &&
|
|
AF_INET6 != ussl_listened_addr.ss_family) {
|
|
ret = -1;
|
|
ussl_log_info("the protocol family is not IPv4 or IPv6, fd:%d", listen_fd);
|
|
} else if (0 != uloop_add_listen(&global_ussl_loop_struct, listen_fd, backlog)) {
|
|
ret = -1;
|
|
ussl_log_error("uloop_add_listen failed, fd:%d errno:%d", listen_fd, errno);
|
|
} else {
|
|
int port = 0;
|
|
if (AF_INET == ussl_listened_addr.ss_family) {
|
|
struct sockaddr_in *s = (struct sockaddr_in *)(&ussl_listened_addr);
|
|
port = s->sin_port;
|
|
} else if (AF_INET6 == ussl_listened_addr.ss_family) {
|
|
struct sockaddr_in6 * s = (struct sockaddr_in6 *)(&ussl_listened_addr);
|
|
port = s->sin6_port;
|
|
}
|
|
ussl_log_info("uloop add listen success! port:%d", ntohs(port));
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int ussl_loop_add_clientfd(int client_fd, uint64_t gid, int ctx_id, int send_negotiation, int auth_methods, int epfd,
|
|
struct epoll_event *event)
|
|
{
|
|
int ret = 0;
|
|
client_fd_info_t client_fd_info;
|
|
|
|
client_fd_info.client_gid = gid;
|
|
client_fd_info.ssl_ctx_id = ctx_id;
|
|
client_fd_info.event = *event;
|
|
client_fd_info.org_epfd = epfd;
|
|
client_fd_info.client_fd = client_fd;
|
|
client_fd_info.auth_methods = auth_methods;
|
|
client_fd_info.send_negotiation = send_negotiation;
|
|
ssize_t wbytes = 0;
|
|
|
|
if (sizeof(client_fd_info) !=
|
|
(wbytes = libc_write(client_comm_pipe[1], &client_fd_info, sizeof(client_fd_info)))) {
|
|
ret = -1;
|
|
ussl_log_error(
|
|
"write client_fd_info failed, clientfd:%d, gid:0x%lx, ctx_id:%d, errno:%d, wbytes:%ld",
|
|
client_fd, gid, ctx_id, errno, wbytes);
|
|
} else {
|
|
ussl_log_info("write client fd succ, fd:%d, gid:0x%lx, need_send_negotiation:%d", client_fd, gid, send_negotiation);
|
|
}
|
|
|
|
return ret;
|
|
}
|