log_manager.cc :

tuned error printing and log writing output format
dcb.c : 
	dcb_connect, check return value of poll_add_dcb and behave accordingly.
	dcb_write, in case of SIFPIPE, only write to trace log.
	dcb_close, dassert with incorrect dcb states.
gateway.c :
	added file_write_header to print header similar than in logs to stderr.
	main, add signal handler for SIGPIPE
poll.c : 
	poll_remove_dcb, don't fail if dcb is in NOPOLLING or in ZOMBIE states.
	poll_waitevents, write EPOLLHUPs to trace log, don't even attempt to write to closed socket.
readconnection.h : 
	shortened comment.
readwritesplit.h : 
	replaced generic names with more specific ones. 
httpd.c : 
	Check listen return value and behave accordingly.
mysql_backend.c : 
	 Tiny clean up.
mysql_client.c : 
	gw_MySQLListener, Check listen return value and behave accordingly. 
mysql_common.c : 
	Shortened a header.
telnetd.c : 
	telnetd_listen, check listen return value and behave accordingly.
readconnroute.c : 
	Tuned log writing format.
readwritesplit.c : 
	Added function search_backend_servers, which chooses suitable backend and master server among those known by Maxscale. Fixed clean-up routines. Not ready yet but works somehow.
testroute.c : 
	Cleanup.
skygw_utils.cc : 
	Log writing clean up.
This commit is contained in:
vraatikka
2013-10-04 12:06:44 +03:00
parent 9f2f0ac006
commit 849a366e95
14 changed files with 955 additions and 533 deletions

View File

