Extended session command support to cover COM_CHANGE_USER, and COM_INIT_DB.

This implementation doesn't guarantee execution order between session commands and queries
if other backend server lags behind in session command execution.

In poll.c : moved processing of EPOLLERR and EPOLLHUP after processing of EPOLLIN and EPOLLOUT.
This ensures that COM_QUIT messages are read and routed forward before signals arrive (from local client/backend).
This commit is contained in:
VilhoRaatikka
2014-03-14 13:25:37 +02:00
parent cb6a976555
commit a3f7eebdc9
8 changed files with 233 additions and 293 deletions

View File

@ -130,7 +130,6 @@ GWBUF *rval;
rval->end = buf->end; rval->end = buf->end;
rval->gwbuf_type = buf->gwbuf_type; rval->gwbuf_type = buf->gwbuf_type;
rval->next = NULL; rval->next = NULL;
// rval->command = buf->command;
CHK_GWBUF(rval); CHK_GWBUF(rval);
return rval; return rval;
} }
@ -234,6 +233,7 @@ GWBUF *ptr = head;
if (!head) if (!head)
return tail; return tail;
CHK_GWBUF(head); CHK_GWBUF(head);
CHK_GWBUF(tail);
while (ptr->next) while (ptr->next)
{ {
ptr = ptr->next; ptr = ptr->next;
@ -262,9 +262,10 @@ GWBUF *
gwbuf_consume(GWBUF *head, unsigned int length) gwbuf_consume(GWBUF *head, unsigned int length)
{ {
GWBUF *rval = head; GWBUF *rval = head;
CHK_GWBUF(head); CHK_GWBUF(head);
GWBUF_CONSUME(head, length); GWBUF_CONSUME(head, length);
CHK_GWBUF(head);
if (GWBUF_EMPTY(head)) if (GWBUF_EMPTY(head))
{ {
rval = head->next; rval = head->next;

View File

@ -698,8 +698,7 @@ dcb_write(DCB *dcb, GWBUF *queue)
dcb->fd))); dcb->fd)));
return 0; return 0;
} }
spinlock_acquire(&dcb->writeqlock); spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq != NULL) if (dcb->writeq != NULL)
@ -809,9 +808,9 @@ dcb_write(DCB *dcb, GWBUF *queue)
pthread_self(), pthread_self(),
w, w,
dcb, dcb,
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd))); dcb->fd)));
} } /*< while (queue != NULL) */
/*< /*<
* What wasn't successfully written is stored to write queue * What wasn't successfully written is stored to write queue
* for suspended write. * for suspended write.
@ -829,7 +828,6 @@ dcb_write(DCB *dcb, GWBUF *queue)
saved_errno != EAGAIN && saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK) saved_errno != EWOULDBLOCK)
{ {
queue = gwbuf_consume(queue, gwbuf_length(queue));
LOGIF(LE, (skygw_log_write_flush( LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR, LOGFILE_ERROR,
"Error : Writing to %s socket failed due %d, %s.", "Error : Writing to %s socket failed due %d, %s.",

View File

@ -357,60 +357,12 @@ poll_waitevents(void *arg)
dcb, dcb,
STRDCBROLE(dcb->dcb_role)))); STRDCBROLE(dcb->dcb_role))));
if (ev & EPOLLERR)
{
int eno = gw_getsockerrno(dcb->fd);
#if defined(SS_DEBUG)
if (eno == 0) {
eno = dcb_fake_write_errno[dcb->fd];
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake errno %d. "
"%s",
pthread_self(),
eno,
strerror(eno))));
}
dcb_fake_write_errno[dcb->fd] = 0;
#endif
if (eno != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLERR due %d, %s.",
pthread_self(),
eno,
strerror(eno))));
}
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
}
if (ev & EPOLLHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb);
}
if (ev & EPOLLOUT) if (ev & EPOLLOUT)
{ {
int eno = 0; int eno = 0;
eno = gw_getsockerrno(dcb->fd); eno = gw_getsockerrno(dcb->fd);
if (eno == 0) { if (eno == 0) {
#if 1
simple_mutex_lock( simple_mutex_lock(
&dcb->dcb_write_lock, &dcb->dcb_write_lock,
true); true);
@ -418,16 +370,13 @@ poll_waitevents(void *arg)
!dcb->dcb_write_active, !dcb->dcb_write_active,
"Write already active"); "Write already active");
dcb->dcb_write_active = TRUE; dcb->dcb_write_active = TRUE;
#endif
atomic_add( atomic_add(
&pollStats.n_write, &pollStats.n_write,
1); 1);
dcb->func.write_ready(dcb); dcb->func.write_ready(dcb);
#if 1
dcb->dcb_write_active = FALSE; dcb->dcb_write_active = FALSE;
simple_mutex_unlock( simple_mutex_unlock(
&dcb->dcb_write_lock); &dcb->dcb_write_lock);
#endif
} else { } else {
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG, LOGFILE_DEBUG,
@ -443,13 +392,12 @@ poll_waitevents(void *arg)
} }
if (ev & EPOLLIN) if (ev & EPOLLIN)
{ {
#if 1
simple_mutex_lock(&dcb->dcb_read_lock, simple_mutex_lock(&dcb->dcb_read_lock,
true); true);
ss_info_dassert(!dcb->dcb_read_active, ss_info_dassert(!dcb->dcb_read_active,
"Read already active"); "Read already active");
dcb->dcb_read_active = TRUE; dcb->dcb_read_active = TRUE;
#endif
if (dcb->state == DCB_STATE_LISTENING) if (dcb->state == DCB_STATE_LISTENING)
{ {
LOGIF(LD, (skygw_log_write( LOGIF(LD, (skygw_log_write(
@ -474,12 +422,57 @@ poll_waitevents(void *arg)
atomic_add(&pollStats.n_read, 1); atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb); dcb->func.read(dcb);
} }
#if 1
dcb->dcb_read_active = FALSE; dcb->dcb_read_active = FALSE;
simple_mutex_unlock( simple_mutex_unlock(
&dcb->dcb_read_lock); &dcb->dcb_read_lock);
#endif
} }
if (ev & EPOLLERR)
{
int eno = gw_getsockerrno(dcb->fd);
#if defined(SS_DEBUG)
if (eno == 0) {
eno = dcb_fake_write_errno[dcb->fd];
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"Added fake errno %d. "
"%s",
pthread_self(),
eno,
strerror(eno))));
}
dcb_fake_write_errno[dcb->fd] = 0;
#endif
if (eno != 0) {
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLERR due %d, %s.",
pthread_self(),
eno,
strerror(eno))));
}
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
}
if (ev & EPOLLHUP)
{
int eno = 0;
eno = gw_getsockerrno(dcb->fd);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [poll_waitevents] "
"EPOLLHUP on dcb %p, fd %d. "
"Errno %d, %s.",
pthread_self(),
dcb,
dcb->fd,
eno,
strerror(eno))));
atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb);
}
} /*< for */ } /*< for */
no_op = FALSE; no_op = FALSE;
} }

