diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index 0e9213d7e..984f24cab 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -328,8 +328,6 @@ bool skygw_logmanager_init( { bool succp = false; - ss_dfprintf(stderr, ">> skygw_logmanager_init\n"); - acquire_lock(&lmlock); if (lm != NULL) { @@ -342,7 +340,6 @@ bool skygw_logmanager_init( return_succp: release_lock(&lmlock); - ss_dfprintf(stderr, "<< skygw_logmanager_init\n"); return succp; } @@ -422,8 +419,6 @@ void skygw_logmanager_exit(void) */ void skygw_logmanager_done(void) { - ss_dfprintf(stderr, ">> skygw_logmanager_done\n"); - acquire_lock(&lmlock); if (lm == NULL) { @@ -452,8 +447,6 @@ void skygw_logmanager_done(void) return_void: release_lock(&lmlock); - - ss_dfprintf(stderr, "<< skygw_logmanager_done\n"); } static logfile_t* logmanager_get_logfile( @@ -1319,12 +1312,25 @@ static bool fnames_conf_init( fn->fn_logpath = (fn->fn_logpath == NULL) ? strdup(get_logpath_default()) : fn->fn_logpath; - ss_dfprintf(stderr, "Command line : "); + /* ss_dfprintf(stderr, "\n\n\tCommand line : "); for (i=0; ifn_logpath, + fn->fn_err_prefix, + fn->fn_err_suffix, + fn->fn_msg_prefix, + fn->fn_msg_suffix, + fn->fn_trace_prefix, + fn->fn_trace_suffix); + succp = true; fn->fn_state = RUN; CHK_FNAMES_CONF(fn); diff --git a/server/core/dcb.c b/server/core/dcb.c index 6d74a7739..0b458f28c 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -426,6 +426,7 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol) DCB *dcb; GWPROTOCOL *funcs; int fd; +int rc; if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL) { @@ -483,7 +484,7 @@ int fd; session->client, session->client->fd); } - ss_dassert(dcb->fd == -1); + ss_dassert(dcb->fd == -1); /**< must be uninitialized at this point */ /** * Successfully connected to backend. Assign file descriptor to dcb */ @@ -505,7 +506,13 @@ int fd; /** * Add the dcb in the poll set */ - poll_add_dcb(dcb); + rc = poll_add_dcb(dcb); + + if (rc == -1) { + dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL); + dcb_final_free(dcb); + return NULL; + } return dcb; } @@ -518,7 +525,8 @@ int fd; * * @param dcb The DCB to read from * @param head Pointer to linked list to append data to - * @return -1 on error, otherwise the number of read bytes on the last. 0 is returned if no data available. + * @return -1 on error, otherwise the number of read bytes on the last. + * 0 is returned if no data available. * iteration of while loop. */ int @@ -694,15 +702,30 @@ int w, saved_errno = 0; if (w < 0) { - skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Write to dcb %p in " - "state %s fd %d failed due errno %d, %s", - dcb, - STRDCBSTATE(dcb->state), - dcb->fd, - saved_errno, - strerror(saved_errno)); + if (saved_errno == EPIPE) { + skygw_log_write( + LOGFILE_TRACE, + "%lu [dcb_write] Write to dcb " + "%p in state %s fd %d failed " + "due errno %d, %s", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + saved_errno, + strerror(saved_errno)); + } else { + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Write to dcb %p in " + "state %s fd %d failed due " + "errno %d, %s", + dcb, + STRDCBSTATE(dcb->state), + dcb->fd, + saved_errno, + strerror(saved_errno)); + } break; } @@ -836,10 +859,17 @@ dcb_close(DCB *dcb) int rc; CHK_DCB(dcb); + ss_dassert(dcb->state == DCB_STATE_POLLING || + dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE); + /** * Stop dcb's listening and modify state accordingly. */ rc = poll_remove_dcb(dcb); + + ss_dassert(dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE); if (rc == 0) { skygw_log_write( @@ -850,13 +880,15 @@ dcb_close(DCB *dcb) dcb, STRDCBSTATE(dcb->state)); } else { - skygw_log_write_flush( - LOGFILE_ERROR, - "Error : Removing dcb %p in state %s from " - "poll set failed.", - dcb, - STRDCBSTATE(dcb->state)); + skygw_log_write( + LOGFILE_ERROR, + "%lu [dcb_close] Error : Removing dcb %p in state %s from " + "poll set failed.", + pthread_self(), + dcb, + STRDCBSTATE(dcb->state)); } + if (dcb->state == DCB_STATE_NOPOLLING) { dcb_add_to_zombieslist(dcb); } diff --git a/server/core/gateway.c b/server/core/gateway.c index d101a971a..508a266ba 100644 --- a/server/core/gateway.c +++ b/server/core/gateway.c @@ -97,7 +97,7 @@ static bool libmysqld_started = FALSE; static void log_flush_shutdown(void); static void log_flush_cb(void* arg); static void libmysqld_done(void); - +static bool file_write_header(FILE* outfile); /** * Handler for SIGHUP signal. Reload the configuration for the * gateway. @@ -126,9 +126,10 @@ sigint_handler (int i) skygw_log_write_flush( LOGFILE_ERROR, - "Error : Signal SIGINT %i received ...Exiting!", i); + "Error : Signal SIGINT %i received ...Exiting!", + i); shutdown_gateway(); - fprintf(stderr, "Shuting down MaxScale\n"); + fprintf(stderr, "\n\nShutting down MaxScale\n\n"); } /* wrapper for sigaction */ @@ -210,6 +211,60 @@ return_home: } #endif + +static bool file_write_header( + FILE* outfile) +{ + bool succp = false; + size_t wbytes1; + size_t wbytes2; + size_t wbytes3; + size_t len1; + size_t len2; + size_t len3; + const char* header_buf1; + char* header_buf2 = NULL; + const char* header_buf3; + time_t* t; + struct tm* tm; + + t = (time_t *)malloc(sizeof(time_t)); + tm = (struct tm *)malloc(sizeof(struct tm)); + *t = time(NULL); + *tm = *localtime(t); + + header_buf1 = "\n\nSkySQL MaxScale\t"; + header_buf2 = strdup(asctime(tm)); + header_buf3 = "------------------------------------------------------\n"; + + if (header_buf2 == NULL) { + goto return_succp; + } + + len1 = strlen(header_buf1); + len2 = strlen(header_buf2); + len3 = strlen(header_buf3); +#if defined(LAPTOP_TEST) + usleep(DISKWRITE_LATENCY); +#else + wbytes1=fwrite((void*)header_buf1, len1, 1, outfile); + wbytes2=fwrite((void*)header_buf2, len2, 1, outfile); + wbytes3=fwrite((void*)header_buf3, len3, 1, outfile); +#endif + + succp = true; +return_succp: + if (header_buf2 != NULL) { + free(header_buf2); + } + free(t); + free(tm); + return succp; +} + + + + /** * The main entry point into the gateway * @@ -219,59 +274,66 @@ return_home: int main(int argc, char **argv) { -int daemon_mode = 1; -sigset_t sigset; -int i, n, n_threads, n_services; -void **threads; -char mysql_home[1024], buf[1024], *home, *cnf_file = NULL; -char ddopt[1024]; -void* log_flush_thr = NULL; -ssize_t log_flush_timeout_ms = 0; -int l; + int daemon_mode = 1; + sigset_t sigset; + int i, n, n_threads, n_services; + void **threads; + char mysql_home[1024], buf[1024], *home, *cnf_file = NULL; + char ddopt[1024]; + void* log_flush_thr = NULL; + ssize_t log_flush_timeout_ms = 0; + int l; + sigset_t sigpipe_mask; + sigset_t saved_mask; + + sigemptyset(&sigpipe_mask); + sigaddset(&sigpipe_mask, SIGPIPE); #if defined(SS_DEBUG) -memset(conn_open, 0, sizeof(bool)*1024); -memset(dcb_fake_write_errno, 0, sizeof(unsigned char)*1024); -memset(dcb_fake_write_ev, 0, sizeof(__int32_t)*1024); -fail_next_backend_fd = false; -fail_next_client_fd = false; -fail_next_accept = 0; -fail_accept_errno = 0; + memset(conn_open, 0, sizeof(bool)*1024); + memset(dcb_fake_write_errno, 0, sizeof(unsigned char)*1024); + memset(dcb_fake_write_ev, 0, sizeof(__int32_t)*1024); + fail_next_backend_fd = false; + fail_next_client_fd = false; + fail_next_accept = 0; + fail_accept_errno = 0; #endif + file_write_header(stderr); + l = atexit(skygw_logmanager_exit); if (l != 0) { - fprintf(stderr, "Couldn't register exit function.\n"); + fprintf(stderr, "Couldn't register exit function.\n"); } atexit(datadir_cleanup); for (n = 0; n < argc; n++) { - if (strcmp(argv[n], "-d") == 0) - { - /** Debug mode, maxscale runs in this same process */ - daemon_mode = 0; - } - /** - * 1. Resolve config file location from command-line argument. - */ - if (strncmp(argv[n], "-c", 2) == 0) - { - int s=2; + if (strcmp(argv[n], "-d") == 0) + { + /** Debug mode, maxscale runs in this same process */ + daemon_mode = 0; + } + /** + * 1. Resolve config file location from command-line argument. + */ + if (strncmp(argv[n], "-c", 2) == 0) + { + int s=2; - while (argv[n][s] == 0 && s<10) s++; + while (argv[n][s] == 0 && s<10) s++; - if (s==10) { - skygw_log_write_flush( - LOGFILE_ERROR, - "Fatal : Unable to find a MaxScale " - "configuration file, either install one in " - "/etc/MaxScale.cnf, " - "$MAXSCALE_HOME/etc/MaxScale.cnf " - "or use the -c option. Exiting."); - } - cnf_file = &argv[n][s]; - } + if (s==10) { + skygw_log_write_flush( + LOGFILE_ERROR, + "Fatal : Unable to find a MaxScale " + "configuration file, either install one in " + "/etc/MaxScale.cnf, " + "$MAXSCALE_HOME/etc/MaxScale.cnf " + "or use the -c option. Exiting."); + } + cnf_file = &argv[n][s]; + } } /** @@ -280,41 +342,45 @@ fail_accept_errno = 0; */ if (daemon_mode == 1) { - if (sigfillset(&sigset) != 0) { - skygw_log_write_flush( - LOGFILE_ERROR, - "Error : sigfillset() error %s", - strerror(errno)); - return 1; - } + if (sigfillset(&sigset) != 0) { + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : sigfillset() error %s", + strerror(errno)); + return 1; + } - if (sigdelset(&sigset, SIGHUP) != 0) { - skygw_log_write_flush( - LOGFILE_ERROR, - "Error : sigdelset(SIGHUP) error %s", - strerror(errno)); - } + if (sigdelset(&sigset, SIGHUP) != 0) { + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : sigdelset(SIGHUP) error %s", + strerror(errno)); + } - if (sigdelset(&sigset, SIGTERM) != 0) { - skygw_log_write_flush( - LOGFILE_ERROR, - "Error : sigdelset(SIGTERM) error %s", - strerror(errno)); - } + if (sigdelset(&sigset, SIGTERM) != 0) { + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : sigdelset(SIGTERM) error %s", + strerror(errno)); + } - if (sigprocmask(SIG_SETMASK, &sigset, NULL) != 0) { - skygw_log_write_flush( - LOGFILE_ERROR, - "Error : sigprocmask() error %s", - strerror(errno)); - } - gw_daemonize(); + if (sigprocmask(SIG_SETMASK, &sigset, NULL) != 0) { + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : sigprocmask() error %s", + strerror(errno)); + } + gw_daemonize(); } - signal_set(SIGHUP, sighup_handler); - signal_set(SIGTERM, sigterm_handler); - signal_set(SIGINT, sigint_handler); + signal_set(SIGHUP, sighup_handler); + signal_set(SIGTERM, sigterm_handler); + signal_set(SIGINT, sigint_handler); + if (pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &saved_mask) == -1) { + perror("pthread_sigmask"); + exit(1); + } l = atexit(libmysqld_done); if (l != 0) { @@ -326,37 +392,37 @@ fail_accept_errno = 0; if ((home = getenv("MAXSCALE_HOME")) != NULL) { - if (access(home, R_OK) != 0) - { - fprintf(stderr, - "The configured value of MAXSCALE_HOME '%s' does not " - "exist.\n", - home); - skygw_log_write_flush( - LOGFILE_ERROR, - "Fatal : The configured value of MAXSCALE_HOME '%s' does " - "not exist.", - home); - exit(1); - } - sprintf(mysql_home, "%s/mysql", home); - setenv("MYSQL_HOME", mysql_home, 1); - /** - * 2. Resolve config file location from $MAXSCALE_HOME/etc. - */ - if (cnf_file == NULL) { - sprintf(buf, "%s/etc/MaxScale.cnf", home); - if (access(buf, R_OK) == 0) { - cnf_file = buf; - } - } + if (access(home, R_OK) != 0) + { + fprintf(stderr, + "The configured value of MAXSCALE_HOME '%s' does not " + "exist.\n", + home); + skygw_log_write_flush( + LOGFILE_ERROR, + "Fatal : The configured value of MAXSCALE_HOME '%s' does " + "not exist.", + home); + exit(1); + } + sprintf(mysql_home, "%s/mysql", home); + setenv("MYSQL_HOME", mysql_home, 1); + /** + * 2. Resolve config file location from $MAXSCALE_HOME/etc. + */ + if (cnf_file == NULL) { + sprintf(buf, "%s/etc/MaxScale.cnf", home); + if (access(buf, R_OK) == 0) { + cnf_file = buf; + } + } } /** * If not done yet, * 3. Resolve config file location from /etc/MaxScale. */ if (cnf_file == NULL && access("/etc/MaxScale.cnf", R_OK) == 0) - cnf_file = "/etc/MaxScale.cnf"; + cnf_file = "/etc/MaxScale.cnf"; /* * Set a data directory for the mysqld library, we use @@ -366,61 +432,61 @@ fail_accept_errno = 0; */ if (home) { - sprintf(datadir, "%s/data%d", home, getpid()); - mkdir(datadir, 0777); + sprintf(datadir, "%s/data%d", home, getpid()); + mkdir(datadir, 0777); } else { - sprintf(datadir, "/tmp/MaxScale/data%d", getpid()); - mkdir("/tmp/MaxScale", 0777); - mkdir(datadir, 0777); + sprintf(datadir, "/tmp/MaxScale/data%d", getpid()); + mkdir("/tmp/MaxScale", 0777); + mkdir(datadir, 0777); } - /* - * If $MAXSCALE_HOME is set then write the logs into $MAXSCALE_HOME/log. - * The skygw_logmanager_init expects to take arguments as passed to main - * and proesses them with getopt, therefore we need to give it a dummy - * argv[0] - */ - if (home) - { - char buf[1024]; - char *argv[4]; + /* + * If $MAXSCALE_HOME is set then write the logs into $MAXSCALE_HOME/log. + * The skygw_logmanager_init expects to take arguments as passed to main + * and proesses them with getopt, therefore we need to give it a dummy + * argv[0] + */ + if (home) + { + char buf[1024]; + char *argv[4]; - sprintf(buf, "%s/log", home); - mkdir(buf, 0777); - argv[0] = "MaxScale"; - argv[1] = "-g"; - argv[2] = buf; - argv[3] = NULL; - skygw_logmanager_init(3, argv); - } + sprintf(buf, "%s/log", home); + mkdir(buf, 0777); + argv[0] = "MaxScale"; + argv[1] = "-g"; + argv[2] = buf; + argv[3] = NULL; + skygw_logmanager_init(3, argv); + } - if (cnf_file == NULL) { - skygw_log_write_flush( - LOGFILE_ERROR, - "Fatal : Unable to find a MaxScale configuration " + if (cnf_file == NULL) { + skygw_log_write_flush( + LOGFILE_ERROR, + "Fatal : Unable to find a MaxScale configuration " "file, either install one in /etc/MaxScale.cnf, " "$MAXSCALE_HOME/etc/MaxScale.cnf " - "or use the -c option. Exiting."); - fprintf(stderr, "Unable to find MaxScale configuration file. " + "or use the -c option. Exiting."); + fprintf(stderr, "Unable to find MaxScale configuration file. " "Exiting.\n"); - exit(1); - } + exit(1); + } - /* Update the server options */ - for (i = 0; server_options[i]; i++) - { - if (!strcmp(server_options[i], "--datadir=")) - { - sprintf(ddopt, "--datadir=%s", datadir); - server_options[i] = ddopt; - } - } + /* Update the server options */ + for (i = 0; server_options[i]; i++) + { + if (!strcmp(server_options[i], "--datadir=")) + { + sprintf(ddopt, "--datadir=%s", datadir); + server_options[i] = ddopt; + } + } - if (mysql_library_init(num_elements, server_options, server_groups)) - { - skygw_log_write_flush( + if (mysql_library_init(num_elements, server_options, server_groups)) + { + skygw_log_write_flush( LOGFILE_ERROR, "Fatal : mysql_library_init failed. It is a " "mandatory component, required by router services and " @@ -428,50 +494,50 @@ fail_accept_errno = 0; mysql_error(NULL), __FILE__, __LINE__); - fprintf(stderr, + fprintf(stderr, "Failed to initialise the MySQL library. Exiting.\n"); - exit(1); - } + exit(1); + } libmysqld_started = TRUE; - if (!config_load(cnf_file)) - { - skygw_log_write_flush( + if (!config_load(cnf_file)) + { + skygw_log_write_flush( LOGFILE_ERROR, "Fatal : Failed to load MaxScale configuration file %s. " "Exiting.", cnf_file); - fprintf(stderr, + fprintf(stderr, "Failed to load MaxScale configuration file. " "Exiting.\n"); - exit(1); - } + exit(1); + } - skygw_log_write( + skygw_log_write( LOGFILE_MESSAGE, - "SkySQL MaxScale (C) SkySQL Ab 2013"); - skygw_log_write( + "SkySQL MaxScale (C) SkySQL Ab 2013"); + skygw_log_write( LOGFILE_MESSAGE, - "MaxScale is starting, PID %i", - getpid()); + "MaxScale is starting, PID %i", + getpid()); - poll_init(); + poll_init(); - /* - * Start the services that were created above - */ + /* + * Start the services that were created above + */ n_services = serviceStartAll(); - if (n_services == 0) - { - skygw_log_write_flush( + if (n_services == 0) + { + skygw_log_write_flush( LOGFILE_ERROR, "Fatal : Failed to start any MaxScale services. " "Exiting."); - fprintf(stderr, + fprintf(stderr, "Failed to start any MaxScale services. Exiting.\n"); - exit(1); - } - skygw_log_write( + exit(1); + } + skygw_log_write( LOGFILE_MESSAGE, "Started %d services succesfully.", n_services); @@ -481,17 +547,17 @@ fail_accept_errno = 0; */ log_flush_timeout_ms = 1000; log_flush_thr = thread_start(log_flush_cb, (void *)&log_flush_timeout_ms); - /* - * Start the polling threads, note this is one less than is - * configured as the main thread will also poll. - */ - n_threads = config_threadcount(); - threads = (void **)calloc(n_threads, sizeof(void *)); - for (n = 0; n < n_threads - 1; n++) - threads[n] = thread_start(poll_waitevents, (void *)(n + 1)); - poll_waitevents((void *)0); - for (n = 0; n < n_threads - 1; n++) - thread_wait(threads[n]); + /* + * Start the polling threads, note this is one less than is + * configured as the main thread will also poll. + */ + n_threads = config_threadcount(); + threads = (void **)calloc(n_threads, sizeof(void *)); + for (n = 0; n < n_threads - 1; n++) + threads[n] = thread_start(poll_waitevents, (void *)(n + 1)); + poll_waitevents((void *)0); + for (n = 0; n < n_threads - 1; n++) + thread_wait(threads[n]); free(threads); @@ -500,26 +566,26 @@ fail_accept_errno = 0; */ thread_wait(log_flush_thr); - /* Stop all the monitors */ - monitorStopAll(); + /* Stop all the monitors */ + monitorStopAll(); - skygw_log_write( - LOGFILE_MESSAGE, - "MaxScale shutdown, PID %i\n", - getpid()); + skygw_log_write( + LOGFILE_MESSAGE, + "MaxScale shutdown, PID %i\n", + getpid()); - datadir_cleanup(); + datadir_cleanup(); - return 0; + return 0; } // End of main /** * Shutdown the gateway */ void -shutdown_gateway() + shutdown_gateway() { - poll_shutdown(); + poll_shutdown(); log_flush_shutdown(); } diff --git a/server/core/poll.c b/server/core/poll.c index d1c9375df..248612e10 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -171,6 +171,11 @@ poll_remove_dcb(DCB *dcb) /** It is possible that dcb has already been removed from the set */ if (dcb->state != DCB_STATE_POLLING) { + if (dcb->state == DCB_STATE_NOPOLLING || + dcb->state == DCB_STATE_ZOMBIE) + { + rc = 0; + } goto return_rc; } @@ -370,26 +375,55 @@ poll_waitevents(void *arg) } if (ev & EPOLLHUP) { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + skygw_log_write( + LOGFILE_TRACE, + "%lu [poll_waitevents] " + "EPOLLHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)); atomic_add(&pollStats.n_hup, 1); dcb->func.hangup(dcb); } if (ev & EPOLLOUT) { int eno = 0; - simple_mutex_lock(&dcb->dcb_write_lock, - true); eno = gw_getsockerrno(dcb->fd); - ss_dassert(eno == 0); - ss_info_dassert(!dcb->dcb_write_active, + + if (eno == 0) { + simple_mutex_lock( + &dcb->dcb_write_lock, + true); + ss_info_dassert( + !dcb->dcb_write_active, "Write already active"); - dcb->dcb_write_active = TRUE; - atomic_add(&pollStats.n_write, 1); - dcb->func.write_ready(dcb); - dcb->dcb_write_active = FALSE; - simple_mutex_unlock(&dcb->dcb_write_lock); - } - if (ev & EPOLLIN) - { + dcb->dcb_write_active = TRUE; + atomic_add(&pollStats.n_write, 1); + dcb->func.write_ready(dcb); + dcb->dcb_write_active = FALSE; + simple_mutex_unlock( + &dcb->dcb_write_lock); + } else { + skygw_log_write( + LOGFILE_TRACE, + "%lu [poll_waitevents] " + "EPOLLOUT due %d, %s. " + "dcb %p, fd %i", + pthread_self(), + eno, + strerror(eno), + dcb, + dcb->fd); + } + } + if (ev & EPOLLIN) + { simple_mutex_lock(&dcb->dcb_read_lock, true); ss_info_dassert(!dcb->dcb_read_active, diff --git a/server/modules/include/readconnection.h b/server/modules/include/readconnection.h index f4fad3447..29f1316c3 100644 --- a/server/modules/include/readconnection.h +++ b/server/modules/include/readconnection.h @@ -46,7 +46,7 @@ typedef struct backend { */ typedef struct router_client_session { BACKEND *backend; /**< Backend used by the client session */ - DCB *backend_dcb; /**< DCB Connection to the backend */ + DCB *backend_dcb; /**< DCB Connection to the backend */ struct router_client_session *next; } ROUTER_CLIENT_SES; @@ -67,7 +67,7 @@ typedef struct router_instance { SERVICE *service; /**< Pointer to the service using this router */ ROUTER_CLIENT_SES *connections; /**< Link list of all the client connections */ SPINLOCK lock; /**< Spinlock for the instance data */ - BACKEND **servers; /**< The set of backend servers for this router*/ + BACKEND **servers; /**< List of backend servers */ unsigned int bitmask; /**< Bitmask to apply to server->status */ unsigned int bitvalue; /**< Required value of server->status */ ROUTER_STATS stats; /**< Statistics for this router */ diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 56e43860e..20d5e4c1d 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -31,28 +31,26 @@ #include -typedef struct client_session CLIENT_SESSION; -typedef struct instance INSTANCE; /** * Internal structure used to define the set of backend servers we are routing * connections to. This provides the storage for routing module specific data * that is required for each of the backend servers. */ typedef struct backend { - SERVER* server; /**< The server itself */ - int count; /**< Number of connections to the server */ + SERVER* backend_server; /**< The server itself */ + int backend_conn_count; /**< Number of connections to the server */ } BACKEND; /** * The client session structure used within this router. */ -struct client_session { - BACKEND* slave; /**< Slave used by the client session */ - BACKEND* master; /**< Master used by the client session */ - DCB* slaveconn; /**< Slave connection */ - DCB* masterconn; /**< Master connection */ - CLIENT_SESSION* next; -}; +typedef struct router_client_session { + BACKEND* be_slave; /**< Slave backend used by client session */ + BACKEND* be_master; /**< Master backend used by client session */ + DCB* slave_dcb; /**< Slave connection */ + DCB* master_dcb; /**< Master connection */ + struct router_client_session* next; +} ROUTER_CLIENT_SES; /** * The statistics for this router instance @@ -60,24 +58,26 @@ struct client_session { typedef struct { int n_sessions; /**< Number sessions created */ int n_queries; /**< Number of queries forwarded */ - int n_master; /**< Number of statements sent to master */ - int n_slave; /**< Number of statements sent to slave */ - int n_all; /**< Number of statements sent to all */ + int n_master; /**< Number of stmts sent to master */ + int n_slave; /**< Number of stmts sent to slave */ + int n_all; /**< Number of stmts sent to all */ } ROUTER_STATS; /** * The per instance data for the router. */ -struct instance { - SERVICE* service; /**< Pointer to the service using this router */ - CLIENT_SESSION* connections; /**< Link list of all the client connections */ - SPINLOCK lock; /**< Spinlock for the instance data */ - BACKEND** servers; /**< The set of backend servers for this instance */ - BACKEND* master; /**< NULL if not known, pointer otherwise */ - ROUTER_STATS stats; /**< Statistics for this router */ - INSTANCE* next; -}; +typedef struct router_instance { + SERVICE* service; /**< Pointer to service */ + ROUTER_CLIENT_SES* connections; /**< List of client connections */ + SPINLOCK lock; /**< Lock for the instance data */ + BACKEND** servers; /**< Backend servers */ + BACKEND* master; /**< NULL or pointer */ + unsigned int bitmask; /**< Bitmask to apply to server->status */ + unsigned int bitvalue; /**< Required value of server->status */ + ROUTER_STATS stats; /**< Statistics for this router */ + struct router_instance* next; /**< Next router on the list */ +} ROUTER_INSTANCE; #endif diff --git a/server/modules/protocol/httpd.c b/server/modules/protocol/httpd.c index f17bff33c..4798b26b0 100644 --- a/server/modules/protocol/httpd.c +++ b/server/modules/protocol/httpd.c @@ -89,7 +89,6 @@ version() void ModuleInit() { - fprintf(stderr, "Initialise HTTPD Protocol module.\n"); } /** @@ -369,6 +368,7 @@ struct sockaddr_in addr; char *port; int one = 1; short pnum; +int rc; memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL)); @@ -403,8 +403,24 @@ short pnum; { return 0; } - listen(listener->fd, SOMAXCONN); + rc = listen(listener->fd, SOMAXCONN); + + if (rc == 0) { + fprintf(stderr, + "Listening http connections at %s\n", + config); + } else { + int eno = errno; + errno = 0; + fprintf(stderr, + "\n* Failed to start listening http due error %d, %s\n\n", + eno, + strerror(eno)); + return 0; + } + + if (poll_add_dcb(listener) == -1) { return 0; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 9e68ebeac..7c3bfeac7 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -90,12 +90,6 @@ version() void ModuleInit() { -#if defined(SS_DEBUG) - skygw_log_write( - LOGFILE_MESSAGE, - strdup("Initial MySQL Backend Protcol module.")); -#endif - fprintf(stderr, "Initial MySQL Backend Protcol module.\n"); } /* @@ -187,17 +181,17 @@ static int gw_read_backend_event(DCB *dcb) { current_session->client_sha1, backend_protocol) != 0) { - backend_protocol->state = MYSQL_AUTH_FAILED; + ss_dassert(backend_protocol->state == MYSQL_AUTH_FAILED); rc = 1; } else { /** * next step is to wait server's response with * a new EPOLLIN event */ - backend_protocol->state = MYSQL_AUTH_RECV; - rc = 0; + ss_dassert(backend_protocol->state == MYSQL_AUTH_RECV); + rc = 0; goto return_rc; - } + } } } /* diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index ccae0892b..d83f15a4c 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -83,7 +83,6 @@ version() void ModuleInit() { - fprintf(stderr, "Initialise MySQL Client Protocol module.\n"); } /** @@ -380,7 +379,7 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) { */ // now get the user strcpy(username, (char *)(client_auth_packet + 4 + 4 + 4 + 1 + 23)); - fprintf(stderr, "<<< Client username is [%s]\n", username); + /* fprintf(stderr, "<<< Client username is [%s]\n", username); */ // get the auth token len memcpy(&auth_token_len, @@ -392,9 +391,9 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) { strcpy(database, (char *)(client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(username) + 1 + 1 + auth_token_len)); - fprintf(stderr, "<<< Client selected db is [%s]\n", database); + /* fprintf(stderr, "<<< Client selected db is [%s]\n", database); */ } else { - fprintf(stderr, "<<< Client is NOT connected with db\n"); + /* fprintf(stderr, "<<< Client is NOT connected with db\n"); */ } // allocate memory for token only if auth_token_len > 0 @@ -832,6 +831,7 @@ int gw_MySQLListener( char address[1024] = ""; int port = 0; int one = 1; + int rc; /* this gateway, as default, will bind on port 4404 for localhost only */ if (config_bind != NULL) { @@ -862,13 +862,15 @@ int gw_MySQLListener( sprintf(address, "0.0.0.0"); } serv_addr.sin_port = htons(port); - + // socket create if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - fprintf(stderr, - ">>> Error: can't open listening socket. Errno %i, %s\n", - errno, strerror(errno)); - return 0; + fprintf(stderr, + "\n* Error: can't open listening socket due " + "error %i, %s.\n\n\t", + errno, + strerror(errno)); + return 0; } // socket options setsockopt(l_so, SOL_SOCKET, SO_REUSEADDR, (char *)&one, sizeof(one)); @@ -879,32 +881,51 @@ int gw_MySQLListener( // bind address and port if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { fprintf(stderr, - ">>> Bind failed !!! %i, [%s]\n", + "\n* Bind failed due error %i, %s.\n", errno, strerror(errno)); - fprintf(stderr, ">>> can't bind to address and port"); + fprintf(stderr, "* Can't bind to %s\n\n", + bind_address_and_port); return 0; } + /* fprintf(stderr, ">> GATEWAY bind is: %s:%i. FD is %i\n", address, port, l_so); + */ + + rc = listen(l_so, 10 * SOMAXCONN); - listen(l_so, 10 * SOMAXCONN); + if (rc == 0) { + fprintf(stderr, + "Listening MySQL connections at %s\n", + bind_address_and_port); + } else { + int eno = errno; + errno = 0; + fprintf(stderr, + "\n* Failed to start listening MySQL due error %d, %s\n\n", + eno, + strerror(eno)); + return 0; + } + /* fprintf(stderr, ">> GATEWAY listen backlog queue is %i\n", 10 * SOMAXCONN); + */ // assign l_so to dcb listen_dcb->fd = l_so; // add listening socket to poll structure if (poll_add_dcb(listen_dcb) == -1) { - fprintf(stderr, - ">>> poll_add_dcb: can't add the listen_sock! Errno " - "%i, %s\n", - errno, - strerror(errno)); + fprintf(stderr, + "\n* Failed to start polling the socket due error " + "%i, %s.\n\n", + errno, + strerror(errno)); return 0; } #if defined(SS_DEBUG) @@ -943,7 +964,6 @@ int gw_MySQLAccept(DCB *listener) int i = 0; CHK_DCB(listener); - fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd); while (1) { @@ -1026,13 +1046,14 @@ int gw_MySQLAccept(DCB *listener) c_sock); conn_open[c_sock] = true; #endif + /* fprintf(stderr, "Processing %i connection fd %i for listener %i\n", listener->stats.n_accepts, c_sock, listener->fd); - // set nonblocking - + */ + /* set nonblocking */ setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); setnonblocking(c_sock); @@ -1065,37 +1086,37 @@ int gw_MySQLAccept(DCB *listener) // client protocol state change protocol->state = MYSQL_AUTH_SENT; - /** - * Set new descriptor to event set. At the same time, - * change state to DCB_STATE_POLLING so that - * thread which wakes up sees correct state. - */ - if (poll_add_dcb(client_dcb) == -1) - { - /** delete client_dcb */ - dcb_close(client_dcb); + /** + * Set new descriptor to event set. At the same time, + * change state to DCB_STATE_POLLING so that + * thread which wakes up sees correct state. + */ + if (poll_add_dcb(client_dcb) == -1) + { + /** delete client_dcb */ + dcb_close(client_dcb); - /** Previous state is recovered in poll_add_dcb. */ - skygw_log_write_flush( - LOGFILE_ERROR, - "%lu [gw_MySQLAccept] Failed to add dcb %p for " - "fd %d to epoll set.", - pthread_self(), - client_dcb, - client_dcb->fd); - rc = 1; - goto return_rc; - } - else - { - skygw_log_write( - LOGFILE_TRACE, - "%lu [gw_MySQLAccept] Added dcb %p for fd " - "%d to epoll set.", - pthread_self(), - client_dcb, - client_dcb->fd); - } + /** Previous state is recovered in poll_add_dcb. */ + skygw_log_write_flush( + LOGFILE_ERROR, + "%lu [gw_MySQLAccept] Failed to add dcb %p for " + "fd %d to epoll set.", + pthread_self(), + client_dcb, + client_dcb->fd); + rc = 1; + goto return_rc; + } + else + { + skygw_log_write( + LOGFILE_TRACE, + "%lu [gw_MySQLAccept] Added dcb %p for fd " + "%d to epoll set.", + pthread_self(), + client_dcb, + client_dcb->fd); + } } /**< while 1 */ #if defined(SS_DEBUG) if (rc == 0) { @@ -1103,9 +1124,9 @@ int gw_MySQLAccept(DCB *listener) CHK_PROTOCOL(((MySQLProtocol *)client_dcb->protocol)); } #endif - return_rc: - return rc; - } +return_rc: + return rc; +} /* */ diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index adb72381b..04b762621 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -336,7 +336,12 @@ bool gw_receive_backend_auth( * @param passwd The SHA1(real_password): Note real_password is unknown * @return 0 on success, 1 on failure */ -int gw_send_authentication_to_backend(char *dbname, char *user, uint8_t *passwd, MySQLProtocol *conn) { +int gw_send_authentication_to_backend( + char *dbname, + char *user, + uint8_t *passwd, + MySQLProtocol *conn) +{ int compress = 0; int rv; uint8_t *payload = NULL; diff --git a/server/modules/protocol/telnetd.c b/server/modules/protocol/telnetd.c index 94c42b3cc..da05cc1ae 100644 --- a/server/modules/protocol/telnetd.c +++ b/server/modules/protocol/telnetd.c @@ -353,6 +353,7 @@ struct sockaddr_in addr; char *port; int one = 1; short pnum; +int rc; memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL)); @@ -381,8 +382,24 @@ short pnum; { return 0; } - listen(listener->fd, SOMAXCONN); + rc = listen(listener->fd, SOMAXCONN); + + if (rc == 0) { + fprintf(stderr, + "Listening telnet connections at %s\n", + config); + } else { + int eno = errno; + errno = 0; + fprintf(stderr, + "\n* Failed to start listening telnet due error %d, %s\n\n", + eno, + strerror(eno)); + return 0; + } + + if (poll_add_dcb(listener) == -1) { return 0; diff --git a/server/modules/routing/readconnroute.c b/server/modules/routing/readconnroute.c index ef882bd07..502b7efae 100644 --- a/server/modules/routing/readconnroute.c +++ b/server/modules/routing/readconnroute.c @@ -233,8 +233,8 @@ int i, n; else { skygw_log_write(LOGFILE_ERROR, - "Unsupported router option %s for " - "readconnroute\n", + "Warning : Unsupported router " + "option %s for readconnroute.", options[i]); } } @@ -349,10 +349,9 @@ int i; if (!candidate) { skygw_log_write_flush( LOGFILE_ERROR, - "%lu [newSession] Failed to create new routing session. " + "Error : Failed to create new routing session. " "Couldn't find eligible candidate server. Freeing " - "allocated resources.", - pthread_self()); + "allocated resources."); free(client_ses); return NULL; } @@ -382,10 +381,9 @@ int i; atomic_add(&candidate->current_connection_count, -1); skygw_log_write( LOGFILE_ERROR, - "%lu [newSession] Failed to create new routing session. " + "Error : Failed to create new routing session. " "Couldn't establish connection to candidate server " "listening to port %d. Freeing allocated resources.", - pthread_self(), candidate->server->port); free(client_ses); return NULL; diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index b54924963..05ef5f0cf 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -18,10 +18,11 @@ #include #include #include +#include + #include #include -#include #include #include #include @@ -58,20 +59,29 @@ static void closeSession(ROUTER *instance, void *session); static void freeSession(ROUTER *instance, void *session); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static void diagnostic(ROUTER *instance, DCB *dcb); -static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); +static void clientReply( + ROUTER* instance, + void* router_session, + GWBUF* queue, + DCB* backend_dcb); +static bool search_backend_servers( + BACKEND** p_master, + BACKEND** p_slave, + ROUTER_INSTANCE* router); static ROUTER_OBJECT MyObject = { - createInstance, - newSession, - closeSession, - freeSession, - routeQuery, - diagnostic, - clientReply + createInstance, + newSession, + closeSession, + freeSession, + routeQuery, + diagnostic, + clientReply }; + static SPINLOCK instlock; -static INSTANCE* instances; +static ROUTER_INSTANCE* instances; /** * Implementation of the mandatory version entry point @@ -106,8 +116,9 @@ ModuleInit() * @return The module object */ ROUTER_OBJECT* GetModuleObject() { - skygw_log_write(LOGFILE_TRACE, - "Returning readwritesplit router module object."); + skygw_log_write( + LOGFILE_TRACE, + "Returning readwritesplit router module object."); return &MyObject; } @@ -131,67 +142,117 @@ static ROUTER* createInstance( SERVICE* service, char** options) { - INSTANCE* inst; - SERVER* server; - int n; - int i; + ROUTER_INSTANCE* router; + SERVER* server; + int n; + int i; - if ((inst = calloc(1, sizeof(INSTANCE))) == NULL) { + if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { return NULL; } - inst->service = service; - spinlock_init(&inst->lock); - inst->connections = NULL; + router->service = service; + spinlock_init(&router->lock); + router->connections = NULL; /** Calculate number of servers */ - for (server = service->databases, n = 0; server; server = server->nextdb) { + server = service->databases; + + for (n=0; server != NULL; server=server->nextdb) { n++; } - inst->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); - if (!inst->servers) { - free(inst); + router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); + + if (router->servers == NULL) + { + free(router); return NULL; } - if (options) + if (options != NULL) { - skygw_log_write_flush(LOGFILE_MESSAGE, - "Router options supplied to read/write split router module but none are supported. The options will be ignored.\n"); + skygw_log_write_flush( + LOGFILE_MESSAGE, + "Router options supplied to read/write split router " + "module but none are supported. The options will be " + "ignored."); } /** - * We need an array of the backend servers in the instance structure so - * that we can maintain a count of the number of connections to each + * Create an array of the backend servers in the router structure to + * maintain a count of the number of connections to each * backend server. */ - for (server = service->databases, n = 0; server; server = server->nextdb) { - - if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL) { - for (i = 0; i < n; i++) { - free(inst->servers[i]); + server = service->databases; + n = 0; + while (server != NULL) { + if ((router->servers[n] = malloc(sizeof(BACKEND))) == NULL) + { + for (i = 0; i < n; i++) { + free(router->servers[i]); + } + free(router->servers); + free(router); + return NULL; } - free(inst->servers); - free(inst); - return NULL; - } - inst->servers[n]->server = server; - inst->servers[n]->count = 0; - n++; - } - inst->servers[n] = NULL; + router->servers[n]->backend_server = server; + router->servers[n]->backend_conn_count = 0; + n += 1; + server = server->nextdb; + } + router->servers[n] = NULL; /** - * We have completed the creation of the instance data, so now - * insert this router instance into the linked list of routers + * vraa : is this necessary for readwritesplit ? + * Option : where can a read go? + * - master (only) + * - slave (only) + * - joined (to both) + * + * Process the options + */ + router->bitmask = 0; + router->bitvalue = 0; + if (options) + { + for (i = 0; options[i]; i++) + { + if (!strcasecmp(options[i], "master")) + { + router->bitmask |= (SERVER_MASTER|SERVER_SLAVE); + router->bitvalue |= SERVER_MASTER; + } + else if (!strcasecmp(options[i], "slave")) + { + router->bitmask |= (SERVER_MASTER|SERVER_SLAVE); + router->bitvalue |= SERVER_SLAVE; + } + else if (!strcasecmp(options[i], "joined")) + { + router->bitmask |= (SERVER_JOINED); + router->bitvalue |= SERVER_JOINED; + } + else + { + skygw_log_write_flush( + LOGFILE_ERROR, + "Warning : Unsupported router option %s " + "for readwritesplitrouter.", + options[i]); + } + } + } + /** + * We have completed the creation of the router data, so now + * insert this router into the linked list of routers * that have been created with this module. */ spinlock_acquire(&instlock); - inst->next = instances; - instances = inst; + router->next = instances; + instances = router; spinlock_release(&instlock); - return (ROUTER *)inst; + return (ROUTER *)router; } /** @@ -205,15 +266,19 @@ static ROUTER* createInstance( * @return Session specific data for this session */ static void* newSession( - ROUTER* instance, + ROUTER* router_inst, SESSION* session) { - BACKEND* candidate = NULL; - CLIENT_SESSION* client; - INSTANCE* inst = (INSTANCE *)instance; - int i; - - if ((client = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL) + BACKEND* be_slave = NULL; + BACKEND* be_master = NULL; + ROUTER_CLIENT_SES* client_rses; + ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; + bool succp; + + client_rses = + (ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES)); + + if (client_rses == NULL) { return NULL; } @@ -223,101 +288,58 @@ static void* newSession( * load balancing algorithm we need to implement for this simple * connection router. */ - for (i = 0; inst->servers[i]; i++) - { - - if (inst->servers[i] && SERVER_IS_SLAVE(inst->servers[i]->server)) - { - candidate = inst->servers[i]; - break; - } + succp = search_backend_servers(&be_master, &be_slave, router); + + /** Both Master and Slave must be found */ + if (!succp) { + free(client_rses); + return NULL; } - - /** - * Loop over all the servers and find any that have fewer connections - * than our candidate server. - * - * If a server has less connections than the current candidate we mark this - * as the new candidate to connect to. - * - * If a server has the same number of connections currently as the candidate - * and has had less connections over time than the candidate it will also - * become the new candidate. This has the effect of spreading the connections - * over different servers during periods of very low load. - */ - for (i = 0; inst->servers[i]; i++) { - - if (inst->servers[i] - && SERVER_IS_RUNNING(inst->servers[i]->server)) - { - if (SERVER_IS_SLAVE(inst->servers[i]->server)) - { - if (inst->servers[i]->count < candidate->count) { - candidate = inst->servers[i]; - } else if (inst->servers[i]->count == candidate->count && - inst->servers[i]->server->stats.n_connections - < candidate->server->stats.n_connections) - { - candidate = inst->servers[i]; - } - } else if (SERVER_IS_MASTER(inst->servers[i]->server)) { - /** master is found */ - inst->master = inst->servers[i]; - } - } - } /* for */ - - if (candidate == NULL) - { - - skygw_log_write_flush(LOGFILE_MESSAGE, - "No suitable servers found for connection."); - free(client); - return NULL; - } - - if (inst->master == NULL) { - inst->master = inst->servers[i-1]; - } - /** - * We now have a master and a slave server with the least connections. - * Bump the connection counts for these servers. - */ - atomic_add(&candidate->count, 1); - client->slave = candidate; - atomic_add(&inst->master->count, 1); - client->master = inst->master; - ss_dassert(client->master->server != candidate->server); - /** * Open the slave connection. */ - if ((client->slaveconn = dcb_connect(candidate->server, session, - candidate->server->protocol)) == NULL) - { - atomic_add(&candidate->count, -1); - free(client); + client_rses->slave_dcb = dcb_connect(be_slave->backend_server, + session, + be_slave->backend_server->protocol); + + if (client_rses->slave_dcb == NULL) { + free(client_rses); return NULL; } /** * Open the master connection. */ - if ((client->masterconn = dcb_connect(client->master->server, session, - client->master->server->protocol)) == NULL) + client_rses->master_dcb = dcb_connect(be_master->backend_server, + session, + be_master->backend_server->protocol); + + if (client_rses->master_dcb == NULL) { - atomic_add(&client->master->count, -1); - free(client); + /** Close slave connection first. */ + client_rses->slave_dcb->func.close(client_rses->slave_dcb); + free(client_rses); return NULL; } - inst->stats.n_sessions += 1; + /** + * We now have a master and a slave server with the least connections. + * Bump the connection counts for these servers. + */ + atomic_add(&be_slave->backend_conn_count, 1); + atomic_add(&be_master->backend_conn_count, 1); + + client_rses->be_slave = be_slave; + client_rses->be_master = be_master; + router->stats.n_sessions += 1; - /* Add this session to end of the list of active sessions */ - spinlock_acquire(&inst->lock); - client->next = inst->connections; - inst->connections = client; - spinlock_release(&inst->lock); + /** + * Add this session to end of the list of active sessions in router. + */ + spinlock_acquire(&router->lock); + client_rses->next = router->connections; + router->connections = client_rses; + spinlock_release(&router->lock); - return (void *)client; + return (void *)client_rses; } /** @@ -331,47 +353,88 @@ static void closeSession( ROUTER* instance, void* router_session) { - INSTANCE* inst = (INSTANCE *)instance; - CLIENT_SESSION* session = (CLIENT_SESSION *)router_session; - +#if 0 + ROUTER_INSTANCE* router; +#endif + ROUTER_CLIENT_SES* rsession; + + rsession = (ROUTER_CLIENT_SES *)router_session; +#if 0 + router = (ROUTER_INSTANCE *)instance; + atomic_add(&rsession->be_slave->backend_conn_count, -1); + atomic_add(&rsession->be_master->backend_conn_count, -1); + atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1); + atomic_add(&rsession->be_master->backend_server->stats.n_current, -1); +#endif /** * Close the connection to the backend servers */ - session->slaveconn->func.close(session->slaveconn); - session->masterconn->func.close(session->masterconn); - atomic_add(&session->slave->count, -1); - atomic_add(&session->master->count, -1); - atomic_add(&session->slave->server->stats.n_current, -1); - atomic_add(&session->master->server->stats.n_current, -1); - - spinlock_acquire(&inst->lock); - if (inst->connections == session) { - inst->connections = session->next; + rsession->slave_dcb->func.close(rsession->slave_dcb); + rsession->master_dcb->func.close(rsession->master_dcb); +#if 0 + spinlock_acquire(&router->lock); + if (router->connections == rsession) { + router->connections = rsession->next; } else { - CLIENT_SESSION* ptr = inst->connections; + ROUTER_CLIENT_SES* ptr = router->connections; - while (ptr && ptr->next != session) { + while (ptr && ptr->next != rsession) { ptr = ptr->next; } if (ptr) { - ptr->next = session->next; + ptr->next = rsession->next; } } - spinlock_release(&inst->lock); + spinlock_release(&router->lock); /* * We are no longer in the linked list, free * all the memory and other resources associated * to the client session. */ - free(session); + free(rsession); +#endif } static void freeSession( ROUTER* router_instance, void* router_client_session) { + ROUTER_CLIENT_SES* rsession; + ROUTER_INSTANCE* router; + + rsession = (ROUTER_CLIENT_SES *)router_client_session; + router = (ROUTER_INSTANCE *)router_instance; + + atomic_add(&rsession->be_slave->backend_conn_count, -1); + atomic_add(&rsession->be_master->backend_conn_count, -1); + atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1); + atomic_add(&rsession->be_master->backend_server->stats.n_current, -1); + + spinlock_acquire(&router->lock); + + if (router->connections == rsession) { + router->connections = rsession->next; + } else { + ROUTER_CLIENT_SES* ptr = router->connections; + + while (ptr && ptr->next != rsession) { + ptr = ptr->next; + } + + if (ptr) { + ptr->next = rsession->next; + } + } + spinlock_release(&router->lock); + + /* + * We are no longer in the linked list, free + * all the memory and other resources associated + * to the client session. + */ + free(rsession); return; } @@ -405,8 +468,8 @@ static int routeQuery( int ret = 0; GWBUF *cq = NULL; - INSTANCE* inst = (INSTANCE *)instance; - CLIENT_SESSION* session = (CLIENT_SESSION *)router_session; + ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; + ROUTER_CLIENT_SES* rsession = (ROUTER_CLIENT_SES *)router_session; inst->stats.n_queries++; packet = GWBUF_DATA(queue); @@ -459,32 +522,32 @@ static int routeQuery( #endif switch (qtype) { - case QUERY_TYPE_WRITE: + case QUERY_TYPE_WRITE: #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, "Query type\t%s, routing to Master.", STRQTYPE(qtype)); #endif - ret = session->masterconn->func.write(session->masterconn, queue); + ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); atomic_add(&inst->stats.n_master, 1); goto return_ret; break; - - case QUERY_TYPE_READ: + + case QUERY_TYPE_READ: #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, "Query type\t%s, routing to Slave.", STRQTYPE(qtype)); #endif - ret = session->slaveconn->func.write(session->slaveconn, queue); + ret = rsession->slave_dcb->func.write(rsession->slave_dcb, queue); atomic_add(&inst->stats.n_slave, 1); goto return_ret; break; - - case QUERY_TYPE_SESSION_WRITE: + + case QUERY_TYPE_SESSION_WRITE: #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, @@ -499,25 +562,29 @@ static int routeQuery( cq = gwbuf_clone(queue); switch(packet_type) { - case COM_QUIT: - ret = session->masterconn->func.write(session->masterconn, queue); - session->slaveconn->func.write(session->slaveconn, cq); - break; - case COM_CHANGE_USER: - session->masterconn->func.auth(session->masterconn, NULL, session->masterconn->session, queue); - session->slaveconn->func.auth(session->slaveconn, NULL, session->masterconn->session, cq); - break; - default: - ret = session->masterconn->func.session(session->masterconn, (void *)queue); - session->slaveconn->func.session(session->slaveconn, (void *)cq); - break; + case COM_QUIT: + ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); + rsession->slave_dcb->func.write(rsession->slave_dcb, cq); + break; + case COM_CHANGE_USER: + rsession->master_dcb->func.auth(rsession->master_dcb, NULL, rsession->master_dcb->session, queue); + rsession->slave_dcb->func.auth(rsession->slave_dcb, NULL, rsession->master_dcb->session, cq); + break; + default: + ret = rsession->master_dcb->func.session( + rsession->master_dcb, + (void *)queue); + rsession->slave_dcb->func.session( + rsession->slave_dcb, + (void *)cq); + break; } atomic_add(&inst->stats.n_all, 1); goto return_ret; break; - default: + default: #if defined(SS_DEBUG_) skygw_log_write(NULL, LOGFILE_TRACE, @@ -525,7 +592,7 @@ static int routeQuery( STRQTYPE(qtype)); #endif /** Is this really ok? */ - ret = session->masterconn->func.write(session->masterconn, queue); + ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); atomic_add(&inst->stats.n_master, 1); goto return_ret; break; @@ -547,26 +614,37 @@ return_ret: static void diagnostic(ROUTER *instance, DCB *dcb) { -CLIENT_SESSION *session; -INSTANCE *inst = (INSTANCE *)instance; -int i = 0; +ROUTER_CLIENT_SES *rsession; +ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; +int i = 0; - spinlock_acquire(&inst->lock); - session = inst->connections; - while (session) + spinlock_acquire(&router->lock); + rsession = router->connections; + while (rsession) { i++; - session = session->next; + rsession = rsession->next; } - spinlock_release(&inst->lock); + spinlock_release(&router->lock); - dcb_printf(dcb, "\tNumber of router sessions: %d\n", inst->stats.n_sessions); - dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i); - dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", inst->stats.n_queries); - dcb_printf(dcb, "\tNumber of queries forwarded to master: %d\n", inst->stats.n_master); - dcb_printf(dcb, "\tNumber of queries forwarded to slave: %d\n", inst->stats.n_slave); - dcb_printf(dcb, "\tNumber of queries forwarded to all: %d\n", inst->stats.n_all); - + dcb_printf(dcb, + "\tNumber of router sessions: %d\n", + router->stats.n_sessions); + dcb_printf(dcb, + "\tCurrent no. of router sessions: %d\n", + i); + dcb_printf(dcb, + "\tNumber of queries forwarded: %d\n", + router->stats.n_queries); + dcb_printf(dcb, + "\tNumber of queries forwarded to master: %d\n", + router->stats.n_master); + dcb_printf(dcb, + "\tNumber of queries forwarded to slave: %d\n", + router->stats.n_slave); + dcb_printf(dcb, + "\tNumber of queries forwarded to all: %d\n", + router->stats.n_all); } /** @@ -579,30 +657,186 @@ int i = 0; * @param backend_dcb The backend DCB * @param queue The GWBUF with reply data */ -static void -clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb) +static void clientReply( + ROUTER* instance, + void* router_session, + GWBUF* queue, + DCB* backend_dcb) { -INSTANCE* inst = NULL; -DCB *master = NULL; -DCB *client = NULL; -CLIENT_SESSION* session = NULL; - - inst = (INSTANCE *)instance; - session = (CLIENT_SESSION *)router_session; - master = session->masterconn; - client = backend_dcb->session->client; + DCB* master_dcb; + DCB* client_dcb; + ROUTER_CLIENT_SES* rsession; + + rsession = (ROUTER_CLIENT_SES *)router_session; + master_dcb = rsession->master_dcb; + client_dcb = backend_dcb->session->client; if (backend_dcb->command == ROUTER_CHANGE_SESSION) { /* if backend_dcb is the master we can reply to the client */ - if (backend_dcb == master) { - master->session->client->func.write(master->session->client, queue); + if (backend_dcb == master_dcb) { + client_dcb->func.write(client_dcb, queue); } else { /* just consume the gwbuf without writing to the client */ gwbuf_consume(queue, gwbuf_length(queue)); } } else { /* normal flow */ - client->func.write(client, queue); + client_dcb->func.write(client_dcb, queue); } } -/// + +/** + * @node Search suitable backend server from those of router instance. + * + * Parameters: + * @param p_master - in, use, out + * Pointer to location where master's address is to be stored. + * If NULL, then master is not searched. + * + * @param p_slave - in, use, out + * Pointer to location where slave's address is to be stored. + * if NULL, then slave is not searched. + * + * @param inst - in, use + * Pointer to router instance + * + * @return true, if all what what requested found, false if the request + * was not satisfied or was partially satisfied. + * + * + * @details It is assumed that there is only one master among servers of + * a router instance. As a result, thr first master is always chosen. + */ +static bool search_backend_servers( + BACKEND** p_master, + BACKEND** p_slave, + ROUTER_INSTANCE* router) +{ + BACKEND* be_master = NULL; + BACKEND* be_slave = NULL; + int i; + bool succp = true; + + /* + * Loop over all the servers and find any that have fewer connections + * than current candidate server. + * + * If a server has less connections than the current candidate it is + * chosen to a new candidate. + * + * If a server has the same number of connections currently as the + * candidate and has had less connections over time than the candidate + * it will also become the new candidate. This has the effect of + * spreading the connections over different servers during periods of + * very low load. + * + * If master is searched for, the first master found is chosen. + */ + for (i = 0; router->servers[i] != NULL; i++) { + BACKEND* be = router->servers[i]; + + if (be != NULL) { + skygw_log_write( + LOGFILE_TRACE, + "%lu [newSession] Examine server %s:%d with " + "%d connections. Status is %d, " + "router->bitvalue is %d", + pthread_self(), + be->backend_server->name, + be->backend_server->port, + be->backend_conn_count, + be->backend_server->status, + router->bitmask); + } + + if (be != NULL && + SERVER_IS_RUNNING(be->backend_server) && + (be->backend_server->status & router->bitmask) == + router->bitvalue) + { + if (SERVER_IS_SLAVE(be->backend_server) && + p_slave != NULL) + { + /** + * If no candidate set, set first running + * server as an initial candidate server. + */ + if (be_slave == NULL) + { + be_slave = be; + } + else if (be->backend_conn_count < + be_slave->backend_conn_count) + { + /** + * This running server has fewer + * connections, set it as a new + * candidate. + */ + be_slave = be; + } + else if (be->backend_conn_count == + be_slave->backend_conn_count && + be->backend_server->stats.n_connections < + be_slave->backend_server->stats.n_connections) + { + /** + * This running server has the same + * number of connections currently + * as the candidate but has had + * fewer connections over time + * than candidate, set this server + * to candidate. + */ + be_slave = be; + } + } + else if (p_master != NULL && + be_master == NULL && + SERVER_IS_MASTER(be->backend_server)) + { + be_master = be; + } + } + } + + if (p_slave != NULL && be_slave == NULL) { + succp = false; + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Couldn't find suitable Slave from %d candidates.", + i); + } + + if (p_master != NULL && be_master == NULL) { + succp = false; + skygw_log_write_flush( + LOGFILE_ERROR, + "Error : Couldn't find suitable Master from %d candidates.", + i); + } + + if (be_slave != NULL) { + *p_slave = be_slave; + skygw_log_write( + LOGFILE_TRACE, + "%lu [readwritesplit:newSession] Selected Slave %s:%d " + "from %d candidates.", + pthread_self(), + be_slave->backend_server->name, + be_slave->backend_server->port, + i); + } + if (be_master != NULL) { + *p_master = be_master; + skygw_log_write( + LOGFILE_TRACE, + "%lu [readwritesplit:newSession] Selected Master %s:%d " + "from %d candidates.", + pthread_self(), + be_master->backend_server->name, + be_master->backend_server->port, + i); + } + return succp; +} diff --git a/server/modules/routing/testroute.c b/server/modules/routing/testroute.c index e4e9fb9e2..d31ccac92 100644 --- a/server/modules/routing/testroute.c +++ b/server/modules/routing/testroute.c @@ -56,7 +56,7 @@ version() void ModuleInit() { - fprintf(stderr, "Initial test router module.\n"); + } /** @@ -70,7 +70,6 @@ ModuleInit() ROUTER_OBJECT * GetModuleObject() { - fprintf(stderr, "Returning test router module object.\n"); return &MyObject; }