@ -328,8 +328,6 @@ bool skygw_logmanager_init(
{ {
bool succp = false; bool succp = false;
ss_dfprintf(stderr, ">> skygw_logmanager_init\n");
acquire_lock(&lmlock); acquire_lock(&lmlock);
if (lm != NULL) { if (lm != NULL) {
@ -342,7 +340,6 @@ bool skygw_logmanager_init(
return_succp: return_succp:
release_lock(&lmlock); release_lock(&lmlock);
ss_dfprintf(stderr, "<< skygw_logmanager_init\n");
return succp; return succp;
} }
@ -422,8 +419,6 @@ void skygw_logmanager_exit(void)
*/ */
void skygw_logmanager_done(void) void skygw_logmanager_done(void)
{ {
ss_dfprintf(stderr, ">> skygw_logmanager_done\n");
acquire_lock(&lmlock); acquire_lock(&lmlock);
if (lm == NULL) { if (lm == NULL) {
@ -452,8 +447,6 @@ void skygw_logmanager_done(void)
return_void: return_void:
release_lock(&lmlock); release_lock(&lmlock);
ss_dfprintf(stderr, "<< skygw_logmanager_done\n");
} }
static logfile_t* logmanager_get_logfile( static logfile_t* logmanager_get_logfile(
@ -1319,11 +1312,24 @@ static bool fnames_conf_init(
fn->fn_logpath = (fn->fn_logpath == NULL) ? fn->fn_logpath = (fn->fn_logpath == NULL) ?
strdup(get_logpath_default()) : fn->fn_logpath; strdup(get_logpath_default()) : fn->fn_logpath;
ss_dfprintf(stderr, "Command line : "); /* ss_dfprintf(stderr, "\n\n\tCommand line : ");
for (i=0; i<argc; i++) { for (i=0; i<argc; i++) {
ss_dfprintf(stderr, "%s ", argv[i]); ss_dfprintf(stderr, "%s ", argv[i]);
} }
ss_dfprintf(stderr, "\n"); ss_dfprintf(stderr, "\n");*/
fprintf(stderr,
"Log directory :\t%s\n"
"Error log :\t%s1%s\n"
"Message log :\t%s1%s\n"
"Trace log :\t%s1%s\n\n",
fn->fn_logpath,
fn->fn_err_prefix,
fn->fn_err_suffix,
fn->fn_msg_prefix,
fn->fn_msg_suffix,
fn->fn_trace_prefix,
fn->fn_trace_suffix);
succp = true; succp = true;
fn->fn_state = RUN; fn->fn_state = RUN;

View File

@ -426,6 +426,7 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
DCB *dcb; DCB *dcb;
GWPROTOCOL *funcs; GWPROTOCOL *funcs;
int fd; int fd;
int rc;
if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL) if ((dcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)) == NULL)
{ {
@ -483,7 +484,7 @@ int fd;
session->client, session->client,
session->client->fd); session->client->fd);
} }
ss_dassert(dcb->fd == -1); ss_dassert(dcb->fd == -1); /**< must be uninitialized at this point */
/** /**
* Successfully connected to backend. Assign file descriptor to dcb * Successfully connected to backend. Assign file descriptor to dcb
*/ */
@ -505,7 +506,13 @@ int fd;
/** /**
* Add the dcb in the poll set * Add the dcb in the poll set
*/ */
poll_add_dcb(dcb); rc = poll_add_dcb(dcb);
if (rc == -1) {
dcb_set_state(dcb, DCB_STATE_DISCONNECTED, NULL);
dcb_final_free(dcb);
return NULL;
}
return dcb; return dcb;
} }
@ -518,7 +525,8 @@ int fd;
* *
* @param dcb The DCB to read from * @param dcb The DCB to read from
* @param head Pointer to linked list to append data to * @param head Pointer to linked list to append data to
* @return -1 on error, otherwise the number of read bytes on the last. 0 is returned if no data available. * @return -1 on error, otherwise the number of read bytes on the last.
* 0 is returned if no data available.
* iteration of while loop. * iteration of while loop.
*/ */
int int
@ -694,15 +702,30 @@ int w, saved_errno = 0;
if (w < 0) if (w < 0)
{ {
skygw_log_write_flush( if (saved_errno == EPIPE) {
LOGFILE_ERROR, skygw_log_write(
"Error : Write to dcb %p in " LOGFILE_TRACE,
"state %s fd %d failed due errno %d, %s", "%lu [dcb_write] Write to dcb "
"%p in state %s fd %d failed "
"due errno %d, %s",
pthread_self(),
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd, dcb->fd,
saved_errno, saved_errno,
strerror(saved_errno)); strerror(saved_errno));
} else {
skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Write to dcb %p in "
"state %s fd %d failed due "
"errno %d, %s",
dcb,
STRDCBSTATE(dcb->state),
dcb->fd,
saved_errno,
strerror(saved_errno));
}
break; break;
} }
@ -836,11 +859,18 @@ dcb_close(DCB *dcb)
int rc; int rc;
CHK_DCB(dcb); CHK_DCB(dcb);
ss_dassert(dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
/** /**
* Stop dcb's listening and modify state accordingly. * Stop dcb's listening and modify state accordingly.
*/ */
rc = poll_remove_dcb(dcb); rc = poll_remove_dcb(dcb);
ss_dassert(dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE);
if (rc == 0) { if (rc == 0) {
skygw_log_write( skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
@ -850,13 +880,15 @@ dcb_close(DCB *dcb)
dcb, dcb,
STRDCBSTATE(dcb->state)); STRDCBSTATE(dcb->state));
} else { } else {
skygw_log_write_flush( skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Removing dcb %p in state %s from " "%lu [dcb_close] Error : Removing dcb %p in state %s from "
"poll set failed.", "poll set failed.",
pthread_self(),
dcb, dcb,
STRDCBSTATE(dcb->state)); STRDCBSTATE(dcb->state));
} }
if (dcb->state == DCB_STATE_NOPOLLING) { if (dcb->state == DCB_STATE_NOPOLLING) {
dcb_add_to_zombieslist(dcb); dcb_add_to_zombieslist(dcb);
} }

View File

@ -97,7 +97,7 @@ static bool libmysqld_started = FALSE;
static void log_flush_shutdown(void); static void log_flush_shutdown(void);
static void log_flush_cb(void* arg); static void log_flush_cb(void* arg);
static void libmysqld_done(void); static void libmysqld_done(void);
static bool file_write_header(FILE* outfile);
/** /**
* Handler for SIGHUP signal. Reload the configuration for the * Handler for SIGHUP signal. Reload the configuration for the
* gateway. * gateway.
@ -126,9 +126,10 @@ sigint_handler (int i)
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Signal SIGINT %i received ...Exiting!", i); "Error : Signal SIGINT %i received ...Exiting!",
i);
shutdown_gateway(); shutdown_gateway();
fprintf(stderr, "Shuting down MaxScale\n"); fprintf(stderr, "\n\nShutting down MaxScale\n\n");
} }
/* wrapper for sigaction */ /* wrapper for sigaction */
@ -210,6 +211,60 @@ return_home:
} }
#endif #endif
static bool file_write_header(
FILE* outfile)
{
bool succp = false;
size_t wbytes1;
size_t wbytes2;
size_t wbytes3;
size_t len1;
size_t len2;
size_t len3;
const char* header_buf1;
char* header_buf2 = NULL;
const char* header_buf3;
time_t* t;
struct tm* tm;
t = (time_t *)malloc(sizeof(time_t));
tm = (struct tm *)malloc(sizeof(struct tm));
*t = time(NULL);
*tm = *localtime(t);
header_buf1 = "\n\nSkySQL MaxScale\t";
header_buf2 = strdup(asctime(tm));
header_buf3 = "------------------------------------------------------\n";
if (header_buf2 == NULL) {
goto return_succp;
}
len1 = strlen(header_buf1);
len2 = strlen(header_buf2);
len3 = strlen(header_buf3);
#if defined(LAPTOP_TEST)
usleep(DISKWRITE_LATENCY);
#else
wbytes1=fwrite((void*)header_buf1, len1, 1, outfile);
wbytes2=fwrite((void*)header_buf2, len2, 1, outfile);
wbytes3=fwrite((void*)header_buf3, len3, 1, outfile);
#endif
succp = true;
return_succp:
if (header_buf2 != NULL) {
free(header_buf2);
}
free(t);
free(tm);
return succp;
}
/** /**
* The main entry point into the gateway * The main entry point into the gateway
* *
@ -219,25 +274,32 @@ return_home:
int int
main(int argc, char **argv) main(int argc, char **argv)
{ {
int daemon_mode = 1; int daemon_mode = 1;
sigset_t sigset; sigset_t sigset;
int i, n, n_threads, n_services; int i, n, n_threads, n_services;
void **threads; void **threads;
char mysql_home[1024], buf[1024], *home, *cnf_file = NULL; char mysql_home[1024], buf[1024], *home, *cnf_file = NULL;
char ddopt[1024]; char ddopt[1024];
void* log_flush_thr = NULL; void* log_flush_thr = NULL;
ssize_t log_flush_timeout_ms = 0; ssize_t log_flush_timeout_ms = 0;
int l; int l;
sigset_t sigpipe_mask;
sigset_t saved_mask;
sigemptyset(&sigpipe_mask);
sigaddset(&sigpipe_mask, SIGPIPE);
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
memset(conn_open, 0, sizeof(bool)*1024); memset(conn_open, 0, sizeof(bool)*1024);
memset(dcb_fake_write_errno, 0, sizeof(unsigned char)*1024); memset(dcb_fake_write_errno, 0, sizeof(unsigned char)*1024);
memset(dcb_fake_write_ev, 0, sizeof(__int32_t)*1024); memset(dcb_fake_write_ev, 0, sizeof(__int32_t)*1024);
fail_next_backend_fd = false; fail_next_backend_fd = false;
fail_next_client_fd = false; fail_next_client_fd = false;
fail_next_accept = 0; fail_next_accept = 0;
fail_accept_errno = 0; fail_accept_errno = 0;
#endif #endif
file_write_header(stderr);
l = atexit(skygw_logmanager_exit); l = atexit(skygw_logmanager_exit);
if (l != 0) { if (l != 0) {
@ -315,6 +377,10 @@ fail_accept_errno = 0;
signal_set(SIGTERM, sigterm_handler); signal_set(SIGTERM, sigterm_handler);
signal_set(SIGINT, sigint_handler); signal_set(SIGINT, sigint_handler);
if (pthread_sigmask(SIG_BLOCK, &sigpipe_mask, &saved_mask) == -1) {
perror("pthread_sigmask");
exit(1);
}
l = atexit(libmysqld_done); l = atexit(libmysqld_done);
if (l != 0) { if (l != 0) {
@ -517,7 +583,7 @@ fail_accept_errno = 0;
* Shutdown the gateway * Shutdown the gateway
*/ */
void void
shutdown_gateway() shutdown_gateway()
{ {
poll_shutdown(); poll_shutdown();
log_flush_shutdown(); log_flush_shutdown();

View File

@ -171,6 +171,11 @@ poll_remove_dcb(DCB *dcb)
/** It is possible that dcb has already been removed from the set */ /** It is possible that dcb has already been removed from the set */
if (dcb->state != DCB_STATE_POLLING) { if (dcb->state != DCB_STATE_POLLING) {
if (dcb->state == DCB_STATE_NOPOLLING ||
dcb->state == DCB_STATE_ZOMBIE)
{
rc = 0;
}
goto return_rc; goto return_rc;
} }
@ -370,23 +375,52 @@ poll_waitevents(void *arg)
} }
if (ev & EPOLLHUP) if (ev & EPOLLHUP)
{ {
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"EPOLLHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno));
atomic_add(&pollStats.n_hup, 1); atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb); dcb->func.hangup(dcb);
} }
if (ev & EPOLLOUT) if (ev & EPOLLOUT)
{ {
int eno = 0; int eno = 0;
simple_mutex_lock(&dcb->dcb_write_lock,
true);
eno = gw_getsockerrno(dcb->fd); eno = gw_getsockerrno(dcb->fd);
ss_dassert(eno == 0);
ss_info_dassert(!dcb->dcb_write_active, if (eno == 0) {
simple_mutex_lock(
&dcb->dcb_write_lock,
true);
ss_info_dassert(
!dcb->dcb_write_active,
"Write already active"); "Write already active");
dcb->dcb_write_active = TRUE; dcb->dcb_write_active = TRUE;
atomic_add(&pollStats.n_write, 1); atomic_add(&pollStats.n_write, 1);
dcb->func.write_ready(dcb); dcb->func.write_ready(dcb);
dcb->dcb_write_active = FALSE; dcb->dcb_write_active = FALSE;
simple_mutex_unlock(&dcb->dcb_write_lock); simple_mutex_unlock(
&dcb->dcb_write_lock);
} else {
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"EPOLLOUT due %d, %s. "
"dcb %p, fd %i",
pthread_self(),
eno,
strerror(eno),
dcb,
dcb->fd);
}
} }
if (ev & EPOLLIN) if (ev & EPOLLIN)
{ {

View File

@ -67,7 +67,7 @@ typedef struct router_instance {
SERVICE *service; /**< Pointer to the service using this router */ SERVICE *service; /**< Pointer to the service using this router */
ROUTER_CLIENT_SES *connections; /**< Link list of all the client connections */ ROUTER_CLIENT_SES *connections; /**< Link list of all the client connections */
SPINLOCK lock; /**< Spinlock for the instance data */ SPINLOCK lock; /**< Spinlock for the instance data */
BACKEND **servers; /**< The set of backend servers for this router*/ BACKEND **servers; /**< List of backend servers */
unsigned int bitmask; /**< Bitmask to apply to server->status */ unsigned int bitmask; /**< Bitmask to apply to server->status */
unsigned int bitvalue; /**< Required value of server->status */ unsigned int bitvalue; /**< Required value of server->status */
ROUTER_STATS stats; /**< Statistics for this router */ ROUTER_STATS stats; /**< Statistics for this router */

View File

@ -31,28 +31,26 @@
#include <dcb.h> #include <dcb.h>
typedef struct client_session CLIENT_SESSION;
typedef struct instance INSTANCE;
/** /**
* Internal structure used to define the set of backend servers we are routing * Internal structure used to define the set of backend servers we are routing
* connections to. This provides the storage for routing module specific data * connections to. This provides the storage for routing module specific data
* that is required for each of the backend servers. * that is required for each of the backend servers.
*/ */
typedef struct backend { typedef struct backend {
SERVER* server; /**< The server itself */ SERVER* backend_server; /**< The server itself */
int count; /**< Number of connections to the server */ int backend_conn_count; /**< Number of connections to the server */
} BACKEND; } BACKEND;
/** /**
* The client session structure used within this router. * The client session structure used within this router.
*/ */
struct client_session { typedef struct router_client_session {
BACKEND* slave; /**< Slave used by the client session */ BACKEND* be_slave; /**< Slave backend used by client session */
BACKEND* master; /**< Master used by the client session */ BACKEND* be_master; /**< Master backend used by client session */
DCB* slaveconn; /**< Slave connection */ DCB* slave_dcb; /**< Slave connection */
DCB* masterconn; /**< Master connection */ DCB* master_dcb; /**< Master connection */
CLIENT_SESSION* next; struct router_client_session* next;
}; } ROUTER_CLIENT_SES;
/** /**
* The statistics for this router instance * The statistics for this router instance
@ -60,24 +58,26 @@ struct client_session {
typedef struct { typedef struct {
int n_sessions; /**< Number sessions created */ int n_sessions; /**< Number sessions created */
int n_queries; /**< Number of queries forwarded */ int n_queries; /**< Number of queries forwarded */
int n_master; /**< Number of statements sent to master */ int n_master; /**< Number of stmts sent to master */
int n_slave; /**< Number of statements sent to slave */ int n_slave; /**< Number of stmts sent to slave */
int n_all; /**< Number of statements sent to all */ int n_all; /**< Number of stmts sent to all */
} ROUTER_STATS; } ROUTER_STATS;
/** /**
* The per instance data for the router. * The per instance data for the router.
*/ */
struct instance { typedef struct router_instance {
SERVICE* service; /**< Pointer to the service using this router */ SERVICE* service; /**< Pointer to service */
CLIENT_SESSION* connections; /**< Link list of all the client connections */ ROUTER_CLIENT_SES* connections; /**< List of client connections */
SPINLOCK lock; /**< Spinlock for the instance data */ SPINLOCK lock; /**< Lock for the instance data */
BACKEND** servers; /**< The set of backend servers for this instance */ BACKEND** servers; /**< Backend servers */
BACKEND* master; /**< NULL if not known, pointer otherwise */ BACKEND* master; /**< NULL or pointer */
unsigned int bitmask; /**< Bitmask to apply to server->status */
unsigned int bitvalue; /**< Required value of server->status */
ROUTER_STATS stats; /**< Statistics for this router */ ROUTER_STATS stats; /**< Statistics for this router */
INSTANCE* next; struct router_instance* next; /**< Next router on the list */
}; } ROUTER_INSTANCE;
#endif #endif

View File

@ -89,7 +89,6 @@ version()
void void
ModuleInit() ModuleInit()
{ {
fprintf(stderr, "Initialise HTTPD Protocol module.\n");
} }
/** /**
@ -369,6 +368,7 @@ struct sockaddr_in addr;
char *port; char *port;
int one = 1; int one = 1;
short pnum; short pnum;
int rc;
memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL));
@ -403,7 +403,23 @@ short pnum;
{ {
return 0; return 0;
} }
listen(listener->fd, SOMAXCONN);
rc = listen(listener->fd, SOMAXCONN);
if (rc == 0) {
fprintf(stderr,
"Listening http connections at %s\n",
config);
} else {
int eno = errno;
errno = 0;
fprintf(stderr,
"\n* Failed to start listening http due error %d, %s\n\n",
eno,
strerror(eno));
return 0;
}
if (poll_add_dcb(listener) == -1) if (poll_add_dcb(listener) == -1)
{ {

View File

@ -90,12 +90,6 @@ version()
void void
ModuleInit() ModuleInit()
{ {
#if defined(SS_DEBUG)
skygw_log_write(
LOGFILE_MESSAGE,
strdup("Initial MySQL Backend Protcol module."));
#endif
fprintf(stderr, "Initial MySQL Backend Protcol module.\n");
} }
/* /*
@ -187,14 +181,14 @@ static int gw_read_backend_event(DCB *dcb) {
current_session->client_sha1, current_session->client_sha1,
backend_protocol) != 0) backend_protocol) != 0)
{ {
backend_protocol->state = MYSQL_AUTH_FAILED; ss_dassert(backend_protocol->state == MYSQL_AUTH_FAILED);
rc = 1; rc = 1;
} else { } else {
/** /**
* next step is to wait server's response with * next step is to wait server's response with
* a new EPOLLIN event * a new EPOLLIN event
*/ */
backend_protocol->state = MYSQL_AUTH_RECV; ss_dassert(backend_protocol->state == MYSQL_AUTH_RECV);
rc = 0; rc = 0;
goto return_rc; goto return_rc;
} }

