131 lines
3.4 KiB
C
131 lines
3.4 KiB
C
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#define _GNU_SOURCE 1
|
|
#include <unistd.h>
|
|
#include <stdio.h>
|
|
#include <errno.h>
|
|
#include <dlfcn.h>
|
|
#include <libaio.h>
|
|
#include <libconfig.h>
|
|
#include <stdlib.h>
|
|
#include <signal.h>
|
|
#include <pthread.h>
|
|
#include <sys/stat.h>
|
|
#include <sys/time.h>
|
|
#include <sys/types.h>
|
|
#include <sys/syscall.h>
|
|
#include <fcntl.h>
|
|
|
|
static int (*real_io_getevents)(
|
|
io_context_t ctx_id, long min_nr, long nr, struct io_event* events, struct timespec* timeout);
|
|
static int (*real_io_submit)(io_context_t ctx, long nr, struct iocb** iocbpp);
|
|
|
|
/*
 * Fault-injection settings. load_conf() parses them from the "aio_conf" file
 * as four comma-separated integers, in the order of the fields below.
 *
 * The trailing name is a typedef, so the type can be spelled either
 * `struct aio_conf_t` or `aio_conf_t`; it does not define an object.
 */
typedef struct aio_conf_t {
  int work_fd_;          /* file descriptor whose AIO requests are targeted */
  int io_submit_failed_; /* non-zero: io_submit() on work_fd_ is not forwarded and fails */
  int io_hang_;          /* non-zero: io_getevents() reports 0 events for work_fd_ */
  int io_timeout_;       /* non-zero: io_getevents() sleeps 10s before returning work_fd_ events */
} aio_conf_t;
|
|
|
|
/* Current fault-injection settings; rewritten by load_conf(). */
static struct aio_conf_t aio_conf;

/* Becomes 1 once io_init() has run; read and written under init_mutex. */
static int is_inited = 0;
/* Statically initialised: POSIX leaves locking an uninitialised mutex undefined. */
static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;

/* time() value of the last configuration reload; guarded by load_conf_mutex. */
static time_t load_conf_time = 0;
static pthread_mutex_t load_conf_mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
|
|
void io_init(void)
|
|
{
|
|
void* handle = dlopen("libaio.so", RTLD_LAZY);
|
|
*(void**)&real_io_getevents = dlsym(handle, "io_getevents");
|
|
*(void**)&real_io_submit = dlsym(handle, "io_submit");
|
|
printf("init read aio func succeed\n");
|
|
}
|
|
|
|
void check_init()
|
|
{
|
|
pthread_mutex_lock(&init_mutex);
|
|
if (0 == is_inited) {
|
|
io_init();
|
|
is_inited = 1;
|
|
}
|
|
pthread_mutex_unlock(&init_mutex);
|
|
}
|
|
|
|
void load_conf()
|
|
{
|
|
int fd = open("aio_conf", O_RDONLY);
|
|
char buf[64];
|
|
memset(buf, 0, sizeof(buf));
|
|
read(fd, buf, sizeof(buf));
|
|
close(fd);
|
|
sscanf(
|
|
buf, "%d,%d,%d,%d", &aio_conf.work_fd_, &aio_conf.io_submit_failed_, &aio_conf.io_hang_, &aio_conf.io_timeout_);
|
|
printf("init read aio func succeed, %s\n", buf);
|
|
}
|
|
|
|
void check_load_conf()
|
|
{
|
|
pthread_mutex_lock(&load_conf_mutex);
|
|
time_t now = time(NULL);
|
|
if (now - load_conf_time > 5) {
|
|
load_conf();
|
|
load_conf_time = now;
|
|
}
|
|
pthread_mutex_unlock(&load_conf_mutex);
|
|
}
|
|
/*
 * Interposed io_submit(): forwards to the real libaio io_submit() unless fault
 * injection applies.
 *
 * A request batch is "targeted" when its first iocb refers to
 * aio_conf.work_fd_; only iocbpp[0] is inspected. When the batch is targeted
 * and aio_conf.io_submit_failed_ is non-zero, nothing is submitted and
 * -EAGAIN is returned (libaio reports errors as negative errno values; a
 * positive value would be read as a count of submitted iocbs).
 *
 * Returns whatever the real io_submit() returns, -EAGAIN for an injected
 * failure, or -ENOSYS when the real function could not be resolved.
 */
int io_submit(io_context_t ctx_id, long nr, struct iocb **iocbpp)
{
  check_init();
  check_load_conf();

  if (NULL == real_io_submit) {
    return -ENOSYS;
  }

  /* Look at the first iocb only when there actually is one. */
  int is_triggered = 0;
  if (nr > 0 && NULL != iocbpp && NULL != iocbpp[0]) {
    is_triggered = (iocbpp[0]->aio_fildes == aio_conf.work_fd_);
  }

  if (is_triggered && aio_conf.io_submit_failed_) {
    return -EAGAIN;
  }

  return (*real_io_submit)(ctx_id, nr, iocbpp);
}
|
|
|
|
/*
 * Interposed io_getevents(): calls the real libaio io_getevents() and then
 * optionally distorts the result for the targeted file descriptor.
 *
 * A result is "targeted" when the iocb of the first returned event refers to
 * aio_conf.work_fd_; only events[0] is inspected. For a targeted result:
 *   - io_hang_ set:     the events are hidden and 0 is returned;
 *   - io_timeout_ set:  the call sleeps 10 seconds, then returns the events;
 *   - neither set:      the events are returned unchanged.
 * io_hang_ takes precedence over io_timeout_.
 *
 * Returns the real call's result (including negative error values), 0 for an
 * injected hang, or -ENOSYS when the real function could not be resolved.
 */
int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout)
{
  check_init();
  check_load_conf();

  if (NULL == real_io_getevents) {
    return -ENOSYS;
  }

  int nevent = (*real_io_getevents)(ctx_id, min_nr, nr, events, timeout);
  if (nevent < 0) {
    printf("real_io_getevents failed\n");
    return nevent; /* propagate the error instead of masking it as "no events" */
  }
  if (0 == nevent) {
    return 0;
  }

  const struct iocb *cb = events[0].obj;
  if (NULL == cb || aio_conf.work_fd_ != cb->aio_fildes) {
    return nevent; /* not the targeted descriptor: pass through untouched */
  }

  if (aio_conf.io_hang_) {
    return 0; // 0 event finish
  }

  if (aio_conf.io_timeout_) {
    usleep(10 * 1000 * 1000); // sleep 10s
    printf("finish io_getevents");
  }
  return nevent;
}
|