dcb.c, gateway.c little tuning.
poll.c Removed mutex from epoll_wait. Removed read and write mutexes from poll_waitevents. session.c If session_alloc fails, instead of calling directly free(session), call session_free, which decreases refcounter and only frees session when there are no references left. Added session_unlink_dcb function which removes link from session and optionally sets the dcb->session pointer to NULL. readconnection.h, readwritesplit.h Added check fields to ROUTER_CLIENT_SES strct as well as lock, version number (not used yet) and closed flag. mysql_backend.c gw_read_backend_event: if backend_protocol->state was set to MYSQL_AUTH_RECV, function returned, which was unnecessary. If mysql state became MYSQL_AUTH_FAILED, router client session was closed. Removed unnecessary NULL checks because rsession is not allowed to be NULL. Similarly, removed other NULL checks and replaced them with asserts checking that router client session is not NULL at any phase. mysql_client.c Removed unused code blocks. Polished log commands. Replaced router client sessions NULL checks with asserts. mysql_common.c mysql_send_custom_error: if called with dcb == NULL, return. readconnroute.c Replaced malloc with calloc. Added functions rses_begin_router_action and rses_exit_router_action. If router client session is not closed, they take a lock and release it, respectively. Those functions are used for protecting all operations which modify the contents of router client session struct. readwritesplit.c Identical changes than in readconnroute.c skygw_debug.h Added check number and - macro for ROUTER_CLIENT_SES, and added COM_QUIT to STRPACKETTYPE.
This commit is contained in:
@ -117,6 +117,12 @@ static ROUTER_OBJECT MyObject = {
|
||||
errorReply
|
||||
};
|
||||
|
||||
static bool rses_begin_router_action(
|
||||
ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static void rses_exit_router_action(
|
||||
ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static SPINLOCK instlock;
|
||||
static ROUTER_INSTANCE *instances;
|
||||
|
||||
@ -175,14 +181,12 @@ ROUTER_INSTANCE *inst;
|
||||
SERVER *server;
|
||||
int i, n;
|
||||
|
||||
if ((inst = malloc(sizeof(ROUTER_INSTANCE))) == NULL)
|
||||
return NULL;
|
||||
|
||||
memset(&inst->stats, 0, sizeof(ROUTER_STATS));
|
||||
if ((inst = calloc(1, sizeof(ROUTER_INSTANCE))) == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
inst->service = service;
|
||||
spinlock_init(&inst->lock);
|
||||
inst->connections = NULL;
|
||||
|
||||
/*
|
||||
* We need an array of the backend servers in the instance structure so
|
||||
@ -273,7 +277,7 @@ static void *
|
||||
newSession(ROUTER *instance, SESSION *session)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *client_ses;
|
||||
ROUTER_CLIENT_SES *client_rses;
|
||||
BACKEND *candidate = NULL;
|
||||
int i;
|
||||
|
||||
@ -286,28 +290,34 @@ int i;
|
||||
inst);
|
||||
|
||||
|
||||
client_ses = (ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES));
|
||||
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
||||
|
||||
if (client_ses == NULL) {
|
||||
if (client_rses == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
/*
|
||||
|
||||
#if defined(SS_DEBUG)
|
||||
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
|
||||
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
|
||||
#endif
|
||||
|
||||
/**
|
||||
* Find a backend server to connect to. This is the extent of the
|
||||
* load balancing algorithm we need to implement for this simple
|
||||
* connection router.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Loop over all the servers and find any that have fewer connections than our
|
||||
* candidate server.
|
||||
* Loop over all the servers and find any that have fewer connections
|
||||
* than the candidate server.
|
||||
*
|
||||
* If a server has less connections than the current candidate we mark this
|
||||
* as the new candidate to connect to.
|
||||
*
|
||||
* If a server has the same number of connections currently as the candidate
|
||||
* and has had less connections over time than the candidate it will also
|
||||
* become the new candidate. This has the effect of spreading the connections
|
||||
* over different servers during periods of very low load.
|
||||
* become the new candidate. This has the effect of spreading the
|
||||
* connections over different servers during periods of very low load.
|
||||
*/
|
||||
for (i = 0; inst->servers[i]; i++) {
|
||||
if(inst->servers[i]) {
|
||||
@ -325,7 +335,8 @@ int i;
|
||||
|
||||
if (inst->servers[i] &&
|
||||
SERVER_IS_RUNNING(inst->servers[i]->server) &&
|
||||
(inst->servers[i]->server->status & inst->bitmask) == inst->bitvalue)
|
||||
(inst->servers[i]->server->status & inst->bitmask) ==
|
||||
inst->bitvalue)
|
||||
{
|
||||
/* If no candidate set, set first running server as
|
||||
our initial candidate server */
|
||||
@ -361,7 +372,7 @@ int i;
|
||||
"Error : Failed to create new routing session. "
|
||||
"Couldn't find eligible candidate server. Freeing "
|
||||
"allocated resources.");
|
||||
free(client_ses);
|
||||
free(client_rses);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -370,7 +381,7 @@ int i;
|
||||
* Bump the connection count for this server
|
||||
*/
|
||||
atomic_add(&candidate->current_connection_count, 1);
|
||||
client_ses->backend = candidate;
|
||||
client_rses->backend = candidate;
|
||||
skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [newSession] Selected server in port %d. "
|
||||
@ -380,31 +391,30 @@ int i;
|
||||
candidate->current_connection_count);
|
||||
/*
|
||||
* Open a backend connection, putting the DCB for this
|
||||
* connection in the client_ses->backend_dcb
|
||||
* connection in the client_rses->backend_dcb
|
||||
*/
|
||||
client_ses->backend_dcb = dcb_connect(candidate->server,
|
||||
client_rses->backend_dcb = dcb_connect(candidate->server,
|
||||
session,
|
||||
candidate->server->protocol);
|
||||
if (client_ses->backend_dcb == NULL)
|
||||
if (client_rses->backend_dcb == NULL)
|
||||
{
|
||||
atomic_add(&candidate->current_connection_count, -1);
|
||||
skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Failed to create new routing session. "
|
||||
"Couldn't establish connection to candidate server "
|
||||
"listening to port %d. Freeing allocated resources.",
|
||||
candidate->server->port);
|
||||
free(client_ses);
|
||||
free(client_rses);
|
||||
return NULL;
|
||||
}
|
||||
inst->stats.n_sessions++;
|
||||
|
||||
/* Add this session to the list of active sessions */
|
||||
/**
|
||||
* Add this session to the list of active sessions.
|
||||
*/
|
||||
spinlock_acquire(&inst->lock);
|
||||
client_ses->next = inst->connections;
|
||||
inst->connections = client_ses;
|
||||
client_rses->next = inst->connections;
|
||||
inst->connections = client_rses;
|
||||
spinlock_release(&inst->lock);
|
||||
return (void *)client_ses;
|
||||
|
||||
CHK_CLIENT_RSES(client_rses);
|
||||
|
||||
return (void *)client_rses;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -478,14 +488,29 @@ static void freeSession(
|
||||
static void
|
||||
closeSession(ROUTER *instance, void *router_session)
|
||||
{
|
||||
ROUTER_INSTANCE *router_inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *router_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
bool succp = false;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
DCB* backend_dcb;
|
||||
|
||||
/*
|
||||
* Close the connection to the backend
|
||||
*/
|
||||
router_ses->backend_dcb->func.close(router_ses->backend_dcb);
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
/**
|
||||
* Lock router client session for secure read and update.
|
||||
*/
|
||||
if (rses_begin_router_action(router_cli_ses))
|
||||
{
|
||||
backend_dcb = router_cli_ses->backend_dcb;
|
||||
router_cli_ses->backend_dcb = NULL;
|
||||
router_cli_ses->rses_closed = true;
|
||||
/** Unlock */
|
||||
rses_exit_router_action(router_cli_ses);
|
||||
|
||||
/**
|
||||
* Close the backend server connection
|
||||
*/
|
||||
if (backend_dcb != NULL) {
|
||||
CHK_DCB(backend_dcb);
|
||||
backend_dcb->func.close(backend_dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -502,39 +527,69 @@ static int
|
||||
routeQuery(ROUTER *instance, void *router_session, GWBUF *queue)
|
||||
{
|
||||
ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES *rsession = (ROUTER_CLIENT_SES *)router_session;
|
||||
ROUTER_CLIENT_SES *router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
uint8_t *payload = GWBUF_DATA(queue);
|
||||
int mysql_command;
|
||||
int rc;
|
||||
|
||||
DCB* backend_dcb;
|
||||
bool rses_is_closed;
|
||||
|
||||
inst->stats.n_queries++;
|
||||
mysql_command = MYSQL_GET_COMMAND(payload);
|
||||
|
||||
/** Dirty read for quick check if router is closed. */
|
||||
if (router_cli_ses->rses_closed)
|
||||
{
|
||||
rses_is_closed = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* Lock router client session for secure read of DCBs
|
||||
*/
|
||||
rses_is_closed = !(rses_begin_router_action(router_cli_ses));
|
||||
}
|
||||
|
||||
if (!rses_is_closed)
|
||||
{
|
||||
backend_dcb = router_cli_ses->backend_dcb;
|
||||
/** unlock */
|
||||
rses_exit_router_action(router_cli_ses);
|
||||
}
|
||||
|
||||
if (rses_is_closed || backend_dcb == NULL)
|
||||
{
|
||||
skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Error: Failed to route MySQL command %d to backend "
|
||||
"server.",
|
||||
mysql_command);
|
||||
goto return_rc;
|
||||
}
|
||||
|
||||
switch(mysql_command) {
|
||||
case MYSQL_COM_CHANGE_USER:
|
||||
rc = rsession->backend_dcb->func.auth(
|
||||
rsession->backend_dcb,
|
||||
rc = backend_dcb->func.auth(
|
||||
backend_dcb,
|
||||
NULL,
|
||||
rsession->backend_dcb->session,
|
||||
backend_dcb->session,
|
||||
queue);
|
||||
|
||||
break;
|
||||
default:
|
||||
rc = rsession->backend_dcb->func.write(
|
||||
rsession->backend_dcb,
|
||||
queue);
|
||||
rc = backend_dcb->func.write(backend_dcb, queue);
|
||||
break;
|
||||
}
|
||||
|
||||
CHK_PROTOCOL(((MySQLProtocol*)rsession->backend_dcb->protocol));
|
||||
|
||||
CHK_PROTOCOL(((MySQLProtocol*)backend_dcb->protocol));
|
||||
skygw_log_write(
|
||||
LOGFILE_DEBUG,
|
||||
"%lu [readconnroute:routeQuery] Routed command %d to dcb %p "
|
||||
"with return value %d.",
|
||||
pthread_self(),
|
||||
mysql_command,
|
||||
rsession->backend_dcb,
|
||||
backend_dcb,
|
||||
rc);
|
||||
|
||||
return_rc:
|
||||
return rc;
|
||||
}
|
||||
|
||||
@ -621,3 +676,62 @@ errorReply(
|
||||
|
||||
ss_dassert(client != NULL);
|
||||
}
|
||||
|
||||
/** to be inline'd */
|
||||
/**
|
||||
* @node Acquires lock to router client session if it is not closed.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - in, use
|
||||
*
|
||||
*
|
||||
* @return true if router session was not closed. If return value is true
|
||||
* it means that router is locked, and must be unlocked later. False, if
|
||||
* router was closed before lock was acquired.
|
||||
*
|
||||
*
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static bool rses_begin_router_action(
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
bool succp = false;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
if (rses->rses_closed) {
|
||||
goto return_succp;
|
||||
}
|
||||
spinlock_acquire(&rses->rses_lock);
|
||||
if (rses->rses_closed) {
|
||||
spinlock_release(&rses->rses_lock);
|
||||
goto return_succp;
|
||||
}
|
||||
succp = true;
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
/** to be inline'd */
|
||||
/**
|
||||
* @node Releases router client session lock.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - <usage>
|
||||
* <description>
|
||||
*
|
||||
* @return void
|
||||
*
|
||||
*
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static void rses_exit_router_action(
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
CHK_CLIENT_RSES(rses);
|
||||
ss_dassert(((SPINLOCK)rses->rses_lock).lock == 1);
|
||||
spinlock_release(&rses->rses_lock);
|
||||
}
|
||||
|
@ -80,6 +80,11 @@ static ROUTER_OBJECT MyObject = {
|
||||
clientReply,
|
||||
NULL
|
||||
};
|
||||
static bool rses_begin_router_action(
|
||||
ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static void rses_exit_router_action(
|
||||
ROUTER_CLIENT_SES* rses);
|
||||
|
||||
static SPINLOCK instlock;
|
||||
static ROUTER_INSTANCE* instances;
|
||||
@ -145,7 +150,6 @@ static ROUTER* createInstance(
|
||||
}
|
||||
router->service = service;
|
||||
spinlock_init(&router->lock);
|
||||
router->connections = NULL;
|
||||
|
||||
/** Calculate number of servers */
|
||||
server = service->databases;
|
||||
@ -268,12 +272,17 @@ static void* newSession(
|
||||
bool succp;
|
||||
|
||||
client_rses =
|
||||
(ROUTER_CLIENT_SES *)malloc(sizeof(ROUTER_CLIENT_SES));
|
||||
(ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
|
||||
|
||||
if (client_rses == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
ss_dassert(false);
|
||||
return NULL;
|
||||
}
|
||||
#if defined(SS_DEBUG)
|
||||
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
|
||||
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
|
||||
#endif
|
||||
/**
|
||||
* Find a backend server to connect to. This is the extent of the
|
||||
* load balancing algorithm we need to implement for this simple
|
||||
@ -294,6 +303,7 @@ static void* newSession(
|
||||
be_slave->backend_server->protocol);
|
||||
|
||||
if (client_rses->slave_dcb == NULL) {
|
||||
ss_dassert(session->refcount == 1);
|
||||
free(client_rses);
|
||||
return NULL;
|
||||
}
|
||||
@ -322,6 +332,11 @@ static void* newSession(
|
||||
client_rses->be_master = be_master;
|
||||
router->stats.n_sessions += 1;
|
||||
|
||||
/**
|
||||
* Version is bigger than zero once initialized.
|
||||
*/
|
||||
atomic_add(&client_rses->rses_versno, 2);
|
||||
ss_dassert(client_rses->rses_versno == 2);
|
||||
/**
|
||||
* Add this session to end of the list of active sessions in router.
|
||||
*/
|
||||
@ -330,6 +345,8 @@ static void* newSession(
|
||||
router->connections = client_rses;
|
||||
spinlock_release(&router->lock);
|
||||
|
||||
CHK_CLIENT_RSES(client_rses);
|
||||
|
||||
return (void *)client_rses;
|
||||
}
|
||||
|
||||
@ -345,13 +362,38 @@ static void closeSession(
|
||||
void* router_session)
|
||||
{
|
||||
ROUTER_CLIENT_SES* router_cli_ses;
|
||||
DCB* slave_dcb;
|
||||
DCB* master_dcb;
|
||||
|
||||
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
/**
|
||||
* Close the connection to the backend servers
|
||||
* Lock router client session for secure read and update.
|
||||
*/
|
||||
router_cli_ses->slave_dcb->func.close(router_cli_ses->slave_dcb);
|
||||
router_cli_ses->master_dcb->func.close(router_cli_ses->master_dcb);
|
||||
if (rses_begin_router_action(router_cli_ses))
|
||||
{
|
||||
slave_dcb = router_cli_ses->slave_dcb;
|
||||
router_cli_ses->slave_dcb = NULL;
|
||||
master_dcb = router_cli_ses->master_dcb;
|
||||
router_cli_ses->master_dcb = NULL;
|
||||
|
||||
router_cli_ses->rses_closed = true;
|
||||
/** Unlock */
|
||||
rses_exit_router_action(router_cli_ses);
|
||||
|
||||
/**
|
||||
* Close the backend server connections
|
||||
*/
|
||||
if (slave_dcb != NULL) {
|
||||
CHK_DCB(slave_dcb);
|
||||
slave_dcb->func.close(slave_dcb);
|
||||
}
|
||||
|
||||
if (master_dcb != NULL) {
|
||||
master_dcb->func.close(master_dcb);
|
||||
CHK_DCB(master_dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void freeSession(
|
||||
@ -415,27 +457,33 @@ static void freeSession(
|
||||
static int routeQuery(
|
||||
ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* queue)
|
||||
GWBUF* querybuf)
|
||||
{
|
||||
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
char* querystr = NULL;
|
||||
char* startpos;
|
||||
size_t len;
|
||||
unsigned char packet_type, *packet;
|
||||
unsigned char packet_type;
|
||||
unsigned char* packet;
|
||||
int ret = 0;
|
||||
GWBUF *cq = NULL;
|
||||
|
||||
DCB* master_dcb = NULL;
|
||||
DCB* slave_dcb = NULL;
|
||||
GWBUF* bufcopy = NULL;
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
bool rses_is_closed;
|
||||
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
inst->stats.n_queries++;
|
||||
|
||||
packet = GWBUF_DATA(queue);
|
||||
packet = GWBUF_DATA(querybuf);
|
||||
packet_type = packet[4];
|
||||
startpos = (char *)&packet[5];
|
||||
len = packet[0];
|
||||
len += 255*packet[1];
|
||||
len += 255*255*packet[2];
|
||||
|
||||
|
||||
switch(packet_type) {
|
||||
case COM_QUIT: /**< 1 QUIT will close all sessions */
|
||||
case COM_INIT_DB: /**< 2 DDL must go to the master */
|
||||
@ -469,6 +517,42 @@ static int routeQuery(
|
||||
default:
|
||||
break;
|
||||
} /**< switch by packet type */
|
||||
|
||||
/** Dirty read for quick check if router is closed. */
|
||||
if (router_cli_ses->rses_closed)
|
||||
{
|
||||
rses_is_closed = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
/**
|
||||
* Lock router client session for secure read of DCBs
|
||||
*/
|
||||
rses_is_closed = !(rses_begin_router_action(router_cli_ses));
|
||||
}
|
||||
|
||||
if (!rses_is_closed)
|
||||
{
|
||||
master_dcb = router_cli_ses->master_dcb;
|
||||
slave_dcb = router_cli_ses->slave_dcb;
|
||||
/** unlock */
|
||||
rses_exit_router_action(router_cli_ses);
|
||||
}
|
||||
|
||||
if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL))
|
||||
{
|
||||
skygw_log_write(
|
||||
LOGFILE_ERROR,
|
||||
"Error: Failed to route %s:%s:\"%s\" to backend server. "
|
||||
"%s.",
|
||||
STRPACKETTYPE(packet_type),
|
||||
STRQTYPE(qtype),
|
||||
querystr,
|
||||
(rses_is_closed ? "Router was closed" :
|
||||
"Router has no backend servers where to route to"));
|
||||
|
||||
goto return_ret;
|
||||
}
|
||||
|
||||
skygw_log_write(LOGFILE_TRACE, "String\t\"%s\"", querystr);
|
||||
skygw_log_write(LOGFILE_TRACE,
|
||||
@ -483,10 +567,9 @@ static int routeQuery(
|
||||
pthread_self(),
|
||||
STRQTYPE(qtype));
|
||||
|
||||
ret = router_cli_ses->master_dcb->func.write(
|
||||
router_cli_ses->master_dcb,
|
||||
queue);
|
||||
atomic_add(&inst->stats.n_master, 1);
|
||||
ret = master_dcb->func.write(master_dcb, querybuf);
|
||||
atomic_add(&inst->stats.n_master, 1);
|
||||
|
||||
goto return_ret;
|
||||
break;
|
||||
|
||||
@ -496,11 +579,10 @@ static int routeQuery(
|
||||
"routing to Slave.",
|
||||
pthread_self(),
|
||||
STRQTYPE(qtype));
|
||||
|
||||
ret = slave_dcb->func.write(slave_dcb, querybuf);
|
||||
atomic_add(&inst->stats.n_slave, 1);
|
||||
|
||||
ret = router_cli_ses->slave_dcb->func.write(
|
||||
router_cli_ses->slave_dcb,
|
||||
queue);
|
||||
atomic_add(&inst->stats.n_slave, 1);
|
||||
goto return_ret;
|
||||
break;
|
||||
|
||||
@ -516,36 +598,30 @@ static int routeQuery(
|
||||
* the command must be executed in them.
|
||||
*/
|
||||
|
||||
cq = gwbuf_clone(queue);
|
||||
bufcopy = gwbuf_clone(querybuf);
|
||||
|
||||
switch(packet_type) {
|
||||
case COM_QUIT:
|
||||
ret = router_cli_ses->master_dcb->func.write(
|
||||
router_cli_ses->master_dcb,
|
||||
queue);
|
||||
router_cli_ses->slave_dcb->func.write(
|
||||
router_cli_ses->slave_dcb,
|
||||
cq);
|
||||
ret = master_dcb->func.write(master_dcb, querybuf);
|
||||
slave_dcb->func.write(slave_dcb, bufcopy);
|
||||
break;
|
||||
|
||||
case COM_CHANGE_USER:
|
||||
router_cli_ses->master_dcb->func.auth(
|
||||
router_cli_ses->master_dcb,
|
||||
master_dcb->func.auth(
|
||||
master_dcb,
|
||||
NULL,
|
||||
router_cli_ses->master_dcb->session,
|
||||
queue);
|
||||
router_cli_ses->slave_dcb->func.auth(
|
||||
router_cli_ses->slave_dcb,
|
||||
master_dcb->session,
|
||||
querybuf);
|
||||
slave_dcb->func.auth(
|
||||
slave_dcb,
|
||||
NULL,
|
||||
router_cli_ses->master_dcb->session,
|
||||
cq);
|
||||
master_dcb->session,
|
||||
bufcopy);
|
||||
break;
|
||||
|
||||
default:
|
||||
ret = router_cli_ses->master_dcb->func.session(
|
||||
router_cli_ses->master_dcb,
|
||||
(void *)queue);
|
||||
router_cli_ses->slave_dcb->func.session(
|
||||
router_cli_ses->slave_dcb,
|
||||
(void *)cq);
|
||||
ret = master_dcb->func.session(master_dcb, (void *)querybuf);
|
||||
slave_dcb->func.session(slave_dcb, (void *)bufcopy);
|
||||
break;
|
||||
} /**< switch by packet type */
|
||||
|
||||
@ -560,11 +636,12 @@ static int routeQuery(
|
||||
pthread_self(),
|
||||
STRQTYPE(qtype));
|
||||
|
||||
/** Is this really ok? */
|
||||
ret = router_cli_ses->master_dcb->func.write(
|
||||
router_cli_ses->master_dcb,
|
||||
queue);
|
||||
atomic_add(&inst->stats.n_master, 1);
|
||||
/**
|
||||
* Is this really ok?
|
||||
* What is not known is routed to master.
|
||||
*/
|
||||
ret = master_dcb->func.write(master_dcb, querybuf);
|
||||
atomic_add(&inst->stats.n_master, 1);
|
||||
goto return_ret;
|
||||
break;
|
||||
} /**< switch by query type */
|
||||
@ -574,6 +651,67 @@ return_ret:
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/** to be inline'd */
|
||||
/**
|
||||
* @node Acquires lock to router client session if it is not closed.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - in, use
|
||||
*
|
||||
*
|
||||
* @return true if router session was not closed. If return value is true
|
||||
* it means that router is locked, and must be unlocked later. False, if
|
||||
* router was closed before lock was acquired.
|
||||
*
|
||||
*
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static bool rses_begin_router_action(
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
bool succp = false;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
if (rses->rses_closed) {
|
||||
goto return_succp;
|
||||
}
|
||||
spinlock_acquire(&rses->rses_lock);
|
||||
if (rses->rses_closed) {
|
||||
spinlock_release(&rses->rses_lock);
|
||||
goto return_succp;
|
||||
}
|
||||
succp = true;
|
||||
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
/** to be inline'd */
|
||||
/**
|
||||
* @node Releases router client session lock.
|
||||
*
|
||||
* Parameters:
|
||||
* @param rses - <usage>
|
||||
* <description>
|
||||
*
|
||||
* @return void
|
||||
*
|
||||
*
|
||||
* @details (write detailed description here)
|
||||
*
|
||||
*/
|
||||
static void rses_exit_router_action(
|
||||
ROUTER_CLIENT_SES* rses)
|
||||
{
|
||||
CHK_CLIENT_RSES(rses);
|
||||
ss_dassert(((SPINLOCK)rses->rses_lock).lock == 1);
|
||||
spinlock_release(&rses->rses_lock);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Diagnostics routine
|
||||
*
|
||||
@ -631,7 +769,7 @@ int i = 0;
|
||||
static void clientReply(
|
||||
ROUTER* instance,
|
||||
void* router_session,
|
||||
GWBUF* queue,
|
||||
GWBUF* writebuf,
|
||||
DCB* backend_dcb)
|
||||
{
|
||||
DCB* master_dcb;
|
||||
@ -639,22 +777,40 @@ static void clientReply(
|
||||
ROUTER_CLIENT_SES* router_cli_ses;
|
||||
|
||||
router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
ss_dassert(router_cli_ses != NULL);
|
||||
master_dcb = router_cli_ses->master_dcb;
|
||||
client_dcb = backend_dcb->session->client;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
if (backend_dcb->command == ROUTER_CHANGE_SESSION) {
|
||||
/* if backend_dcb is the master we can reply to the client */
|
||||
if (backend_dcb == master_dcb) {
|
||||
client_dcb->func.write(client_dcb, queue);
|
||||
} else {
|
||||
/* just consume the gwbuf without writing to the client */
|
||||
gwbuf_consume(queue, gwbuf_length(queue));
|
||||
}
|
||||
} else {
|
||||
/* normal flow */
|
||||
client_dcb->func.write(client_dcb, queue);
|
||||
}
|
||||
/**
|
||||
* Lock router client session for secure read of router session members.
|
||||
* Note that this could be done without lock by using version #
|
||||
*/
|
||||
if (rses_begin_router_action(router_cli_ses))
|
||||
{
|
||||
master_dcb = router_cli_ses->master_dcb;
|
||||
|
||||
/** Unlock */
|
||||
rses_exit_router_action(router_cli_ses);
|
||||
|
||||
client_dcb = backend_dcb->session->client;
|
||||
|
||||
if (backend_dcb != NULL &&
|
||||
backend_dcb->command == ROUTER_CHANGE_SESSION)
|
||||
{
|
||||
/* if backend_dcb is master we can reply to client */
|
||||
if (client_dcb != NULL &&
|
||||
backend_dcb == master_dcb)
|
||||
{
|
||||
client_dcb->func.write(client_dcb, writebuf);
|
||||
} else {
|
||||
/* consume the gwbuf without writing to client */
|
||||
gwbuf_consume(writebuf, gwbuf_length(writebuf));
|
||||
}
|
||||
}
|
||||
else if (client_dcb != NULL)
|
||||
{
|
||||
/* normal flow */
|
||||
client_dcb->func.write(client_dcb, writebuf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -710,8 +866,8 @@ static bool search_backend_servers(
|
||||
if (be != NULL) {
|
||||
skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [newSession] Examine server %s:%d with "
|
||||
"%d connections. Status is %d, "
|
||||
"%lu [search_backend_servers] Examine server %s:%d "
|
||||
"with %d connections. Status is %d, "
|
||||
"router->bitvalue is %d",
|
||||
pthread_self(),
|
||||
be->backend_server->name,
|
||||
@ -792,8 +948,8 @@ static bool search_backend_servers(
|
||||
*p_slave = be_slave;
|
||||
skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [readwritesplit:newSession] Selected Slave %s:%d "
|
||||
"from %d candidates.",
|
||||
"%lu [readwritesplit:search_backend_servers] Selected "
|
||||
"Slave %s:%d from %d candidates.",
|
||||
pthread_self(),
|
||||
be_slave->backend_server->name,
|
||||
be_slave->backend_server->port,
|
||||
@ -803,7 +959,8 @@ static bool search_backend_servers(
|
||||
*p_master = be_master;
|
||||
skygw_log_write(
|
||||
LOGFILE_TRACE,
|
||||
"%lu [readwritesplit:newSession] Selected Master %s:%d "
|
||||
"%lu [readwritesplit:search_backend_servers] Selected "
|
||||
"Master %s:%d "
|
||||
"from %d candidates.",
|
||||
pthread_self(),
|
||||
be_master->backend_server->name,
|
||||
|
Reference in New Issue
Block a user