Files
oceanbase/deps/ussl-hook/ussl-loop.c

269 lines
9.3 KiB
C

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ussl-loop.h"
#include <errno.h>
#include <netinet/in.h>
#include <pthread.h>
#include <string.h>
#include <sys/prctl.h>
// Age limit, in seconds, for entries on the loop's timeout list:
// check_and_handle_timeout_event() tears down a socket once
// (now - start_time) exceeds this value.
#define TIMEOUT_THRESHOLD_SEC 3
// State of the ussl background loop.
typedef struct uloop_t
{
// Event loop object; set up by ussl_eloop_init() and driven by ussl_eloop_run().
ussl_eloop_t ep;
// Listen socket slot used by uloop_add_listen() when the fd's family is AF_INET.
ussl_listenfd_t listenfd4;
// Listen socket slot used by uloop_add_listen() for the other accepted family (AF_INET6).
ussl_listenfd_t listenfd6;
// Pipe socket initialized by pipefd_init() with client_comm_pipe[0].
pipefd_t pipefd;
// Head of the list walked by check_and_handle_timeout_event().
ussl_dlink_t timeout_list;
} uloop_t;
// The single loop instance; initialized by ussl_init_bg_thread() and run by bg_thread_func().
uloop_t global_ussl_loop_struct;
// Socket factory initialized in uloop_add_listen() and handed to ussl_listenfd_init().
static ussl_sf_t acceptfd_fty;
// Socket factory initialized in uloop_init() and handed to pipefd_init().
static ussl_sf_t clientfd_fty;
// Opaque handle filled in by ob_pthread_create() and consumed by ob_pthread_join().
static void *ussl_bg_thread_id;
// Thread helpers defined outside this file.
int ob_pthread_create(void **ptr, void *(*start_routine) (void *), void *arg);
void ob_pthread_join(void *ptr);
void ob_set_thread_name(const char* type);
/**
 * Initialize a loop object: its timeout list, its event loop, the client-fd
 * socket factory and the pipe socket bound to client_comm_pipe[0].
 *
 * The steps run in order and stop at the first failure.
 *
 * @param l  loop object to initialize.
 * @return 0 on success; otherwise the non-zero code returned by the step
 *         that failed.
 */
static int uloop_init(uloop_t *l)
{
  int err = 0;
  ussl_dlink_init(&l->timeout_list);
  if (0 != (err = ussl_eloop_init(&l->ep))) {
    ussl_log_error("eloop init fail: %d", err);
  } else if (0 != (err = clientfd_sf_init(&clientfd_fty))) {
    // Capture the failure code: without the assignment err stays 0 and the
    // caller would treat a failed factory init as success.
    ussl_log_error("initialize clientfd factory fail.");
  } else if (0 != (err = pipefd_init(&l->ep, &l->pipefd, (ussl_sf_t *)&clientfd_fty,
                                     client_comm_pipe[0]))) {
    ussl_log_error("pipefd init fail: %d", err);
  }
  return err;
}
/**
 * Run the event loop embedded in the given loop object.
 *
 * @param loop  loop object whose `ep` member is run.
 * @return whatever ussl_eloop_run() returns.
 */
static int uloop_run(uloop_t *loop)
{
  ussl_eloop_t *eloop = &loop->ep;
  return ussl_eloop_run(eloop);
}
/**
 * Put listen_fd into listening state and register it with the loop.
 *
 * The accept-fd factory is (re)initialized, the bound address is read with
 * getsockname(), and the fd is stored in the IPv4 or IPv6 listen slot
 * according to its address family.
 *
 * @param l          loop object that receives the listen socket.
 * @param listen_fd  already-bound socket.
 * @param backlog    backlog passed to libc_listen().
 * @return 0 on success, -1 if any step fails.
 */