View File

@ -83,7 +83,6 @@ version()
void void
ModuleInit() ModuleInit()
{ {
fprintf(stderr, "Initialise MySQL Client Protocol module.\n");
} }
/** /**
@ -380,7 +379,7 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) {
*/ */
// now get the user // now get the user
strcpy(username, (char *)(client_auth_packet + 4 + 4 + 4 + 1 + 23)); strcpy(username, (char *)(client_auth_packet + 4 + 4 + 4 + 1 + 23));
fprintf(stderr, "<<< Client username is [%s]\n", username); /* fprintf(stderr, "<<< Client username is [%s]\n", username); */
// get the auth token len // get the auth token len
memcpy(&auth_token_len, memcpy(&auth_token_len,
@ -392,9 +391,9 @@ static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue) {
strcpy(database, strcpy(database,
(char *)(client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(username) + (char *)(client_auth_packet + 4 + 4 + 4 + 1 + 23 + strlen(username) +
1 + 1 + auth_token_len)); 1 + 1 + auth_token_len));
fprintf(stderr, "<<< Client selected db is [%s]\n", database); /* fprintf(stderr, "<<< Client selected db is [%s]\n", database); */
} else { } else {
fprintf(stderr, "<<< Client is NOT connected with db\n"); /* fprintf(stderr, "<<< Client is NOT connected with db\n"); */
} }
// allocate memory for token only if auth_token_len > 0 // allocate memory for token only if auth_token_len > 0
@ -832,6 +831,7 @@ int gw_MySQLListener(
char address[1024] = ""; char address[1024] = "";
int port = 0; int port = 0;
int one = 1; int one = 1;
int rc;
/* this gateway, as default, will bind on port 4404 for localhost only */ /* this gateway, as default, will bind on port 4404 for localhost only */
if (config_bind != NULL) { if (config_bind != NULL) {
@ -866,8 +866,10 @@ int gw_MySQLListener(
// socket create // socket create
if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) { if ((l_so = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, fprintf(stderr,
">>> Error: can't open listening socket. Errno %i, %s\n", "\n* Error: can't open listening socket due "
errno, strerror(errno)); "error %i, %s.\n\n\t",
errno,
strerror(errno));
return 0; return 0;
} }
// socket options // socket options
@ -879,30 +881,49 @@ int gw_MySQLListener(
// bind address and port // bind address and port
if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { if (bind(l_so, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
fprintf(stderr, fprintf(stderr,
">>> Bind failed !!! %i, [%s]\n", "\n* Bind failed due error %i, %s.\n",
errno, errno,
strerror(errno)); strerror(errno));
fprintf(stderr, ">>> can't bind to address and port"); fprintf(stderr, "* Can't bind to %s\n\n",
bind_address_and_port);
return 0; return 0;
} }
/*
fprintf(stderr, fprintf(stderr,
">> GATEWAY bind is: %s:%i. FD is %i\n", ">> GATEWAY bind is: %s:%i. FD is %i\n",
address, address,
port, port,
l_so); l_so);
*/
listen(l_so, 10 * SOMAXCONN); rc = listen(l_so, 10 * SOMAXCONN);
if (rc == 0) {
fprintf(stderr,
"Listening MySQL connections at %s\n",
bind_address_and_port);
} else {
int eno = errno;
errno = 0;
fprintf(stderr,
"\n* Failed to start listening MySQL due error %d, %s\n\n",
eno,
strerror(eno));
return 0;
}
/*
fprintf(stderr, fprintf(stderr,
">> GATEWAY listen backlog queue is %i\n", ">> GATEWAY listen backlog queue is %i\n",
10 * SOMAXCONN); 10 * SOMAXCONN);
*/
// assign l_so to dcb // assign l_so to dcb
listen_dcb->fd = l_so; listen_dcb->fd = l_so;
// add listening socket to poll structure // add listening socket to poll structure
if (poll_add_dcb(listen_dcb) == -1) { if (poll_add_dcb(listen_dcb) == -1) {
fprintf(stderr, fprintf(stderr,
">>> poll_add_dcb: can't add the listen_sock! Errno " "\n* Failed to start polling the socket due error "
"%i, %s\n", "%i, %s.\n\n",
errno, errno,
strerror(errno)); strerror(errno));
return 0; return 0;
@ -943,7 +964,6 @@ int gw_MySQLAccept(DCB *listener)
int i = 0; int i = 0;
CHK_DCB(listener); CHK_DCB(listener);
fprintf(stderr, "MySQL Listener socket is: %i\n", listener->fd);
while (1) { while (1) {
@ -1026,13 +1046,14 @@ int gw_MySQLAccept(DCB *listener)
c_sock); c_sock);
conn_open[c_sock] = true; conn_open[c_sock] = true;
#endif #endif
/*
fprintf(stderr, fprintf(stderr,
"Processing %i connection fd %i for listener %i\n", "Processing %i connection fd %i for listener %i\n",
listener->stats.n_accepts, listener->stats.n_accepts,
c_sock, c_sock,
listener->fd); listener->fd);
// set nonblocking */
/* set nonblocking */
setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen); setsockopt(c_sock, SOL_SOCKET, SO_SNDBUF, &sendbuf, optlen);
setnonblocking(c_sock); setnonblocking(c_sock);
@ -1103,9 +1124,9 @@ int gw_MySQLAccept(DCB *listener)
CHK_PROTOCOL(((MySQLProtocol *)client_dcb->protocol)); CHK_PROTOCOL(((MySQLProtocol *)client_dcb->protocol));
} }
#endif #endif
return_rc: return_rc:
return rc; return rc;
} }
/* /*
*/ */

View File

@ -336,7 +336,12 @@ bool gw_receive_backend_auth(
* @param passwd The SHA1(real_password): Note real_password is unknown * @param passwd The SHA1(real_password): Note real_password is unknown
* @return 0 on success, 1 on failure * @return 0 on success, 1 on failure
*/ */
int gw_send_authentication_to_backend(char *dbname, char *user, uint8_t *passwd, MySQLProtocol *conn) { int gw_send_authentication_to_backend(
char *dbname,
char *user,
uint8_t *passwd,
MySQLProtocol *conn)
{
int compress = 0; int compress = 0;
int rv; int rv;
uint8_t *payload = NULL; uint8_t *payload = NULL;

View File

@ -353,6 +353,7 @@ struct sockaddr_in addr;
char *port; char *port;
int one = 1; int one = 1;
short pnum; short pnum;
int rc;
memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL)); memcpy(&listener->func, &MyObject, sizeof(GWPROTOCOL));
@ -381,7 +382,23 @@ short pnum;
{ {
return 0; return 0;
} }
listen(listener->fd, SOMAXCONN);
rc = listen(listener->fd, SOMAXCONN);
if (rc == 0) {
fprintf(stderr,
"Listening telnet connections at %s\n",
config);
} else {
int eno = errno;
errno = 0;
fprintf(stderr,
"\n* Failed to start listening telnet due error %d, %s\n\n",
eno,
strerror(eno));
return 0;
}
if (poll_add_dcb(listener) == -1) if (poll_add_dcb(listener) == -1)
{ {

View File

@ -233,8 +233,8 @@ int i, n;
else else
{ {
skygw_log_write(LOGFILE_ERROR, skygw_log_write(LOGFILE_ERROR,
"Unsupported router option %s for " "Warning : Unsupported router "
"readconnroute\n", "option %s for readconnroute.",
options[i]); options[i]);
} }
} }
@ -349,10 +349,9 @@ int i;
if (!candidate) { if (!candidate) {
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [newSession] Failed to create new routing session. " "Error : Failed to create new routing session. "
"Couldn't find eligible candidate server. Freeing " "Couldn't find eligible candidate server. Freeing "
"allocated resources.", "allocated resources.");
pthread_self());
free(client_ses); free(client_ses);
return NULL; return NULL;
} }
@ -382,10 +381,9 @@ int i;
atomic_add(&candidate->current_connection_count, -1); atomic_add(&candidate->current_connection_count, -1);
skygw_log_write( skygw_log_write(
LOGFILE_ERROR, LOGFILE_ERROR,
"%lu [newSession] Failed to create new routing session. " "Error : Failed to create new routing session. "
"Couldn't establish connection to candidate server " "Couldn't establish connection to candidate server "
"listening to port %d. Freeing allocated resources.", "listening to port %d. Freeing allocated resources.",
pthread_self(),
candidate->server->port); candidate->server->port);
free(client_ses); free(client_ses);
return NULL; return NULL;

View File

@ -18,10 +18,11 @@
#include <stdio.h> #include <stdio.h>
#include <strings.h> #include <strings.h>
#include <string.h> #include <string.h>
#include <stdlib.h>
#include <router.h> #include <router.h>
#include <readwritesplit.h> #include <readwritesplit.h>
#include <stdlib.h>
#include <mysql.h> #include <mysql.h>
#include <skygw_utils.h> #include <skygw_utils.h>
#include <log_manager.h> #include <log_manager.h>
@ -58,8 +59,16 @@ static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session); static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue); static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb); static void diagnostic(ROUTER *instance, DCB *dcb);
static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb); static void clientReply(
ROUTER* instance,
void* router_session,
GWBUF* queue,
DCB* backend_dcb);
static bool search_backend_servers(
BACKEND** p_master,
BACKEND** p_slave,
ROUTER_INSTANCE* router);
static ROUTER_OBJECT MyObject = { static ROUTER_OBJECT MyObject = {
createInstance, createInstance,
@ -70,8 +79,9 @@ static ROUTER_OBJECT MyObject = {
diagnostic, diagnostic,
clientReply clientReply
}; };
static SPINLOCK instlock; static SPINLOCK instlock;
static INSTANCE* instances; static ROUTER_INSTANCE* instances;
/** /**
* Implementation of the mandatory version entry point * Implementation of the mandatory version entry point
@ -106,7 +116,8 @@ ModuleInit()
* @return The module object * @return The module object
*/ */
ROUTER_OBJECT* GetModuleObject() { ROUTER_OBJECT* GetModuleObject() {
skygw_log_write(LOGFILE_TRACE, skygw_log_write(
LOGFILE_TRACE,
"Returning readwritesplit router module object."); "Returning readwritesplit router module object.");
return &MyObject; return &MyObject;
} }
@ -131,67 +142,117 @@ static ROUTER* createInstance(
SERVICE* service, SERVICE* service,
char** options) char** options)
{ {
INSTANCE* inst; ROUTER_INSTANCE* router;
SERVER* server; SERVER* server;
int n; int n;
int i; int i;
if ((inst = calloc(1, sizeof(INSTANCE))) == NULL) { if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
return NULL; return NULL;
} }
inst->service = service; router->service = service;
spinlock_init(&inst->lock); spinlock_init(&router->lock);
inst->connections = NULL; router->connections = NULL;
/** Calculate number of servers */ /** Calculate number of servers */
for (server = service->databases, n = 0; server; server = server->nextdb) { server = service->databases;
for (n=0; server != NULL; server=server->nextdb) {
n++; n++;
} }
inst->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *));
if (!inst->servers) { router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *));
free(inst);
if (router->servers == NULL)
{
free(router);
return NULL; return NULL;
} }
if (options) if (options != NULL)
{ {
skygw_log_write_flush(LOGFILE_MESSAGE, skygw_log_write_flush(
"Router options supplied to read/write split router module but none are supported. The options will be ignored.\n"); LOGFILE_MESSAGE,
"Router options supplied to read/write split router "
"module but none are supported. The options will be "
"ignored.");
} }
/** /**
* We need an array of the backend servers in the instance structure so * Create an array of the backend servers in the router structure to
* that we can maintain a count of the number of connections to each * maintain a count of the number of connections to each
* backend server. * backend server.
*/ */
for (server = service->databases, n = 0; server; server = server->nextdb) { server = service->databases;
n = 0;
if ((inst->servers[n] = malloc(sizeof(BACKEND))) == NULL) { while (server != NULL) {
if ((router->servers[n] = malloc(sizeof(BACKEND))) == NULL)
{
for (i = 0; i < n; i++) { for (i = 0; i < n; i++) {
free(inst->servers[i]); free(router->servers[i]);
} }
free(inst->servers); free(router->servers);
free(inst); free(router);
return NULL; return NULL;
} }
inst->servers[n]->server = server; router->servers[n]->backend_server = server;
inst->servers[n]->count = 0; router->servers[n]->backend_conn_count = 0;
n++; n += 1;
server = server->nextdb;
} }
inst->servers[n] = NULL; router->servers[n] = NULL;
/** /**
* We have completed the creation of the instance data, so now * vraa : is this necessary for readwritesplit ?
* insert this router instance into the linked list of routers * Option : where can a read go?
* - master (only)
* - slave (only)
* - joined (to both)
*
* Process the options
*/
router->bitmask = 0;
router->bitvalue = 0;
if (options)
{
for (i = 0; options[i]; i++)
{
if (!strcasecmp(options[i], "master"))
{
router->bitmask |= (SERVER_MASTER|SERVER_SLAVE);
router->bitvalue |= SERVER_MASTER;
}
else if (!strcasecmp(options[i], "slave"))
{
router->bitmask |= (SERVER_MASTER|SERVER_SLAVE);
router->bitvalue |= SERVER_SLAVE;
}
else if (!strcasecmp(options[i], "joined"))
{
router->bitmask |= (SERVER_JOINED);
router->bitvalue |= SERVER_JOINED;
}
else
{
skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Unsupported router option %s "
"for readwritesplitrouter.",
options[i]);
}
}
}
/**
* We have completed the creation of the router data, so now
* insert this router into the linked list of routers
* that have been created with this module. * that have been created with this module.
*/ */
spinlock_acquire(&instlock); spinlock_acquire(&instlock);
inst->next = instances; router->next = instances;
instances = inst; instances = router;
spinlock_release(&instlock); spinlock_release(&instlock);
return (ROUTER *)inst; return (ROUTER *)router;
} }
/** /**
@ -205,15 +266,19 @@ static ROUTER* createInstance(
* @return Session specific data for this session * @return Session specific data for this session
*/ */
static void* newSession( static void* newSession(
ROUTER* instance, ROUTER* router_inst,
SESSION* session) SESSION* session)
{ {
BACKEND* candidate = NULL; BACKEND* be_slave = NULL;
CLIENT_SESSION* client; BACKEND* be_master = NULL;
INSTANCE* inst = (INSTANCE *)instance; ROUTER_CLIENT_SES* client_rses;
int i; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
bool succp;
if ((client = (CLIENT_SESSION *)malloc(sizeof(CLIENT_SESSION))) == NULL) client_rses =
(ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES));
if (client_rses == NULL)
{ {
return NULL; return NULL;
} }
@ -223,101 +288,58 @@ static void* newSession(
* load balancing algorithm we need to implement for this simple * load balancing algorithm we need to implement for this simple
* connection router. * connection router.
*/ */
for (i = 0; inst->servers[i]; i++) succp = search_backend_servers(&be_master, &be_slave, router);
{
if (inst->servers[i] && SERVER_IS_SLAVE(inst->servers[i]->server)) /** Both Master and Slave must be found */
{ if (!succp) {
candidate = inst->servers[i]; free(client_rses);
break;
}
}
/**
* Loop over all the servers and find any that have fewer connections
* than our candidate server.
*
* If a server has less connections than the current candidate we mark this
* as the new candidate to connect to.
*
* If a server has the same number of connections currently as the candidate
* and has had less connections over time than the candidate it will also
* become the new candidate. This has the effect of spreading the connections
* over different servers during periods of very low load.
*/
for (i = 0; inst->servers[i]; i++) {
if (inst->servers[i]
&& SERVER_IS_RUNNING(inst->servers[i]->server))
{
if (SERVER_IS_SLAVE(inst->servers[i]->server))
{
if (inst->servers[i]->count < candidate->count) {
candidate = inst->servers[i];
} else if (inst->servers[i]->count == candidate->count &&
inst->servers[i]->server->stats.n_connections
< candidate->server->stats.n_connections)
{
candidate = inst->servers[i];
}
} else if (SERVER_IS_MASTER(inst->servers[i]->server)) {
/** master is found */
inst->master = inst->servers[i];
}
}
} /* for */
if (candidate == NULL)
{
skygw_log_write_flush(LOGFILE_MESSAGE,
"No suitable servers found for connection.");
free(client);
return NULL; return NULL;
} }
if (inst->master == NULL) {
inst->master = inst->servers[i-1];
}
/**
* We now have a master and a slave server with the least connections.
* Bump the connection counts for these servers.
*/
atomic_add(&candidate->count, 1);
client->slave = candidate;
atomic_add(&inst->master->count, 1);
client->master = inst->master;
ss_dassert(client->master->server != candidate->server);
/** /**
* Open the slave connection. * Open the slave connection.
*/ */
if ((client->slaveconn = dcb_connect(candidate->server, session, client_rses->slave_dcb = dcb_connect(be_slave->backend_server,
candidate->server->protocol)) == NULL) session,
{ be_slave->backend_server->protocol);
atomic_add(&candidate->count, -1);
free(client); if (client_rses->slave_dcb == NULL) {
free(client_rses);
return NULL; return NULL;
} }
/** /**
* Open the master connection. * Open the master connection.
*/ */
if ((client->masterconn = dcb_connect(client->master->server, session, client_rses->master_dcb = dcb_connect(be_master->backend_server,
client->master->server->protocol)) == NULL) session,
be_master->backend_server->protocol);
if (client_rses->master_dcb == NULL)
{ {
atomic_add(&client->master->count, -1); /** Close slave connection first. */
free(client); client_rses->slave_dcb->func.close(client_rses->slave_dcb);
free(client_rses);
return NULL; return NULL;
} }
inst->stats.n_sessions += 1; /**
* We now have a master and a slave server with the least connections.
* Bump the connection counts for these servers.
*/
atomic_add(&be_slave->backend_conn_count, 1);
atomic_add(&be_master->backend_conn_count, 1);
/* Add this session to end of the list of active sessions */ client_rses->be_slave = be_slave;
spinlock_acquire(&inst->lock); client_rses->be_master = be_master;
client->next = inst->connections; router->stats.n_sessions += 1;
inst->connections = client;
spinlock_release(&inst->lock);
return (void *)client; /**
* Add this session to end of the list of active sessions in router.
*/
spinlock_acquire(&router->lock);
client_rses->next = router->connections;
router->connections = client_rses;
spinlock_release(&router->lock);
return (void *)client_rses;
} }
/** /**
@ -331,47 +353,88 @@ static void closeSession(
ROUTER* instance, ROUTER* instance,
void* router_session) void* router_session)
{ {
INSTANCE* inst = (INSTANCE *)instance; #if 0
CLIENT_SESSION* session = (CLIENT_SESSION *)router_session; ROUTER_INSTANCE* router;
#endif
ROUTER_CLIENT_SES* rsession;
rsession = (ROUTER_CLIENT_SES *)router_session;
#if 0
router = (ROUTER_INSTANCE *)instance;
atomic_add(&rsession->be_slave->backend_conn_count, -1);
atomic_add(&rsession->be_master->backend_conn_count, -1);
atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1);
atomic_add(&rsession->be_master->backend_server->stats.n_current, -1);
#endif
/** /**
* Close the connection to the backend servers * Close the connection to the backend servers
*/ */
session->slaveconn->func.close(session->slaveconn); rsession->slave_dcb->func.close(rsession->slave_dcb);
session->masterconn->func.close(session->masterconn); rsession->master_dcb->func.close(rsession->master_dcb);
atomic_add(&session->slave->count, -1); #if 0
atomic_add(&session->master->count, -1); spinlock_acquire(&router->lock);
atomic_add(&session->slave->server->stats.n_current, -1); if (router->connections == rsession) {
atomic_add(&session->master->server->stats.n_current, -1); router->connections = rsession->next;
spinlock_acquire(&inst->lock);
if (inst->connections == session) {
inst->connections = session->next;
} else { } else {
CLIENT_SESSION* ptr = inst->connections; ROUTER_CLIENT_SES* ptr = router->connections;
while (ptr && ptr->next != session) { while (ptr && ptr->next != rsession) {
ptr = ptr->next; ptr = ptr->next;
} }
if (ptr) { if (ptr) {
ptr->next = session->next; ptr->next = rsession->next;
} }
} }
spinlock_release(&inst->lock); spinlock_release(&router->lock);
/* /*
* We are no longer in the linked list, free * We are no longer in the linked list, free
* all the memory and other resources associated * all the memory and other resources associated
* to the client session. * to the client session.
*/ */
free(session); free(rsession);
#endif
} }
static void freeSession( static void freeSession(
ROUTER* router_instance, ROUTER* router_instance,
void* router_client_session) void* router_client_session)
{ {
ROUTER_CLIENT_SES* rsession;
ROUTER_INSTANCE* router;
rsession = (ROUTER_CLIENT_SES *)router_client_session;
router = (ROUTER_INSTANCE *)router_instance;
atomic_add(&rsession->be_slave->backend_conn_count, -1);
atomic_add(&rsession->be_master->backend_conn_count, -1);
atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1);
atomic_add(&rsession->be_master->backend_server->stats.n_current, -1);
spinlock_acquire(&router->lock);
if (router->connections == rsession) {
router->connections = rsession->next;
} else {
ROUTER_CLIENT_SES* ptr = router->connections;
while (ptr && ptr->next != rsession) {
ptr = ptr->next;
}
if (ptr) {
ptr->next = rsession->next;
}
}
spinlock_release(&router->lock);
/*
* We are no longer in the linked list, free
* all the memory and other resources associated
* to the client session.
*/
free(rsession);
return; return;
} }
@ -405,8 +468,8 @@ static int routeQuery(
int ret = 0; int ret = 0;
GWBUF *cq = NULL; GWBUF *cq = NULL;
INSTANCE* inst = (INSTANCE *)instance; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
CLIENT_SESSION* session = (CLIENT_SESSION *)router_session; ROUTER_CLIENT_SES* rsession = (ROUTER_CLIENT_SES *)router_session;
inst->stats.n_queries++; inst->stats.n_queries++;
packet = GWBUF_DATA(queue); packet = GWBUF_DATA(queue);
@ -466,7 +529,7 @@ static int routeQuery(
"Query type\t%s, routing to Master.", "Query type\t%s, routing to Master.",
STRQTYPE(qtype)); STRQTYPE(qtype));
#endif #endif
ret = session->masterconn->func.write(session->masterconn, queue); ret = rsession->master_dcb->func.write(rsession->master_dcb, queue);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
goto return_ret; goto return_ret;
break; break;
@ -478,7 +541,7 @@ static int routeQuery(
"Query type\t%s, routing to Slave.", "Query type\t%s, routing to Slave.",
STRQTYPE(qtype)); STRQTYPE(qtype));
#endif #endif
ret = session->slaveconn->func.write(session->slaveconn, queue); ret = rsession->slave_dcb->func.write(rsession->slave_dcb, queue);
atomic_add(&inst->stats.n_slave, 1); atomic_add(&inst->stats.n_slave, 1);
goto return_ret; goto return_ret;
break; break;
@ -500,16 +563,20 @@ static int routeQuery(
switch(packet_type) { switch(packet_type) {
case COM_QUIT: case COM_QUIT:
ret = session->masterconn->func.write(session->masterconn, queue); ret = rsession->master_dcb->func.write(rsession->master_dcb, queue);
session->slaveconn->func.write(session->slaveconn, cq); rsession->slave_dcb->func.write(rsession->slave_dcb, cq);
break; break;
case COM_CHANGE_USER: case COM_CHANGE_USER:
session->masterconn->func.auth(session->masterconn, NULL, session->masterconn->session, queue); rsession->master_dcb->func.auth(rsession->master_dcb, NULL, rsession->master_dcb->session, queue);
session->slaveconn->func.auth(session->slaveconn, NULL, session->masterconn->session, cq); rsession->slave_dcb->func.auth(rsession->slave_dcb, NULL, rsession->master_dcb->session, cq);
break; break;
default: default:
ret = session->masterconn->func.session(session->masterconn, (void *)queue); ret = rsession->master_dcb->func.session(
session->slaveconn->func.session(session->slaveconn, (void *)cq); rsession->master_dcb,
(void *)queue);
rsession->slave_dcb->func.session(
rsession->slave_dcb,
(void *)cq);
break; break;
} }
@ -525,7 +592,7 @@ static int routeQuery(
STRQTYPE(qtype)); STRQTYPE(qtype));
#endif #endif
/** Is this really ok? */ /** Is this really ok? */
ret = session->masterconn->func.write(session->masterconn, queue); ret = rsession->master_dcb->func.write(rsession->master_dcb, queue);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
goto return_ret; goto return_ret;
break; break;
@ -547,26 +614,37 @@ return_ret:
static void static void
diagnostic(ROUTER *instance, DCB *dcb) diagnostic(ROUTER *instance, DCB *dcb)
{ {
CLIENT_SESSION *session; ROUTER_CLIENT_SES *rsession;
INSTANCE *inst = (INSTANCE *)instance; ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int i = 0; int i = 0;
spinlock_acquire(&inst->lock); spinlock_acquire(&router->lock);
session = inst->connections; rsession = router->connections;
while (session) while (rsession)
{ {
i++; i++;
session = session->next; rsession = rsession->next;
} }
spinlock_release(&inst->lock); spinlock_release(&router->lock);
dcb_printf(dcb, "\tNumber of router sessions: %d\n", inst->stats.n_sessions);
dcb_printf(dcb, "\tCurrent no. of router sessions: %d\n", i);
dcb_printf(dcb, "\tNumber of queries forwarded: %d\n", inst->stats.n_queries);
dcb_printf(dcb, "\tNumber of queries forwarded to master: %d\n", inst->stats.n_master);
dcb_printf(dcb, "\tNumber of queries forwarded to slave: %d\n", inst->stats.n_slave);
dcb_printf(dcb, "\tNumber of queries forwarded to all: %d\n", inst->stats.n_all);
dcb_printf(dcb,
"\tNumber of router sessions: %d\n",
router->stats.n_sessions);
dcb_printf(dcb,
"\tCurrent no. of router sessions: %d\n",
i);
dcb_printf(dcb,
"\tNumber of queries forwarded: %d\n",
router->stats.n_queries);
dcb_printf(dcb,
"\tNumber of queries forwarded to master: %d\n",
router->stats.n_master);
dcb_printf(dcb,
"\tNumber of queries forwarded to slave: %d\n",
router->stats.n_slave);
dcb_printf(dcb,
"\tNumber of queries forwarded to all: %d\n",
router->stats.n_all);
} }
/** /**
@ -579,30 +657,186 @@ int i = 0;
* @param backend_dcb The backend DCB * @param backend_dcb The backend DCB
* @param queue The GWBUF with reply data * @param queue The GWBUF with reply data
*/ */
static void static void clientReply(
clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb) ROUTER* instance,
void* router_session,
GWBUF* queue,
DCB* backend_dcb)
{ {
INSTANCE* inst = NULL; DCB* master_dcb;
DCB *master = NULL; DCB* client_dcb;
DCB *client = NULL; ROUTER_CLIENT_SES* rsession;
CLIENT_SESSION* session = NULL;
inst = (INSTANCE *)instance; rsession = (ROUTER_CLIENT_SES *)router_session;
session = (CLIENT_SESSION *)router_session; master_dcb = rsession->master_dcb;
master = session->masterconn; client_dcb = backend_dcb->session->client;
client = backend_dcb->session->client;
if (backend_dcb->command == ROUTER_CHANGE_SESSION) { if (backend_dcb->command == ROUTER_CHANGE_SESSION) {
/* if backend_dcb is the master we can reply to the client */ /* if backend_dcb is the master we can reply to the client */
if (backend_dcb == master) { if (backend_dcb == master_dcb) {
master->session->client->func.write(master->session->client, queue); client_dcb->func.write(client_dcb, queue);
} else { } else {
/* just consume the gwbuf without writing to the client */ /* just consume the gwbuf without writing to the client */
gwbuf_consume(queue, gwbuf_length(queue)); gwbuf_consume(queue, gwbuf_length(queue));
} }
} else { } else {
/* normal flow */ /* normal flow */
client->func.write(client, queue); client_dcb->func.write(client_dcb, queue);
} }
} }
///
/**
* @node Search suitable backend server from those of router instance.
*
* Parameters:
* @param p_master - in, use, out
* Pointer to location where master's address is to be stored.
* If NULL, then master is not searched.
*
* @param p_slave - in, use, out
* Pointer to location where slave's address is to be stored.
* if NULL, then slave is not searched.
*
* @param inst - in, use
* Pointer to router instance
*
* @return true, if all what what requested found, false if the request
* was not satisfied or was partially satisfied.
*
*
* @details It is assumed that there is only one master among servers of
* a router instance. As a result, thr first master is always chosen.
*/
static bool search_backend_servers(
BACKEND** p_master,
BACKEND** p_slave,
ROUTER_INSTANCE* router)
{
BACKEND* be_master = NULL;
BACKEND* be_slave = NULL;
int i;
bool succp = true;
/*
* Loop over all the servers and find any that have fewer connections
* than current candidate server.
*
* If a server has less connections than the current candidate it is
* chosen to a new candidate.
*
* If a server has the same number of connections currently as the
* candidate and has had less connections over time than the candidate
* it will also become the new candidate. This has the effect of
* spreading the connections over different servers during periods of
* very low load.
*
* If master is searched for, the first master found is chosen.
*/
for (i = 0; router->servers[i] != NULL; i++) {
BACKEND* be = router->servers[i];
if (be != NULL) {
skygw_log_write(
LOGFILE_TRACE,
"%lu [newSession] Examine server %s:%d with "
"%d connections. Status is %d, "
"router->bitvalue is %d",
pthread_self(),
be->backend_server->name,
be->backend_server->port,
be->backend_conn_count,
be->backend_server->status,
router->bitmask);
}
if (be != NULL &&
SERVER_IS_RUNNING(be->backend_server) &&
(be->backend_server->status & router->bitmask) ==
router->bitvalue)
{
if (SERVER_IS_SLAVE(be->backend_server) &&
p_slave != NULL)
{
/**
* If no candidate set, set first running
* server as an initial candidate server.
*/
if (be_slave == NULL)
{
be_slave = be;
}
else if (be->backend_conn_count <
be_slave->backend_conn_count)
{
/**
* This running server has fewer
* connections, set it as a new
* candidate.
*/
be_slave = be;
}
else if (be->backend_conn_count ==
be_slave->backend_conn_count &&
be->backend_server->stats.n_connections <
be_slave->backend_server->stats.n_connections)
{
/**
* This running server has the same
* number of connections currently
* as the candidate but has had
* fewer connections over time
* than candidate, set this server
* to candidate.
*/
be_slave = be;
}
}
else if (p_master != NULL &&
be_master == NULL &&
SERVER_IS_MASTER(be->backend_server))
{
be_master = be;
}
}
}
if (p_slave != NULL && be_slave == NULL) {
succp = false;
skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Couldn't find suitable Slave from %d candidates.",
i);
}
if (p_master != NULL && be_master == NULL) {
succp = false;
skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Couldn't find suitable Master from %d candidates.",
i);
}
if (be_slave != NULL) {
*p_slave = be_slave;
skygw_log_write(
LOGFILE_TRACE,
"%lu [readwritesplit:newSession] Selected Slave %s:%d "
"from %d candidates.",
pthread_self(),
be_slave->backend_server->name,
be_slave->backend_server->port,
i);
}
if (be_master != NULL) {
*p_master = be_master;
skygw_log_write(
LOGFILE_TRACE,
"%lu [readwritesplit:newSession] Selected Master %s:%d "
"from %d candidates.",
pthread_self(),
be_master->backend_server->name,
be_master->backend_server->port,
i);
}
return succp;
}

View File

@ -56,7 +56,7 @@ version()
void void
ModuleInit() ModuleInit()
{ {
fprintf(stderr, "Initial test router module.\n");
} }
/** /**
@ -70,7 +70,6 @@ ModuleInit()
ROUTER_OBJECT * ROUTER_OBJECT *
GetModuleObject() GetModuleObject()
{ {
fprintf(stderr, "Returning test router module object.\n");
return &MyObject; return &MyObject;
} }