131 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			131 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #define _GNU_SOURCE 1
 | |
| #include <unistd.h>
 | |
| #include <stdio.h>
 | |
| #include <errno.h>
 | |
| #include <dlfcn.h>
 | |
| #include <libaio.h>
 | |
| #include <libconfig.h>
 | |
| #include <stdlib.h>
 | |
| #include <signal.h>
 | |
| #include <pthread.h>
 | |
| #include <sys/stat.h>
 | |
| #include <sys/time.h>
 | |
| #include <sys/types.h>
 | |
| #include <sys/syscall.h>
 | |
| #include <fcntl.h>
 | |
| 
 | |
| static int (*real_io_getevents)(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);
 | |
| static int (*real_io_submit)(io_context_t ctx, long nr, struct iocb **iocbpp);
 | |
| 
 | |
| struct aio_conf_t
 | |
| {
 | |
|   int work_fd_;
 | |
|   int io_submit_failed_;
 | |
|   int io_hang_;
 | |
|   int io_timeout_;
 | |
| } aio_conf_t;
 | |
| 
 | |
| static struct aio_conf_t aio_conf;
 | |
| static int is_inited = 0;
 | |
| static pthread_mutex_t init_mutex;
 | |
| static time_t load_conf_time = 0;
 | |
| static pthread_mutex_t load_conf_mutex;
 | |
| 
 | |
| void io_init(void)
 | |
| {
 | |
|   void *handle = dlopen("libaio.so", RTLD_LAZY);
 | |
|   *(void**) &real_io_getevents = dlsym(handle, "io_getevents");
 | |
|   *(void**) &real_io_submit = dlsym(handle, "io_submit");
 | |
|   printf("init read aio func succeed\n");
 | |
| }
 | |
| 
 | |
| void check_init()
 | |
| {
 | |
|   pthread_mutex_lock(&init_mutex);
 | |
|   if (0 == is_inited) {
 | |
|     io_init();
 | |
|     is_inited = 1;
 | |
|   }
 | |
|   pthread_mutex_unlock(&init_mutex);
 | |
| }
 | |
| 
 | |
| void load_conf()
 | |
| {
 | |
|   int fd = open("aio_conf", O_RDONLY);
 | |
|   char buf[64];
 | |
|   memset(buf, 0, sizeof(buf));
 | |
|   read(fd, buf, sizeof(buf));
 | |
|   close(fd);
 | |
|   sscanf(buf, "%d,%d,%d,%d", &aio_conf.work_fd_, &aio_conf.io_submit_failed_, &aio_conf.io_hang_, &aio_conf.io_timeout_);
 | |
|   printf("init read aio func succeed, %s\n", buf);
 | |
| }
 | |
| 
 | |
| void check_load_conf()
 | |
| {
 | |
|   pthread_mutex_lock(&load_conf_mutex);
 | |
|   time_t now = time(NULL);
 | |
|   if (now - load_conf_time > 5) {
 | |
|     load_conf();
 | |
|     load_conf_time = now;
 | |
|   }
 | |
|   pthread_mutex_unlock(&load_conf_mutex);
 | |
| 
 | |
| }
 | |
| int io_submit(io_context_t ctx_id, long nr, struct iocb **iocbpp)
 | |
| {
 | |
|   int ret = 0;
 | |
|   int is_triggered = 0;
 | |
|   check_init();
 | |
|   check_load_conf();
 | |
|   if (iocbpp[0]->aio_fildes == aio_conf.work_fd_) {
 | |
|     is_triggered = 1;
 | |
|   }
 | |
| 
 | |
|   if (!is_triggered || !aio_conf.io_submit_failed_) {
 | |
|     ret = (*real_io_submit)(ctx_id, nr, iocbpp);
 | |
|   } else {
 | |
|     ret = EAGAIN;
 | |
|   }
 | |
|   return ret;
 | |
| }
 | |
| 
 | |
| int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout)
 | |
| {
 | |
|   int ret = 0;
 | |
|   int is_triggered = 0;
 | |
|   int nevent = 0;
 | |
|   check_init();
 | |
|   check_load_conf();
 | |
|   nevent = (*real_io_getevents)(ctx_id, min_nr, nr, events, timeout);
 | |
|   if (nevent < 0) {
 | |
|     printf("real_io_getevents failed\n");
 | |
|   } else if (0 == nevent) {
 | |
|     //printf("real_io_getevents get nothing\n");
 | |
|   } else {
 | |
|     struct iocb *cb = events[0].obj;
 | |
|     is_triggered = aio_conf.work_fd_ == cb->aio_fildes;
 | |
|     if (!is_triggered) {
 | |
|       ret = nevent;
 | |
|     } else if (aio_conf.io_hang_) {
 | |
|       ret = 0; // 0 event finish
 | |
|     } else if (aio_conf.io_timeout_) {
 | |
|       ::usleep(10 * 1000 * 1000); // sleep 10s
 | |
|       printf("finish io_getevents");
 | |
|       ret = nevent;
 | |
|     }
 | |
|   }
 | |
|   return ret;
 | |
| }
 | 