static int uloop_add_listen(uloop_t *l, int listen_fd, int backlog)
{
  int ret = 0;
  in_port_t net_port = 0;
  struct sockaddr_storage ussl_listened_addr;
  socklen_t ussl_listened_addrlen = sizeof(ussl_listened_addr);
  if (0 != acceptfd_sf_init(&acceptfd_fty)) {
    ret = -1;
    ussl_log_error("acceptfd_sf_init failed, fd:%d", listen_fd);
  } else if (0 != getsockname(listen_fd, (struct sockaddr *)&ussl_listened_addr,
                              &ussl_listened_addrlen)) {
    ret = -1;
    ussl_log_error("getsockname failed, fd:%d, errno:%d", listen_fd, errno);
  } else if (AF_INET != ussl_listened_addr.ss_family && AF_INET6 != ussl_listened_addr.ss_family) {
    ret = -1;
    ussl_log_error("the protocol is not supported, fd:%d, family:%hu", listen_fd,
                   ussl_listened_addr.ss_family);
  } else {
    // Do not declare another `ret` in this scope: a shadowing local would
    // swallow the failures below and the function would report success.
    const int is_ipv4 = (AF_INET == ussl_listened_addr.ss_family);
    ussl_listenfd_t *listenfd = is_ipv4 ? &l->listenfd4 : &l->listenfd6;
    if (is_ipv4) {
      net_port = ((struct sockaddr_in *)(&ussl_listened_addr))->sin_port;
    } else {
      net_port = ((struct sockaddr_in6 *)(&ussl_listened_addr))->sin6_port;
    }
    if (libc_listen(listen_fd, backlog) < 0) {
      ret = -1;
      ussl_log_error("listen failed, fd:%d, port:%hu", listen_fd, ntohs(net_port));
    } else if (0 != ussl_listenfd_init(&l->ep, listenfd, (ussl_sf_t *)&acceptfd_fty, listen_fd)) {
      ret = -1;
      ussl_log_error("listenfd_init failed, fd:%d, port:%hu", listen_fd, ntohs(net_port));
    } else {
      ussl_log_info("listen success, fd:%d, port:%hu", listen_fd, ntohs(net_port));
    }
  }
  return ret;
}
/**
 * Entry point of the background thread: names the thread "ussl_loop" and
 * runs the global loop until uloop_run() returns.
 *
 * @param arg  unused.
 * @return always NULL.
 */
static void *bg_thread_func(void *arg)
{
  (void)arg;
  ob_set_thread_name("ussl_loop");
  (void)uloop_run(&global_ussl_loop_struct);
  return NULL;
}
// Fallbacks for the Linux pipe-capacity fcntl commands, for builds where
// <fcntl.h> does not expose them. Both are used by ussl_init_bg_thread(),
// so both need a fallback.
#ifndef F_SETPIPE_SZ
#define F_SETPIPE_SZ 1031
#endif
#ifndef F_GETPIPE_SZ
#define F_GETPIPE_SZ 1032
#endif
/**
 * Create the client communication pipe, initialize the global loop and start
 * the background thread that runs it.
 *
 * Failing to enlarge the pipe is only logged; every other failure is
 * reported to the caller.
 *
 * @return 0 on success, EIO if the pipe cannot be created, the loop cannot
 *         be initialized, or the thread cannot be started.
 */
int ussl_init_bg_thread()
{
  int ret = 0;
  static const int pipe_resize = 128 * 1024;
  if (0 != pipe2(client_comm_pipe, O_NONBLOCK | O_CLOEXEC)) {
    ret = EIO;
    ussl_log_error("create client_communicate_pipe failed, errno:%d", errno);
  } else if (fcntl(client_comm_pipe[1], F_SETPIPE_SZ, pipe_resize) < 0) {
    // Save errno first: the F_GETPIPE_SZ query below may overwrite it.
    const int resize_errno = errno;
    int pipe_size = fcntl(client_comm_pipe[1], F_GETPIPE_SZ);
    ussl_log_warn("resize pipe failed, pipefd:%d, errno:%d, pipe_size:%d", client_comm_pipe[1], resize_errno, pipe_size);
  }
  if (0 == ret) {
    if (0 != uloop_init(&global_ussl_loop_struct)) {
      // Report the failure; leaving ret at 0 would tell the caller that the
      // background thread is running when it was never created.
      ret = EIO;
      ussl_log_error("initialize uloop failed.");
    } else if (0 != ob_pthread_create(&ussl_bg_thread_id, bg_thread_func, NULL)) {
      ret = EIO;
      ussl_log_error("create background thread failed, errno:%d", errno);
    } else {
      ussl_log_info("create background thread success!");
    }
  }
  return ret;
}
// Flag defined outside this file; non-zero while the background thread is
// considered started.
extern int is_ussl_bg_thread_started;

/**
 * Join the background thread if the started flag is set, then clear the
 * thread handle and the flag. Does nothing when the flag is not set.
 */
void ussl_wait_bg_thread()
{
  if (!ATOMIC_LOAD(&is_ussl_bg_thread_started)) {
    return;
  }
  ob_pthread_join(ussl_bg_thread_id);
  ussl_bg_thread_id = NULL;
  ATOMIC_STORE(&is_ussl_bg_thread_started, 0);
}
/**
 * Insert a link node into the global loop's timeout list.
 *
 * @param l  link node to insert.
 */
void add_to_timeout_list(ussl_dlink_t *l)
{
  ussl_dlink_t *list_head = &global_ussl_loop_struct.timeout_list;
  ussl_dlink_insert(list_head, l);
}
/**
 * Unlink a node from whichever list it is currently on.
 *
 * @param l  link node to remove.
 */