View File

@ -68,6 +68,7 @@ typedef struct mysql_sescmd_st {
#endif #endif
rses_property_t* my_sescmd_prop; /*< parent property */ rses_property_t* my_sescmd_prop; /*< parent property */
GWBUF* my_sescmd_buf; /*< query buffer */ GWBUF* my_sescmd_buf; /*< query buffer */
unsigned char my_sescmd_packet_type;/*< packet type */
bool my_sescmd_is_replied; /*< is cmd replied to client */ bool my_sescmd_is_replied; /*< is cmd replied to client */
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
skygw_chk_t my_sescmd_chk_tail; skygw_chk_t my_sescmd_chk_tail;

View File

@ -283,6 +283,8 @@ static int gw_read_backend_event(DCB *dcb) {
} }
if (backend_protocol->state == MYSQL_AUTH_FAILED) { if (backend_protocol->state == MYSQL_AUTH_FAILED) {
spinlock_acquire(&dcb->delayqlock);
/*< /*<
* vraa : errorHandle * vraa : errorHandle
* check the delayq before the reply * check the delayq before the reply
@ -295,10 +297,12 @@ static int gw_read_backend_event(DCB *dcb) {
0, 0,
"Connection to backend lost."); "Connection to backend lost.");
// consume all the delay queue // consume all the delay queue
dcb->delayq = gwbuf_consume( while ((dcb->delayq = gwbuf_consume(
dcb->delayq, dcb->delayq,
gwbuf_length(dcb->delayq)); GWBUF_LENGTH(dcb->delayq))) != NULL);
} }
spinlock_release(&dcb->delayqlock);
while (session->state != SESSION_STATE_ROUTER_READY) while (session->state != SESSION_STATE_ROUTER_READY)
{ {
@ -347,7 +351,7 @@ static int gw_read_backend_event(DCB *dcb) {
pthread_self(), pthread_self(),
dcb->fd, dcb->fd,
current_session->user))); current_session->user)));
/* check the delay queue and flush the data */ /* check the delay queue and flush the data */
if (dcb->delayq) if (dcb->delayq)
{ {
@ -802,12 +806,6 @@ static int backend_write_delayqueue(DCB *dcb)
localq = dcb->delayq; localq = dcb->delayq;
dcb->delayq = NULL; dcb->delayq = NULL;
/*<
* Now we set the last command received, from the delayed queue
*/
// memcpy(&dcb->command, &localq->command, sizeof(dcb->command));
spinlock_release(&dcb->delayqlock); spinlock_release(&dcb->delayqlock);
rc = dcb_write(dcb, localq); rc = dcb_write(dcb, localq);
@ -856,8 +854,6 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
backend_protocol = backend->protocol; backend_protocol = backend->protocol;
client_protocol = in_session->client->protocol; client_protocol = in_session->client->protocol;
// queue->command = ROUTER_CHANGE_SESSION;
// now get the user, after 4 bytes header and 1 byte command // now get the user, after 4 bytes header and 1 byte command
client_auth_packet += 5; client_auth_packet += 5;
strcpy(username, (char *)client_auth_packet); strcpy(username, (char *)client_auth_packet);
@ -899,30 +895,14 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB
rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol); rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol);
/*<
* The current queue was not handled by func.write() in gw_send_change_user_to_backend()
* We wrote a new gwbuf
* Set backend command here!
*/
memcpy(&backend->command, &queue->command, sizeof(backend->command));
/*< /*<
* Now copy new data into user session * Now copy new data into user session
*/ */
strcpy(current_session->user, username); strcpy(current_session->user, username);
strcpy(current_session->db, database); strcpy(current_session->db, database);
memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1)); memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1));
} }
gwbuf_free(queue);
// consume all the data received from client
spinlock_acquire(&backend->writeqlock);
len = gwbuf_length(queue);
queue = gwbuf_consume(queue, len);
spinlock_release(&backend->writeqlock);
return rv; return rv;
} }

