This commit is contained in:
VilhoRaatikka
2014-05-08 13:13:42 +03:00
parent bc84dd13f8
commit 9af5d9fb06
8 changed files with 221 additions and 155 deletions

View File

@ -253,6 +253,7 @@ static int logmanager_write_log(
va_list valist); va_list valist);
static blockbuf_t* blockbuf_init(logfile_id_t id); static blockbuf_t* blockbuf_init(logfile_id_t id);
static void blockbuf_node_done(void* bb_data);
static char* blockbuf_get_writepos( static char* blockbuf_get_writepos(
#if 0 #if 0
int** refcount, int** refcount,
@ -996,8 +997,13 @@ static char* blockbuf_get_writepos(
simple_mutex_unlock(&bb->bb_mutex); simple_mutex_unlock(&bb->bb_mutex);
return pos; return pos;
} }
static void blockbuf_node_done(
void* bb_data)
{
blockbuf_t* bb = (blockbuf_t *)bb_data;
simple_mutex_done(&bb->bb_mutex);
}
static blockbuf_t* blockbuf_init( static blockbuf_t* blockbuf_init(
@ -2059,7 +2065,7 @@ static bool logfile_init(
if (mlist_init(&logfile->lf_blockbuf_list, if (mlist_init(&logfile->lf_blockbuf_list,
NULL, NULL,
strdup("logfile block buffer list"), strdup("logfile block buffer list"),
NULL, blockbuf_node_done,
MAXNBLOCKBUFS) == NULL) MAXNBLOCKBUFS) == NULL)
{ {
ss_dfprintf(stderr, ss_dfprintf(stderr,

View File

@ -706,6 +706,10 @@ static bool skygw_stmt_causes_implicit_commit(
{ {
succp = true; succp = true;
} }
else
{
succp =false;
}
break; break;
default: default:
succp = true; succp = true;

View File

@ -203,6 +203,19 @@ int error_count = 0;
char *enable_root_user = char *enable_root_user =
config_get_value(obj->parameters, "enable_root_user"); config_get_value(obj->parameters, "enable_root_user");
if (obj->element == NULL) /*< if module load failed */
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Reading configuration "
"for router service '%s' failed. "
"Router %s is not loaded.",
obj->object,
obj->object)));
obj = obj->next;
continue; /*< process next obj */
}
if (enable_root_user) if (enable_root_user)
serviceEnableRootUser(obj->element, atoi(enable_root_user)); serviceEnableRootUser(obj->element, atoi(enable_root_user));

View File

@ -53,6 +53,17 @@ static void register_module(const char *module,
void *modobj); void *modobj);
static void unregister_module(const char *module); static void unregister_module(const char *module);
char* get_maxscale_home(void)
{
char* home = getenv("MAXSCALE_HOME");
if (home == NULL)
{
home = "/usr/local/skysql/MaxScale";
}
return home;
}
/** /**
* Load the dynamic library related to a gateway module. The routine * Load the dynamic library related to a gateway module. The routine
* will look for library files in the current directory, * will look for library files in the current directory,
@ -82,10 +93,10 @@ MODULES *mod;
sprintf(fname, "./lib%s.so", module); sprintf(fname, "./lib%s.so", module);
if (access(fname, F_OK) == -1) if (access(fname, F_OK) == -1)
{ {
if ((home = getenv("MAXSCALE_HOME")) == NULL) home = get_maxscale_home ();
home = "/usr/local/skysql/MaxScale";
sprintf(fname, "%s/modules/lib%s.so", home, module); sprintf(fname, "%s/modules/lib%s.so", home, module);
if (access(fname, F_OK) == -1)
if (access(fname, F_OK) == -1)
{ {
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
@ -100,7 +111,7 @@ MODULES *mod;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Unable to load library for module: " "Error : Unable to load library for module: "
"%s, %s.", "%s\n\t\t\t %s.",
module, module,
dlerror()))); dlerror())));
return NULL; return NULL;
@ -111,7 +122,7 @@ MODULES *mod;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Version interface not supported by " "Error : Version interface not supported by "
"module: %s, %s.", "module: %s\n\t\t\t %s.",
module, module,
dlerror()))); dlerror())));
dlclose(dlhandle); dlclose(dlhandle);
@ -134,7 +145,7 @@ MODULES *mod;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Expected entry point interface missing " "Error : Expected entry point interface missing "
"from module: %s, %s.", "from module: %s\n\t\t\t %s.",
module, module,
dlerror()))); dlerror())));
dlclose(dlhandle); dlclose(dlhandle);

View File

@ -34,6 +34,8 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <ctype.h>
#include <errno.h>
#include <session.h> #include <session.h>
#include <service.h> #include <service.h>
#include <server.h> #include <server.h>
@ -70,6 +72,20 @@ SERVICE *service;
return NULL; return NULL;
if ((service->router = load_module(router, MODULE_ROUTER)) == NULL) if ((service->router = load_module(router, MODULE_ROUTER)) == NULL)
{ {
char* home = get_maxscale_home();
char* ldpath = getenv("LD_LIBRARY_PATH");
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Unable to load %s module \"%s\".\n\t\t\t"
" Ensure that lib%s.so exists in one of the "
"following directories :\n\t\t\t "
"- %s/modules\n\t\t\t - %s",
MODULE_ROUTER,
router,
router,
home,
ldpath)));
free(service); free(service);
return NULL; return NULL;
} }

View File

@ -55,4 +55,6 @@ extern void *load_module(const char *module, const char *type);
extern void unload_module(const char *module); extern void unload_module(const char *module);
extern void printModules(); extern void printModules();
extern void dprintAllModules(DCB *); extern void dprintAllModules(DCB *);
char* get_maxscale_home(void);
#endif #endif

View File

@ -122,6 +122,8 @@ struct router_client_session {
/*< cursor is pointer and status variable to current session command */ /*< cursor is pointer and status variable to current session command */
sescmd_cursor_t rses_cursor[BE_COUNT]; sescmd_cursor_t rses_cursor[BE_COUNT];
int rses_capabilities; /*< input type, for example */ int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled;
bool rses_transaction_active;
struct router_client_session* next; struct router_client_session* next;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail; skygw_chk_t rses_chk_tail;
@ -155,4 +157,8 @@ typedef struct router_instance {
struct router_instance* next; /*< Next router on the list */ struct router_instance* next; /*< Next router on the list */
} ROUTER_INSTANCE; } ROUTER_INSTANCE;
#define BACKEND_TYPE(b) (SERVER_IS_MASTER((b)->backend_server) ? BE_MASTER : \
(SERVER_IS_SLAVE((b)->backend_server) ? BE_SLAVE : \
(SERVER_IS_JOINED((b)->backend_server) ? BE_JOINED : BE_UNDEFINED)));
#endif /*< _RWSPLITROUTER_H */ #endif /*< _RWSPLITROUTER_H */

View File

@ -74,7 +74,7 @@ static uint8_t getCapabilities (ROUTER* inst, void* router_session);
static bool search_backend_servers( static bool search_backend_servers(
BACKEND** p_master, BACKEND** p_master,
BACKEND** p_slave, BACKEND** p_slave,
ROUTER_INSTANCE* router); ROUTER_INSTANCE* router);
static ROUTER_OBJECT MyObject = { static ROUTER_OBJECT MyObject = {
createInstance, createInstance,
@ -218,10 +218,10 @@ static ROUTER* createInstance(
SERVICE* service, SERVICE* service,
char** options) char** options)
{ {
ROUTER_INSTANCE* router; ROUTER_INSTANCE* router;
SERVER* server; SERVER* server;
int n; int n;
int i; int i;
if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) { if ((router = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
return NULL; return NULL;
@ -251,7 +251,6 @@ static ROUTER* createInstance(
"module but none are supported. The options will be " "module but none are supported. The options will be "
"ignored."))); "ignored.")));
} }
/** /**
* Create an array of the backend servers in the router structure to * Create an array of the backend servers in the router structure to
* maintain a count of the number of connections to each * maintain a count of the number of connections to each
@ -273,7 +272,7 @@ static ROUTER* createInstance(
router->servers[n]->backend_conn_count = 0; router->servers[n]->backend_conn_count = 0;
n += 1; n += 1;
server = server->nextdb; server = server->nextdb;
} }
router->servers[n] = NULL; router->servers[n] = NULL;
/** /**
@ -307,7 +306,7 @@ static ROUTER* createInstance(
} }
} }
} }
/** /**
* We have completed the creation of the router data, so now * We have completed the creation of the router data, so now
* insert this router into the linked list of routers * insert this router into the linked list of routers
* that have been created with this module. * that have been created with this module.
@ -336,12 +335,12 @@ static void* newSession(
{ {
BACKEND* local_backend[BE_COUNT]; BACKEND* local_backend[BE_COUNT];
ROUTER_CLIENT_SES* client_rses; ROUTER_CLIENT_SES* client_rses;
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst; ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_inst;
bool succp; bool succp;
client_rses = client_rses =
(ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES)); (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
if (client_rses == NULL) if (client_rses == NULL)
{ {
ss_dassert(false); ss_dassert(false);
@ -352,7 +351,7 @@ static void* newSession(
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES; client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
#endif #endif
/** store pointers to sescmd list to both cursors */ /** store pointers to sescmd list to both cursors */
client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses; client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses;
client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false; client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false;
@ -367,8 +366,8 @@ static void* newSession(
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL; client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL;
client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE; client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE;
/** /**
* Find a backend server to connect to. This is the extent of the * Find a backend server to connect to. This is the extent of the
* load balancing algorithm we need to implement for this simple * load balancing algorithm we need to implement for this simple
* connection router. * connection router.
@ -376,7 +375,7 @@ static void* newSession(
succp = search_backend_servers(&local_backend[BE_MASTER], succp = search_backend_servers(&local_backend[BE_MASTER],
&local_backend[BE_SLAVE], &local_backend[BE_SLAVE],
router); router);
/** Both Master and Slave must be found */ /** Both Master and Slave must be found */
if (!succp) { if (!succp) {
free(client_rses); free(client_rses);
@ -392,23 +391,23 @@ static void* newSession(
if (client_rses->rses_dcb[BE_SLAVE] == NULL) { if (client_rses->rses_dcb[BE_SLAVE] == NULL) {
ss_dassert(session->refcount == 1); ss_dassert(session->refcount == 1);
free(client_rses); free(client_rses);
return NULL; return NULL;
} }
/** /**
* Open the master connection. * Open the master connection.
*/ */
client_rses->rses_dcb[BE_MASTER] = dcb_connect( client_rses->rses_dcb[BE_MASTER] = dcb_connect(
local_backend[BE_MASTER]->backend_server, local_backend[BE_MASTER]->backend_server,
session, session,
local_backend[BE_MASTER]->backend_server->protocol); local_backend[BE_MASTER]->backend_server->protocol);
if (client_rses->rses_dcb[BE_MASTER] == NULL) if (client_rses->rses_dcb[BE_MASTER] == NULL)
{ {
/** Close slave connection first. */ /** Close slave connection first. */
client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]); client_rses->rses_dcb[BE_SLAVE]->func.close(client_rses->rses_dcb[BE_SLAVE]);
free(client_rses); free(client_rses);
return NULL; return NULL;
} }
/** /**
* We now have a master and a slave server with the least connections. * We now have a master and a slave server with the least connections.
* Bump the connection counts for these servers. * Bump the connection counts for these servers.
@ -418,7 +417,7 @@ static void* newSession(
client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE]; client_rses->rses_backend[BE_SLAVE] = local_backend[BE_SLAVE];
client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER]; client_rses->rses_backend[BE_MASTER] = local_backend[BE_MASTER];
router->stats.n_sessions += 1; router->stats.n_sessions += 1;
client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT; client_rses->rses_capabilities = RCAP_TYPE_STMT_INPUT;
/** /**
@ -430,15 +429,17 @@ static void* newSession(
* Add this session to end of the list of active sessions in router. * Add this session to end of the list of active sessions in router.
*/ */
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
client_rses->next = router->connections; client_rses->next = router->connections;
router->connections = client_rses; router->connections = client_rses;
spinlock_release(&router->lock); spinlock_release(&router->lock);
CHK_CLIENT_RSES(client_rses); CHK_CLIENT_RSES(client_rses);
return (void *)client_rses; return (void *)client_rses;
} }
/** /**
* Close a session with the router, this is the mechanism * Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc. * by which a router may cleanup data structure etc.
@ -453,7 +454,7 @@ static void closeSession(
ROUTER_CLIENT_SES* router_cli_ses; ROUTER_CLIENT_SES* router_cli_ses;
DCB* slave_dcb; DCB* slave_dcb;
DCB* master_dcb; DCB* master_dcb;
router_cli_ses = (ROUTER_CLIENT_SES *)router_session; router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
CHK_CLIENT_RSES(router_cli_ses); CHK_CLIENT_RSES(router_cli_ses);
/** /**
@ -464,7 +465,7 @@ static void closeSession(
/** decrease server current connection counters */ /** decrease server current connection counters */
atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1); atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_server->stats.n_current, -1);
atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1); atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_server->stats.n_current, -1);
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
router_cli_ses->rses_dcb[BE_SLAVE] = NULL; router_cli_ses->rses_dcb[BE_SLAVE] = NULL;
master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
@ -473,14 +474,14 @@ static void closeSession(
router_cli_ses->rses_closed = true; router_cli_ses->rses_closed = true;
/** Unlock */ /** Unlock */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
/** /**
* Close the backend server connections * Close the backend server connections
*/ */
if (slave_dcb != NULL) { if (slave_dcb != NULL) {
CHK_DCB(slave_dcb); CHK_DCB(slave_dcb);
slave_dcb->func.close(slave_dcb); slave_dcb->func.close(slave_dcb);
} }
if (master_dcb != NULL) { if (master_dcb != NULL) {
master_dcb->func.close(master_dcb); master_dcb->func.close(master_dcb);
@ -498,8 +499,8 @@ static void freeSession(
int i; int i;
router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
router = (ROUTER_INSTANCE *)router_instance; router = (ROUTER_INSTANCE *)router_instance;
atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1); atomic_add(&router_cli_ses->rses_backend[BE_SLAVE]->backend_conn_count, -1);
atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1); atomic_add(&router_cli_ses->rses_backend[BE_MASTER]->backend_conn_count, -1);
@ -567,15 +568,15 @@ static int routeQuery(
void* router_session, void* router_session,
GWBUF* querybuf) GWBUF* querybuf)
{ {
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
GWBUF* plainsqlbuf = NULL; GWBUF* plainsqlbuf = NULL;
char* querystr = NULL; char* querystr = NULL;
char* startpos; char* startpos;
unsigned char packet_type; unsigned char packet_type;
uint8_t* packet; uint8_t* packet;
int ret = 0; int ret = 0;
DCB* master_dcb = NULL; DCB* master_dcb = NULL;
DCB* slave_dcb = NULL; DCB* slave_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
bool rses_is_closed; bool rses_is_closed;
@ -618,16 +619,16 @@ static int routeQuery(
"Error: Failed to route %s:%s:\"%s\" to " "Error: Failed to route %s:%s:\"%s\" to "
"backend server. %s.", "backend server. %s.",
STRPACKETTYPE(packet_type), STRPACKETTYPE(packet_type),
STRQTYPE(qtype), STRQTYPE(qtype),
(querystr == NULL ? "(empty)" : querystr), (querystr == NULL ? "(empty)" : querystr),
(rses_is_closed ? "Router was closed" : (rses_is_closed ? "Router was closed" :
"Router has no backend servers where to " "Router has no backend servers where to "
"route to")))); "route to"))));
goto return_ret; goto return_ret;
} }
inst->stats.n_queries++; inst->stats.n_queries++;
startpos = (char *)&packet[5]; startpos = (char *)&packet[5];
switch(packet_type) { switch(packet_type) {
case COM_QUIT: /**< 1 QUIT will close all sessions */ case COM_QUIT: /**< 1 QUIT will close all sessions */
case COM_INIT_DB: /**< 2 DDL must go to the master */ case COM_INIT_DB: /**< 2 DDL must go to the master */
@ -656,7 +657,7 @@ static int routeQuery(
querystr = master_dcb->func.getquerystr( querystr = master_dcb->func.getquerystr(
(void *) gwbuf_clone(querybuf), (void *) gwbuf_clone(querybuf),
&querystr_is_copy); &querystr_is_copy);
*/ */
qtype = skygw_query_classifier_get_type(querystr, 0); qtype = skygw_query_classifier_get_type(querystr, 0);
break; break;
@ -683,88 +684,94 @@ static int routeQuery(
* transaction becomes active and master gets all statements until * transaction becomes active and master gets all statements until
* transaction is committed and autocommit is enabled again. * transaction is committed and autocommit is enabled again.
*/ */
if (autocommit_enabled && if (router_cli_ses->rses_autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
{ {
autocommit_enabled = false; router_cli_ses->rses_autocommit_enabled = false;
if (!transaction_active) if (!router_cli_ses->rses_transaction_active)
{ {
transaction_active = true; router_cli_ses->rses_transaction_active = true;
} }
} }
else if (!transaction_active && else if (!router_cli_ses->rses_transaction_active &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX)) QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
{ {
transaction_active = true; router_cli_ses->rses_transaction_active = true;
} }
/** /**
* Explicit COMMIT and ROLLBACK, implicit COMMIT. * Explicit COMMIT and ROLLBACK, implicit COMMIT.
*/ */
if (autocommit_enabled && if (router_cli_ses->rses_autocommit_enabled &&
transaction_active && router_cli_ses->rses_transaction_active &&
(QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) || (QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) ||
QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK))) QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK)))
{ {
transaction_active = false; router_cli_ses->rses_transaction_active = false;
} }
else if (!autocommit_enabled && else if (!router_cli_ses->rses_autocommit_enabled &&
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT)) QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
{ {
autocommit_enabled = true; router_cli_ses->rses_autocommit_enabled = true;
transaction_active = false; router_cli_ses->rses_transaction_active = false;
} }
/** /**
* Session update is always routed in the same way. * Session update is always routed in the same way.
*/ */
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE)) if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE))
{ {
if (route_session_write( bool succp = route_session_write(
router_cli_ses, router_cli_ses,
querybuf, querybuf,
inst, inst,
packet_type, packet_type,
qtype)) qtype);
if (succp)
{ {
ret = 1; ret = 1;
} }
else ss_dassert(succp);
{ ss_dassert(ret == 1);
ret = 0;
}
goto return_ret; goto return_ret;
} }
else if (transaction_active) /*< all to master */ else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
!router_cli_ses->rses_transaction_active)
{ {
LOGIF(LT, (skygw_log_write( bool succp;
LOGFILE_TRACE,
"Transaction is active, routing to Master.")));
ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
goto return_ret;
}
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
{
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"Read-only query, routing to Slave."))); "Read-only query, routing to Slave.")));
ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)); ss_dassert(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ));
ret = slave_dcb->func.write(slave_dcb, querybuf); ret = slave_dcb->func.write(slave_dcb, querybuf);
atomic_add(&inst->stats.n_slave, 1); atomic_add(&inst->stats.n_slave, 1);
goto return_ret; goto return_ret;
} }
else else
{ {
LOGIF(LT, (skygw_log_write( bool succp = true;
LOGFILE_TRACE,
"Begin transaction, write or unspecified type, " if (LOG_IS_ENABLED(LOGFILE_TRACE))
"routing to Master."))); {
if (router_cli_ses->rses_transaction_active) /*< all to master */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Transaction is active, routing to Master.")));
}
else
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Begin transaction, write or unspecified type, "
"routing to Master.")));
}
}
ret = master_dcb->func.write(master_dcb, querybuf); ret = master_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
goto return_ret; goto return_ret;
} }
@ -934,8 +941,7 @@ static void clientReply(
client_dcb = backend_dcb->session->client; client_dcb = backend_dcb->session->client;
/** Unlock */ /** Unlock */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
/** /**
* 1. Check if backend received reply to sescmd. * 1. Check if backend received reply to sescmd.
* 2. Check sescmd's state whether OK_PACKET has been * 2. Check sescmd's state whether OK_PACKET has been
@ -955,14 +961,14 @@ static void clientReply(
} }
if (backend_dcb == master_dcb) if (backend_dcb == master_dcb)
{ {
be_type = BE_MASTER; be_type = BE_MASTER;
} }
else if (backend_dcb == slave_dcb) else if (backend_dcb == slave_dcb)
{ {
be_type = BE_SLAVE; be_type = BE_SLAVE;
} }
LOGIF(LT, tracelog_routed_query(router_cli_ses, LOGIF(LT, tracelog_routed_query(router_cli_ses,
"reply_by_statement", "reply_by_statement",
backend_dcb, backend_dcb,
gwbuf_clone(writebuf))); gwbuf_clone(writebuf)));
@ -983,8 +989,7 @@ static void clientReply(
{ {
writebuf = sescmd_cursor_process_replies(client_dcb, writebuf = sescmd_cursor_process_replies(client_dcb,
writebuf, writebuf,
scur); scur);
} }
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -1021,7 +1026,7 @@ lock_failed:
* *
* @param inst - in, use * @param inst - in, use
* Pointer to router instance * Pointer to router instance
* *
* @return true, if all what what requested found, false if the request * @return true, if all what what requested found, false if the request
* was not satisfied or was partially satisfied. * was not satisfied or was partially satisfied.
* *
@ -1032,12 +1037,12 @@ lock_failed:
static bool search_backend_servers( static bool search_backend_servers(
BACKEND** p_master, BACKEND** p_master,
BACKEND** p_slave, BACKEND** p_slave,
ROUTER_INSTANCE* router) ROUTER_INSTANCE* router)
{ {
BACKEND* local_backend[BE_COUNT] = {NULL,NULL}; BACKEND* local_backend[BE_COUNT] = {NULL,NULL};
int i; int i;
bool succp = true; bool succp = true;
/* /*
* Loop over all the servers and find any that have fewer connections * Loop over all the servers and find any that have fewer connections
* than current candidate server. * than current candidate server.
@ -1052,32 +1057,32 @@ static bool search_backend_servers(
* very low load. * very low load.
* *
* If master is searched for, the first master found is chosen. * If master is searched for, the first master found is chosen.
*/ */
for (i = 0; router->servers[i] != NULL; i++) { for (i = 0; router->servers[i] != NULL; i++) {
BACKEND* be = router->servers[i]; BACKEND* be = router->servers[i];
if (be != NULL) { if (be != NULL) {
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [search_backend_servers] Examine server " "%lu [search_backend_servers] Examine server "
"%s:%d with %d connections. Status is %d, " "%s:%d with %d connections. Status is %d, "
"router->bitvalue is %d", "router->bitvalue is %d",
pthread_self(), pthread_self(),
be->backend_server->name, be->backend_server->name,
be->backend_server->port, be->backend_server->port,
be->backend_conn_count, be->backend_conn_count,
be->backend_server->status, be->backend_server->status,
router->bitmask))); router->bitmask)));
} }
if (be != NULL && if (be != NULL &&
SERVER_IS_RUNNING(be->backend_server) && SERVER_IS_RUNNING(be->backend_server) &&
(be->backend_server->status & router->bitmask) == (be->backend_server->status & router->bitmask) ==
router->bitvalue) router->bitvalue)
{ {
if (SERVER_IS_SLAVE(be->backend_server) && if (SERVER_IS_SLAVE(be->backend_server) &&
p_slave != NULL) p_slave != NULL)
{ {
/** /**
* If no candidate set, set first running * If no candidate set, set first running
* server as an initial candidate server. * server as an initial candidate server.
@ -1085,7 +1090,7 @@ static bool search_backend_servers(
if (local_backend[BE_SLAVE] == NULL) if (local_backend[BE_SLAVE] == NULL)
{ {
local_backend[BE_SLAVE] = be; local_backend[BE_SLAVE] = be;
} }
else if (be->backend_conn_count < else if (be->backend_conn_count <
local_backend[BE_SLAVE]->backend_conn_count) local_backend[BE_SLAVE]->backend_conn_count)
{ {
@ -1101,74 +1106,74 @@ static bool search_backend_servers(
be->backend_server->stats.n_connections < be->backend_server->stats.n_connections <
local_backend[BE_SLAVE]->backend_server->stats.n_connections) local_backend[BE_SLAVE]->backend_server->stats.n_connections)
{ {
/** /**
* This running server has the same * This running server has the same
* number of connections currently * number of connections currently
* as the candidate but has had * as the candidate but has had
* fewer connections over time * fewer connections over time
* than candidate, set this server * than candidate, set this server
* to candidate. * to candidate.
*/ */
local_backend[BE_SLAVE] = be; local_backend[BE_SLAVE] = be;
} }
} }
else if (p_master != NULL && else if (p_master != NULL &&
local_backend[BE_MASTER] == NULL && local_backend[BE_MASTER] == NULL &&
SERVER_IS_MASTER(be->backend_server)) SERVER_IS_MASTER(be->backend_server))
{ {
local_backend[BE_MASTER] = be; local_backend[BE_MASTER] = be;
} }
else if (p_master != NULL && else if (p_master != NULL &&
local_backend[BE_JOINED] == NULL && local_backend[BE_JOINED] == NULL &&
SERVER_IS_JOINED(be->backend_server)) SERVER_IS_JOINED(be->backend_server))
{ {
local_backend[BE_JOINED] = be; local_backend[BE_JOINED] = be;
} }
} }
} }
if (router->bitvalue != 0 && if (router->bitvalue != 0 &&
p_master != NULL && p_master != NULL &&
local_backend[BE_JOINED] == NULL) local_backend[BE_JOINED] == NULL)
{ {
succp = false; succp = false;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Couldn't find a Joined Galera node from %d " "Error : Couldn't find a Joined Galera node from %d "
"candidates.", "candidates.",
i))); i)));
goto return_succp; goto return_succp;
} }
if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) { if (p_slave != NULL && local_backend[BE_SLAVE] == NULL) {
succp = false; succp = false;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Couldn't find suitable Slave from %d " "Error : Couldn't find suitable Slave from %d "
"candidates.", "candidates.",
i))); i)));
} }
if (p_master != NULL && local_backend[BE_MASTER] == NULL) { if (p_master != NULL && local_backend[BE_MASTER] == NULL) {
succp = false; succp = false;
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Couldn't find suitable Master from %d " "Error : Couldn't find suitable Master from %d "
"candidates.", "candidates.",
i))); i)));
} }
if (local_backend[BE_SLAVE] != NULL) { if (local_backend[BE_SLAVE] != NULL) {
*p_slave = local_backend[BE_SLAVE]; *p_slave = local_backend[BE_SLAVE];
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [readwritesplit:search_backend_servers] Selected " "%lu [readwritesplit:search_backend_servers] Selected "
"Slave %s:%d from %d candidates.", "Slave %s:%d from %d candidates.",
pthread_self(), pthread_self(),
local_backend[BE_SLAVE]->backend_server->name, local_backend[BE_SLAVE]->backend_server->name,
local_backend[BE_SLAVE]->backend_server->port, local_backend[BE_SLAVE]->backend_server->port,
i))); i)));
} }
if (local_backend[BE_MASTER] != NULL) { if (local_backend[BE_MASTER] != NULL) {
*p_master = local_backend[BE_MASTER]; *p_master = local_backend[BE_MASTER];
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
@ -1528,7 +1533,7 @@ static bool execute_sescmd_in_backend(
sescmd_cursor_t* scur; sescmd_cursor_t* scur;
dcb = rses->rses_dcb[be_type]; dcb = rses->rses_dcb[be_type];
CHK_DCB(dcb); CHK_DCB(dcb);
CHK_CLIENT_RSES(rses); CHK_CLIENT_RSES(rses);
ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock));
@ -1563,7 +1568,7 @@ static bool execute_sescmd_in_backend(
dcb->session, dcb->session,
sescmd_cursor_clone_querybuf(scur)); sescmd_cursor_clone_querybuf(scur));
break; break;
case COM_QUIT: case COM_QUIT:
case COM_QUERY: case COM_QUERY:
case COM_INIT_DB: case COM_INIT_DB:
@ -1588,6 +1593,7 @@ return_succp:
return succp; return succp;
} }
/** /**
* Moves cursor to next property and copied address of its sescmd to cursor. * Moves cursor to next property and copied address of its sescmd to cursor.
* Current propery must be non-null. * Current propery must be non-null.
@ -1665,7 +1671,6 @@ static rses_property_t* mysql_sescmd_get_property(
return scmd->my_sescmd_prop; return scmd->my_sescmd_prop;
} }
static void tracelog_routed_query( static void tracelog_routed_query(
ROUTER_CLIENT_SES* rses, ROUTER_CLIENT_SES* rses,
char* funcname, char* funcname,
@ -1679,7 +1684,7 @@ static void tracelog_routed_query(
char* querystr; char* querystr;
char* startpos = (char *)&packet[5]; char* startpos = (char *)&packet[5];
backend_type_t be_type; backend_type_t be_type;
if (rses->rses_dcb[BE_MASTER] == dcb) if (rses->rses_dcb[BE_MASTER] == dcb)
{ {
be_type = BE_MASTER; be_type = BE_MASTER;
@ -1722,11 +1727,13 @@ static void tracelog_routed_query(
-1)), -1)),
STRBETYPE(be_type), STRBETYPE(be_type),
dcb))); dcb)));
free(querystr);
} }
} }
gwbuf_free(buf); gwbuf_free(buf);
} }
/** /**
* Return rc, rc < 0 if router session is closed. rc == 0 if there are no * Return rc, rc < 0 if router session is closed. rc == 0 if there are no
* capabilities specified, rc > 0 when there are capabilities. * capabilities specified, rc > 0 when there are capabilities.
@ -1774,7 +1781,7 @@ static bool route_session_write(
DCB* master_dcb; DCB* master_dcb;
DCB* slave_dcb; DCB* slave_dcb;
rses_property_t* prop; rses_property_t* prop;
master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE];
CHK_DCB(master_dcb); CHK_DCB(master_dcb);
@ -1795,12 +1802,12 @@ static bool route_session_write(
{ {
int rc; int rc;
int rc2; int rc2;
rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf));
rc2 = slave_dcb->func.write(slave_dcb, querybuf); rc2 = slave_dcb->func.write(slave_dcb, querybuf);
if (rc == 1 && rc == rc2) if (rc == 1 && rc == rc2)
{ {
succp = true; succp = true;
} }
goto return_succp; goto return_succp;
@ -1822,16 +1829,16 @@ static bool route_session_write(
} }
/** Add sescmd property to router client session */ /** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop); rses_property_add(router_cli_ses, prop);
/** Execute session command in master */ /** Execute session command in master */
succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER); succp = execute_sescmd_in_backend(router_cli_ses, BE_MASTER);
if (!succp) if (!succp)
{ {
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
goto return_succp; goto return_succp;
} }
/** Execute session command in slave */ /** Execute session command in slave */
succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE); succp = execute_sescmd_in_backend(router_cli_ses, BE_SLAVE);
if (!succp) if (!succp)
@ -1839,7 +1846,7 @@ static bool route_session_write(
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
goto return_succp; goto return_succp;
} }
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -1847,4 +1854,5 @@ static bool route_session_write(
return_succp: return_succp:
return succp; return succp;
} }