void remove_from_timeout_list(ussl_dlink_t *l)
{
  ussl_dlink_t *node = l;
  ussl_dlink_delete(node);
}
/**
 * Tear down a client socket that timed out.
 *
 * The write side of the fd is shut down, the fd is removed from the loop's
 * epoll instance and, if that succeeded, re-added to the epoll instance
 * recorded in fd_info.org_epfd with the saved event. Failures are only
 * logged. The socket object is then destroyed through its factory.
 *
 * @param client_sk  timed-out client socket.
 */
static void handle_client_timeout_event(clientfd_sk_t *client_sk)
{
  ussl_sf_t *factory = client_sk->fty;

  shutdown(client_sk->fd, SHUT_WR);

  const int detach_rc = libc_epoll_ctl(client_sk->ep->fd, EPOLL_CTL_DEL, client_sk->fd, NULL);
  if (detach_rc < 0) {
    ussl_log_warn("remove fd from current epoll failed, fd:%d, cur_epollfd:%d, errno:%d",
                  client_sk->fd, client_sk->ep->fd, errno);
  } else {
    const int attach_rc = libc_epoll_ctl(client_sk->fd_info.org_epfd, EPOLL_CTL_ADD,
                                         client_sk->fd, &client_sk->fd_info.event);
    if (attach_rc < 0) {
      ussl_log_warn("give back fd to original epoll failed, fd:%d, origin_epfd:%d, errno:%d",
                    client_sk->fd, client_sk->fd_info.org_epfd, errno);
    }
  }

  // NOTE(review): the fd is cleared before destroy, presumably so the
  // factory does not close an fd that was handed back - confirm.
  client_sk->fd = -1;
  factory->destroy(factory, (ussl_sock_t *)client_sk);
}
/**
 * Tear down an accepted (server-side) socket that timed out.
 *
 * The fd is removed from the loop's epoll instance (a failure is only
 * logged), closed, and the socket object is destroyed through its factory.
 *
 * @param accept_sk  timed-out accepted socket.
 */
static void handle_server_timeout_event(acceptfd_sk_t *accept_sk)
{
  ussl_sf_t *factory = accept_sk->fty;

  const int detach_rc = libc_epoll_ctl(accept_sk->ep->fd, EPOLL_CTL_DEL, accept_sk->fd, NULL);
  if (detach_rc < 0) {
    ussl_log_warn("remove fd from current epoll failed, fd:%d, cur_epollfd:%d, errno:%d",
                  accept_sk->fd, accept_sk->ep->fd, errno);
  }

  close(accept_sk->fd);
  accept_sk->fd = -1;
  factory->destroy(factory, (ussl_sock_t *)accept_sk);
}
/**
 * Walk the global timeout list and tear down every socket whose start_time
 * is more than TIMEOUT_THRESHOLD_SEC seconds before the current time.
 *
 * Client sockets are handled by handle_client_timeout_event(), every other
 * entry is treated as an accepted socket and handled by
 * handle_server_timeout_event(). Each expired entry is unlinked from both
 * its ready list and the timeout list before it is torn down.
 */
void check_and_handle_timeout_event()
{
time_t cur_time = time(NULL);
// NOTE(review): entries are unlinked and destroyed inside the loop body, so
// ussl_dlink_for presumably caches the next node before running it - confirm.
ussl_dlink_for(&global_ussl_loop_struct.timeout_list, p)
{
// Reads the int stored directly after the link node in memory.
// NOTE(review): this relies on each socket struct placing its type tag
// immediately after timeout_link - confirm against the struct definitions.
int type = *(int *)(p + 1);
if (CLIENT_SOCK == type) {
clientfd_sk_t *client_sk = ussl_structof(p, clientfd_sk_t, timeout_link);
if (cur_time - client_sk->start_time > TIMEOUT_THRESHOLD_SEC) {
char dst_addr[IP_STRING_MAX_LEN] = {0};
// The peer address is fetched only for the warning message below.
ussl_get_peer_addr(client_sk->fd, dst_addr, IP_STRING_MAX_LEN);
ussl_log_warn("clientfd timeout, fd:%d, dst_addr:%s", client_sk->fd, dst_addr);
ussl_dlink_delete(&client_sk->ready_link);
ussl_dlink_delete(p);
handle_client_timeout_event(client_sk);
}
} else {
// Any type other than CLIENT_SOCK is handled as an accepted socket.
acceptfd_sk_t *accept_sk = ussl_structof(p, acceptfd_sk_t, timeout_link);
if (cur_time - accept_sk->start_time > TIMEOUT_THRESHOLD_SEC) {
char src_addr[IP_STRING_MAX_LEN] = {0};
ussl_get_peer_addr(accept_sk->fd, src_addr, IP_STRING_MAX_LEN);
ussl_log_warn("acceptfd timeout, fd:%d, src_addr:%s", accept_sk->fd, src_addr);
ussl_dlink_delete(&accept_sk->ready_link);
ussl_dlink_delete(p);
handle_server_timeout_event(accept_sk);
}
}
}
}
/**
 * Register an already-bound socket as a listen socket of the global loop.
 *
 * The bound address is looked up with getsockname(); only AF_INET and
 * AF_INET6 sockets are accepted. The actual listen/registration work is
 * delegated to uloop_add_listen().
 *
 * @param listen_fd  bound socket to listen on.
 * @param backlog    backlog forwarded to uloop_add_listen().
 * @return 0 on success, -1 on any failure.
 */