View File

@ -1157,7 +1157,6 @@ static int gw_error_client_event(DCB *dcb) {
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session; rsession = session->router_session;
router->closeSession(router_instance, rsession); router->closeSession(router_instance, rsession);
} }
dcb_close(dcb); dcb_close(dcb);
@ -1215,7 +1214,8 @@ gw_client_hangup_event(DCB *dcb)
void* router_instance; void* router_instance;
void* rsession; void* rsession;
int rc = 1; int rc = 1;
#if defined(SS_DEBUG)
#if defined(SS_DEBUG)
MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol;
if (dcb->state == DCB_STATE_POLLING || if (dcb->state == DCB_STATE_POLLING ||
dcb->state == DCB_STATE_NOPOLLING || dcb->state == DCB_STATE_NOPOLLING ||
@ -1224,8 +1224,6 @@ gw_client_hangup_event(DCB *dcb)
CHK_PROTOCOL(protocol); CHK_PROTOCOL(protocol);
} }
#endif #endif
CHK_DCB(dcb); CHK_DCB(dcb);
if (dcb->state != DCB_STATE_POLLING) { if (dcb->state != DCB_STATE_POLLING) {
@ -1242,7 +1240,6 @@ gw_client_hangup_event(DCB *dcb)
router = session->service->router; router = session->service->router;
router_instance = session->service->router_instance; router_instance = session->service->router_instance;
rsession = session->router_session; rsession = session->router_session;
router->closeSession(router_instance, rsession); router->closeSession(router_instance, rsession);
} }
@ -1314,13 +1311,6 @@ static int route_by_statement(
uint8_t* payload; uint8_t* payload;
static size_t len; static size_t len;
#if defined(SS_DEBUG)
uint8_t router_capabilities;
router_capabilities = router->getCapabilities(router_instance, rsession);
ss_dassert(router_capabilities == RCAP_TYPE_STMT_INPUT);
#endif
do do
{ {
stmtbuf = gw_MySQL_get_next_stmt(&readbuf); stmtbuf = gw_MySQL_get_next_stmt(&readbuf);

View File

@ -181,7 +181,7 @@ int gw_read_backend_handshake(MySQLProtocol *conn) {
conn->state = MYSQL_AUTH_SENT; conn->state = MYSQL_AUTH_SENT;
// consume all the data here // consume all the data here
head = gwbuf_consume(head, gwbuf_length(head)); head = gwbuf_consume(head, GWBUF_LENGTH(head));
return 0; return 0;
} }
@ -337,7 +337,7 @@ int gw_receive_backend_auth(
/*< /*<
* Remove data from buffer. * Remove data from buffer.
*/ */
head = gwbuf_consume(head, gwbuf_length(head)); head = gwbuf_consume(head, GWBUF_LENGTH(head));
} }
else if (n == 0) else if (n == 0)
{ {

View File

@ -98,6 +98,7 @@ static void mysql_sescmd_done(
static mysql_sescmd_t* mysql_sescmd_init ( static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop, rses_property_t* rses_prop,
GWBUF* sescmd_buf, GWBUF* sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES* rses); ROUTER_CLIENT_SES* rses);
static rses_property_t* mysql_sescmd_get_property( static rses_property_t* mysql_sescmd_get_property(
@ -590,9 +591,7 @@ static int routeQuery(
} }
else else
{ {
/** /*< Lock router client session for secure read of DCBs */
* Lock router client session for secure read of DCBs
*/
rses_is_closed = rses_is_closed =
!(rses_begin_locked_router_action(router_cli_ses)); !(rses_begin_locked_router_action(router_cli_ses));
} }
@ -678,6 +677,43 @@ static int routeQuery(
switch (qtype) { switch (qtype) {
case QUERY_TYPE_WRITE: case QUERY_TYPE_WRITE:
#if 0
/**
* Running this block cause deadlock because read mutex is
* on hold. This doesn't serialize subsequent session commands
* and queries if there are multiple session commands and other
* backend starts to lag behind. vraa : 14.3.13
*/
/**
* Wait until master has executed all its session commands.
* TODO: if master fails it needs to be detected in the loop.
*/
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
while (sescmd_cursor_is_active(rses_get_sescmd_cursor(
router_cli_ses,
BE_MASTER)))
{
rses_end_locked_router_action(router_cli_ses);
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
"%lu [routeQuery:rwsplit] Session command is "
"active in MASTER. Waiting in loop.",
pthread_self())));
usleep(10);
if (!rses_begin_locked_router_action(router_cli_ses))
{
goto return_ret;
}
}
rses_end_locked_router_action(router_cli_ses);
#endif
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] Query type\t%s, " "%lu [routeQuery:rwsplit] Query type\t%s, "
@ -702,84 +738,65 @@ static int routeQuery(
"%lu [routeQuery:rwsplit] Query type\t%s, " "%lu [routeQuery:rwsplit] Query type\t%s, "
"routing to Slave.", "routing to Slave.",
pthread_self(), pthread_self(),
STRQTYPE(qtype)))); STRQTYPE(qtype))));
/** Lock router session */ while (true)
if (!rses_begin_locked_router_action(router_cli_ses))
{ {
/** Log error to debug */ if (!rses_begin_locked_router_action(router_cli_ses))
goto return_ret; {
} goto return_ret;
/** }
* If session command is being executed in slave /**
* route to master * If session command is being executed in slave
*/ * route to master.
if (sescmd_cursor_is_active(rses_get_sescmd_cursor( */
router_cli_ses, if (!sescmd_cursor_is_active(rses_get_sescmd_cursor(
BE_SLAVE))) router_cli_ses,
{ BE_SLAVE)))
LOGIF(LT, tracelog_routed_query(router_cli_ses, {
"routeQuery", rses_end_locked_router_action(router_cli_ses);
master_dcb, LOGIF(LT, tracelog_routed_query(router_cli_ses,
gwbuf_clone(querybuf))); "routeQuery",
slave_dcb,
ret = master_dcb->func.write(master_dcb, querybuf); gwbuf_clone(querybuf)));
atomic_add(&inst->stats.n_master, 1);
} ret = slave_dcb->func.write(slave_dcb, querybuf);
else atomic_add(&inst->stats.n_slave, 1);
{ break;
LOGIF(LT, tracelog_routed_query(router_cli_ses, }
"routeQuery", else if (!sescmd_cursor_is_active(rses_get_sescmd_cursor(
slave_dcb, router_cli_ses,
gwbuf_clone(querybuf))); BE_MASTER)))
ret = slave_dcb->func.write(slave_dcb, querybuf); {
atomic_add(&inst->stats.n_slave, 1); rses_end_locked_router_action(router_cli_ses);
} LOGIF(LT, tracelog_routed_query(router_cli_ses,
rses_end_locked_router_action(router_cli_ses); "routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = slave_dcb->func.write(master_dcb, querybuf);
atomic_add(&inst->stats.n_master, 1);
break;
}
rses_end_locked_router_action(router_cli_ses);
} /*< while (true) */
goto return_ret; goto return_ret;
break; break;
case QUERY_TYPE_SESSION_WRITE: case QUERY_TYPE_SESSION_WRITE:
/** /**
* Execute in backends used by current router session. * Execute in backends used by current router session.
* Save session variable commands to router session property * Save session variable commands to router session property
* struct. Thus, they * struct. Thus, they can be replayed in backends which are
* can be replayed in backends which are started and joined later. * started and joined later.
* *
* Suppress OK packets sent to MaxScale by slaves. * Suppress redundant OK packets sent by backends.
* *
* DOES THIS ALL APPLY TO COM_QUIT AS WELL?? * DOES THIS ALL APPLY TO COM_QUIT AS WELL??
* *
*/ * The first OK packet is replied to the client.
*
/** */
* Update connections which are used in this session.
*
* For each connection updated, add a flag which indicates that
* OK Packet must arrive for this command before server
* in question is allowed to be used by router. That is,
* maintain a queue of pending OK packets and remove item
* from queue by FIFO.
*
* Return when the master responds OK Packet. Send that
* OK packet back to client.
*
* Suppress OK packets sent to MaxScale by slaves.
*
* Open questions:
* How to handle interleaving session write
* and queries? It would be simple if OK must be received
* from all/both servers before continuing query execution.
* How to maintain the order of operations? Execution queue
* would solve the problem. In the queue some things must be
* executed in serialized manner while some could be executed
* in parallel. Queries mostly.
*
* Instead of waiting for the OK packet from the master, the
* first OK packet could also be sent to client. TBD.
* vraa 9.12.13
*
*/
LOGIF(LT, (skygw_log_write( LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
"%lu [routeQuery:rwsplit] DCB M:%p s:%p, " "%lu [routeQuery:rwsplit] DCB M:%p s:%p, "
@ -791,106 +808,45 @@ static int routeQuery(
STRQTYPE(qtype), STRQTYPE(qtype),
STRPACKETTYPE(packet_type)))); STRPACKETTYPE(packet_type))));
switch(packet_type) { prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
case COM_CHANGE_USER: /**
* Additional reference is created to querybuf to
LOGIF(LT, tracelog_routed_query( * prevent it from being released before properties
router_cli_ses, * are cleaned up as a part of router sessionclean-up.
"routeQuery", */
master_dcb, mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses);
gwbuf_clone(querybuf)));
/** Lock router session */
master_dcb->func.auth( if (!rses_begin_locked_router_action(router_cli_ses))
master_dcb, {
NULL, rses_property_done(prop);
master_dcb->session, goto return_ret;
gwbuf_clone(querybuf)); }
/** Add sescmd property to router client session */
LOGIF(LT, tracelog_routed_query( rses_property_add(router_cli_ses, prop);
router_cli_ses,
"routeQuery", /** Execute session command in master */
slave_dcb, if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER))
gwbuf_clone(querybuf))); {
ret = 1;
slave_dcb->func.auth( }
slave_dcb, else
NULL, {
master_dcb->session, /** Log error */
querybuf); }
break; /** Execute session command in slave */
if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE))
case COM_QUIT: {
case COM_QUERY: ret = 1;
/** }
* 1. Create new property of type RSES_PROP_TYPE_SESCMD. else
* 2. Add property to the ROUTER_CLIENT_SES struct of {
* this router session. /** Log error */
* 3. For each backend, and for each non-executed }
* sescmd:
* call execution of current sescmd in /** Unlock router session */
* all backends as long as both have executed rses_end_locked_router_action(router_cli_ses);
* them all.
* Execution call is dcb->func.session.
* All sescmds are executed when its return value is
* NULL, otherwise it is a pointer to next property.
*/
prop = rses_property_init(RSES_PROP_TYPE_SESCMD);
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties
* are cleaned up as a part of router sessionclean-up.
*/
mysql_sescmd_init(prop, querybuf, router_cli_ses);
/** Lock router session */
if (!rses_begin_locked_router_action(router_cli_ses))
{
rses_property_done(prop);
goto return_ret;
}
/** Add sescmd property to router client session */
rses_property_add(router_cli_ses, prop);
/** Execute session command in master */
if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER))
{
ret = 1;
}
else
{
/** Log error */
}
/** Execute session command in slave */
if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE))
{
ret = 1;
}
else
{
/** Log error */
}
/** Unlock router session */
rses_end_locked_router_action(router_cli_ses);
break;
default:
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
master_dcb,
gwbuf_clone(querybuf)));
ret = master_dcb->func.write(master_dcb,
(void *)gwbuf_clone(querybuf));
LOGIF(LT, tracelog_routed_query(router_cli_ses,
"routeQuery",
slave_dcb,
gwbuf_clone(querybuf)));
slave_dcb->func.write(slave_dcb, (void *)querybuf);
break;
} /**< switch by packet type */
atomic_add(&inst->stats.n_all, 1); atomic_add(&inst->stats.n_all, 1);
goto return_ret; goto return_ret;
break; break;
@ -1066,8 +1022,9 @@ static void clientReply(
*/ */
if (!rses_begin_locked_router_action(router_cli_ses)) if (!rses_begin_locked_router_action(router_cli_ses))
{ {
/** is this needed ??*/ while ((writebuf = gwbuf_consume(
gwbuf_consume(writebuf, gwbuf_length(writebuf)); writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
goto lock_failed; goto lock_failed;
} }
master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; master_dcb = router_cli_ses->rses_dcb[BE_MASTER];
@ -1091,7 +1048,9 @@ static void clientReply(
*/ */
if (client_dcb == NULL) if (client_dcb == NULL)
{ {
gwbuf_consume(writebuf, gwbuf_length(writebuf)); while ((writebuf = gwbuf_consume(
writebuf,
GWBUF_LENGTH(writebuf))) != NULL);
/** Log that client was closed before reply */ /** Log that client was closed before reply */
return; return;
} }
@ -1111,7 +1070,7 @@ static void clientReply(
goto lock_failed; goto lock_failed;
} }
scur = rses_get_sescmd_cursor(router_cli_ses, be_type); scur = rses_get_sescmd_cursor(router_cli_ses, be_type);
/** /**
* Active cursor means that reply is from session command * Active cursor means that reply is from session command
* execution. * execution.
*/ */
@ -1134,6 +1093,7 @@ static void clientReply(
} }
/** Set cursor passive. */ /** Set cursor passive. */
sescmd_cursor_set_active(scur, false); sescmd_cursor_set_active(scur, false);
ss_dassert(!sescmd_cursor_is_active(scur));
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
@ -1142,6 +1102,7 @@ static void clientReply(
{ {
/** Write reply to client DCB */ /** Write reply to client DCB */
client_dcb->func.write(client_dcb, writebuf); client_dcb->func.write(client_dcb, writebuf);
ss_dassert(!sescmd_cursor_is_active(scur));
/** Unlock router session */ /** Unlock router session */
rses_end_locked_router_action(router_cli_ses); rses_end_locked_router_action(router_cli_ses);
LOGIF(LT, (skygw_log_write_flush( LOGIF(LT, (skygw_log_write_flush(
@ -1150,12 +1111,10 @@ static void clientReply(
"backend dcb %p. End of normal reply.", "backend dcb %p. End of normal reply.",
pthread_self(), pthread_self(),
client_dcb, client_dcb,
backend_dcb))); backend_dcb)));
} }
return; /*< succeed */
lock_failed: lock_failed:
/** log that router session couldn't be locked */ return;
return;
} }
/** /**
@ -1443,6 +1402,7 @@ static void rses_end_locked_property_action(
static mysql_sescmd_t* mysql_sescmd_init ( static mysql_sescmd_t* mysql_sescmd_init (
rses_property_t* rses_prop, rses_property_t* rses_prop,
GWBUF* sescmd_buf, GWBUF* sescmd_buf,
unsigned char packet_type,
ROUTER_CLIENT_SES* rses) ROUTER_CLIENT_SES* rses)
{ {
mysql_sescmd_t* sescmd; mysql_sescmd_t* sescmd;
@ -1453,11 +1413,12 @@ static mysql_sescmd_t* mysql_sescmd_init (
sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */
// sescmd->my_sescmd_rsession = rses; // sescmd->my_sescmd_rsession = rses;
#if defined(SS_DEBUG) #if defined(SS_DEBUG)
sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD;
sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD;
#endif #endif
/** Set session command buffer */ /** Set session command buffer */
sescmd->my_sescmd_buf = sescmd_buf; sescmd->my_sescmd_buf = sescmd_buf;
sescmd->my_sescmd_packet_type = packet_type;
return sescmd; return sescmd;
} }
@ -1624,8 +1585,25 @@ static bool execute_sescmd_in_backend(
"execute_sescmd_in_backend", "execute_sescmd_in_backend",
dcb, dcb,
sescmd_cursor_clone_querybuf(scur))); sescmd_cursor_clone_querybuf(scur)));
rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); switch (scur->scmd_cur_cmd->my_sescmd_packet_type) {
case COM_CHANGE_USER:
rc = dcb->func.auth(
dcb,
NULL,
dcb->session,
sescmd_cursor_clone_querybuf(scur));
break;
case COM_QUIT:
case COM_QUERY:
case COM_INIT_DB:
default:
rc = dcb->func.write(
dcb,
sescmd_cursor_clone_querybuf(scur));
break;
}
if (rc != 1) if (rc != 1)
{ {
succp = false; succp = false;
@ -1685,7 +1663,6 @@ static bool cont_exec_sescmd_in_backend(
dcb, dcb,
sescmd_cursor_clone_querybuf(scur))); sescmd_cursor_clone_querybuf(scur)));
// rc = dcb->func.session(dcb, sescmd_cursor_clone_querybuf(scur));
rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur));
if (rc != 1) if (rc != 1)
{ {