int ussl_loop_add_listen(int listen_fd, int backlog)
{
  struct sockaddr_storage ussl_listened_addr;
  socklen_t ussl_listened_addrlen = sizeof(ussl_listened_addr);

  // Look up the address the fd is bound to.
  if (0 !=
      getsockname(listen_fd, (struct sockaddr *)&ussl_listened_addr, &ussl_listened_addrlen)) {
    ussl_log_error("getsockname failed, fd:%d, errno:%d", listen_fd, errno);
    return -1;
  }

  if (!(AF_INET == ussl_listened_addr.ss_family || AF_INET6 == ussl_listened_addr.ss_family)) {
    ussl_log_info("the protocol family is not IPv4 or IPv6, fd:%d", listen_fd);
    return -1;
  }

  if (0 != uloop_add_listen(&global_ussl_loop_struct, listen_fd, backlog)) {
    ussl_log_error("uloop_add_listen failed, fd:%d errno:%d", listen_fd, errno);
    return -1;
  }

  // Extract the port (network byte order) purely for the success log.
  int port = 0;
  switch (ussl_listened_addr.ss_family) {
    case AF_INET:
      port = ((struct sockaddr_in *)(&ussl_listened_addr))->sin_port;
      break;
    case AF_INET6:
      port = ((struct sockaddr_in6 *)(&ussl_listened_addr))->sin6_port;
      break;
    default:
      break;
  }
  ussl_log_info("uloop add listen success! port:%d", ntohs(port));
  return 0;
}
/**
 * Hand a client fd over to the loop by writing a client_fd_info_t record to
 * the write end of client_comm_pipe.
 *
 * @param client_fd         client socket to hand over.
 * @param gid               stored in the record as client_gid.
 * @param ctx_id            stored in the record as ssl_ctx_id.
 * @param send_negotiation  stored in the record as send_negotiation.
 * @param auth_methods      stored in the record as auth_methods.
 * @param epfd              stored in the record as org_epfd.
 * @param event             epoll event copied into the record; must not be NULL.
 * @return 0 if the whole record was written, -1 otherwise (including a NULL
 *         event).
 */
int ussl_loop_add_clientfd(int client_fd, uint64_t gid, int ctx_id, int send_negotiation, int auth_methods, int epfd,
                           struct epoll_event *event)
{
  int ret = 0;
  if (NULL == event) {
    // The event is copied by value below; refuse a NULL pointer instead of
    // dereferencing it.
    ret = -1;
    ussl_log_error("invalid argument, event is NULL, clientfd:%d", client_fd);
  } else {
    client_fd_info_t client_fd_info;
    // The raw bytes of the record go through the pipe, so clear it first:
    // padding and any member not assigned below must not be indeterminate.
    memset(&client_fd_info, 0, sizeof(client_fd_info));
    client_fd_info.client_gid = gid;
    client_fd_info.ssl_ctx_id = ctx_id;
    client_fd_info.event = *event;
    client_fd_info.org_epfd = epfd;
    client_fd_info.client_fd = client_fd;
    client_fd_info.auth_methods = auth_methods;
    client_fd_info.send_negotiation = send_negotiation;
    ssize_t wbytes = libc_write(client_comm_pipe[1], &client_fd_info, sizeof(client_fd_info));
    if (sizeof(client_fd_info) != wbytes) {
      // Capture errno right after the write, before any other call can change it.
      const int write_errno = errno;
      ret = -1;
      ussl_log_error(
          "write client_fd_info failed, clientfd:%d, gid:0x%lx, ctx_id:%d, errno:%d, wbytes:%ld",
          client_fd, gid, ctx_id, write_errno, wbytes);
    } else {
      ussl_log_info("write client fd succ, fd:%d, gid:0x%lx, need_send_negotiation:%d", client_fd, gid, send_negotiation);
    }
  }
  return ret;
}