diff --git a/server/core/buffer.c b/server/core/buffer.c index 57228b253..b21cf216c 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -130,7 +130,6 @@ GWBUF *rval; rval->end = buf->end; rval->gwbuf_type = buf->gwbuf_type; rval->next = NULL; -// rval->command = buf->command; CHK_GWBUF(rval); return rval; } @@ -234,6 +233,7 @@ GWBUF *ptr = head; if (!head) return tail; CHK_GWBUF(head); + CHK_GWBUF(tail); while (ptr->next) { ptr = ptr->next; @@ -262,9 +262,10 @@ GWBUF * gwbuf_consume(GWBUF *head, unsigned int length) { GWBUF *rval = head; - CHK_GWBUF(head); GWBUF_CONSUME(head, length); + CHK_GWBUF(head); + if (GWBUF_EMPTY(head)) { rval = head->next; diff --git a/server/core/dcb.c b/server/core/dcb.c index db9e96b60..d1b5a4ffb 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -698,8 +698,7 @@ dcb_write(DCB *dcb, GWBUF *queue) dcb->fd))); return 0; } - - + spinlock_acquire(&dcb->writeqlock); if (dcb->writeq != NULL) @@ -809,9 +808,9 @@ dcb_write(DCB *dcb, GWBUF *queue) pthread_self(), w, dcb, - STRDCBSTATE(dcb->state), + STRDCBSTATE(dcb->state), dcb->fd))); - } + } /*< while (queue != NULL) */ /*< * What wasn't successfully written is stored to write queue * for suspended write. @@ -829,7 +828,6 @@ dcb_write(DCB *dcb, GWBUF *queue) saved_errno != EAGAIN && saved_errno != EWOULDBLOCK) { - queue = gwbuf_consume(queue, gwbuf_length(queue)); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, "Error : Writing to %s socket failed due %d, %s.", diff --git a/server/core/poll.c b/server/core/poll.c index 16c8807a9..7cd4ae992 100644 --- a/server/core/poll.c +++ b/server/core/poll.c @@ -357,60 +357,12 @@ poll_waitevents(void *arg) dcb, STRDCBROLE(dcb->dcb_role)))); - if (ev & EPOLLERR) - { - int eno = gw_getsockerrno(dcb->fd); -#if defined(SS_DEBUG) - if (eno == 0) { - eno = dcb_fake_write_errno[dcb->fd]; - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "Added fake errno %d. " - "%s", - pthread_self(), - eno, - strerror(eno)))); - } - dcb_fake_write_errno[dcb->fd] = 0; -#endif - if (eno != 0) { - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLERR due %d, %s.", - pthread_self(), - eno, - strerror(eno)))); - } - atomic_add(&pollStats.n_error, 1); - dcb->func.error(dcb); - } - if (ev & EPOLLHUP) - { - int eno = 0; - eno = gw_getsockerrno(dcb->fd); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [poll_waitevents] " - "EPOLLHUP on dcb %p, fd %d. " - "Errno %d, %s.", - pthread_self(), - dcb, - dcb->fd, - eno, - strerror(eno)))); - atomic_add(&pollStats.n_hup, 1); - dcb->func.hangup(dcb); - } if (ev & EPOLLOUT) { int eno = 0; eno = gw_getsockerrno(dcb->fd); if (eno == 0) { -#if 1 simple_mutex_lock( &dcb->dcb_write_lock, true); @@ -418,16 +370,13 @@ poll_waitevents(void *arg) !dcb->dcb_write_active, "Write already active"); dcb->dcb_write_active = TRUE; -#endif atomic_add( &pollStats.n_write, 1); dcb->func.write_ready(dcb); -#if 1 dcb->dcb_write_active = FALSE; simple_mutex_unlock( &dcb->dcb_write_lock); -#endif } else { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -443,13 +392,12 @@ poll_waitevents(void *arg) } if (ev & EPOLLIN) { -#if 1 simple_mutex_lock(&dcb->dcb_read_lock, true); ss_info_dassert(!dcb->dcb_read_active, "Read already active"); dcb->dcb_read_active = TRUE; -#endif + if (dcb->state == DCB_STATE_LISTENING) { LOGIF(LD, (skygw_log_write( @@ -474,12 +422,57 @@ poll_waitevents(void *arg) atomic_add(&pollStats.n_read, 1); dcb->func.read(dcb); } -#if 1 dcb->dcb_read_active = FALSE; simple_mutex_unlock( &dcb->dcb_read_lock); -#endif } + if (ev & EPOLLERR) + { + int eno = gw_getsockerrno(dcb->fd); + #if defined(SS_DEBUG) + if (eno == 0) { + eno = dcb_fake_write_errno[dcb->fd]; + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "Added fake errno %d. " + "%s", + pthread_self(), + eno, + strerror(eno)))); + } + dcb_fake_write_errno[dcb->fd] = 0; + #endif + if (eno != 0) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLERR due %d, %s.", + pthread_self(), + eno, + strerror(eno)))); + } + atomic_add(&pollStats.n_error, 1); + dcb->func.error(dcb); + } + if (ev & EPOLLHUP) + { + int eno = 0; + eno = gw_getsockerrno(dcb->fd); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [poll_waitevents] " + "EPOLLHUP on dcb %p, fd %d. " + "Errno %d, %s.", + pthread_self(), + dcb, + dcb->fd, + eno, + strerror(eno)))); + atomic_add(&pollStats.n_hup, 1); + dcb->func.hangup(dcb); + } } /*< for */ no_op = FALSE; } diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index c5dcbdaaa..477096b76 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -68,6 +68,7 @@ typedef struct mysql_sescmd_st { #endif rses_property_t* my_sescmd_prop; /*< parent property */ GWBUF* my_sescmd_buf; /*< query buffer */ + unsigned char my_sescmd_packet_type;/*< packet type */ bool my_sescmd_is_replied; /*< is cmd replied to client */ #if defined(SS_DEBUG) skygw_chk_t my_sescmd_chk_tail; diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 03e433062..78216d62f 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -283,6 +283,8 @@ static int gw_read_backend_event(DCB *dcb) { } if (backend_protocol->state == MYSQL_AUTH_FAILED) { + + spinlock_acquire(&dcb->delayqlock); /*< * vraa : errorHandle * check the delayq before the reply @@ -295,10 +297,12 @@ static int gw_read_backend_event(DCB *dcb) { 0, "Connection to backend lost."); // consume all the delay queue - dcb->delayq = gwbuf_consume( + while ((dcb->delayq = gwbuf_consume( dcb->delayq, - gwbuf_length(dcb->delayq)); + GWBUF_LENGTH(dcb->delayq))) != NULL); } + spinlock_release(&dcb->delayqlock); + while (session->state != SESSION_STATE_ROUTER_READY) { @@ -347,7 +351,7 @@ static int gw_read_backend_event(DCB *dcb) { pthread_self(), dcb->fd, current_session->user))); - + /* check the delay queue and flush the data */ if (dcb->delayq) { @@ -802,12 +806,6 @@ static int backend_write_delayqueue(DCB *dcb) localq = dcb->delayq; dcb->delayq = NULL; - /*< - * Now we set the last command received, from the delayed queue - */ - -// memcpy(&dcb->command, &localq->command, sizeof(dcb->command)); - spinlock_release(&dcb->delayqlock); rc = dcb_write(dcb, localq); @@ -856,8 +854,6 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB backend_protocol = backend->protocol; client_protocol = in_session->client->protocol; -// queue->command = ROUTER_CHANGE_SESSION; - // now get the user, after 4 bytes header and 1 byte command client_auth_packet += 5; strcpy(username, (char *)client_auth_packet); @@ -899,30 +895,14 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB rv = gw_send_change_user_to_backend(database, username, client_sha1, backend_protocol); - /*< - * The current queue was not handled by func.write() in gw_send_change_user_to_backend() - * We wrote a new gwbuf - * Set backend command here! - */ - memcpy(&backend->command, &queue->command, sizeof(backend->command)); - /*< * Now copy new data into user session */ strcpy(current_session->user, username); strcpy(current_session->db, database); memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1)); - } - - // consume all the data received from client - - spinlock_acquire(&backend->writeqlock); - - len = gwbuf_length(queue); - queue = gwbuf_consume(queue, len); - - spinlock_release(&backend->writeqlock); - + } + gwbuf_free(queue); return rv; } diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 255ede201..ecea34972 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -1157,7 +1157,6 @@ static int gw_error_client_event(DCB *dcb) { router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; - router->closeSession(router_instance, rsession); } dcb_close(dcb); @@ -1215,7 +1214,8 @@ gw_client_hangup_event(DCB *dcb) void* router_instance; void* rsession; int rc = 1; -#if defined(SS_DEBUG) + + #if defined(SS_DEBUG) MySQLProtocol* protocol = (MySQLProtocol *)dcb->protocol; if (dcb->state == DCB_STATE_POLLING || dcb->state == DCB_STATE_NOPOLLING || @@ -1224,8 +1224,6 @@ gw_client_hangup_event(DCB *dcb) CHK_PROTOCOL(protocol); } #endif - - CHK_DCB(dcb); if (dcb->state != DCB_STATE_POLLING) { @@ -1242,7 +1240,6 @@ gw_client_hangup_event(DCB *dcb) router = session->service->router; router_instance = session->service->router_instance; rsession = session->router_session; - router->closeSession(router_instance, rsession); } @@ -1314,13 +1311,6 @@ static int route_by_statement( uint8_t* payload; static size_t len; -#if defined(SS_DEBUG) - uint8_t router_capabilities; - - router_capabilities = router->getCapabilities(router_instance, rsession); - - ss_dassert(router_capabilities == RCAP_TYPE_STMT_INPUT); -#endif do { stmtbuf = gw_MySQL_get_next_stmt(&readbuf); diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 01fd75b27..7bfb3666c 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -181,7 +181,7 @@ int gw_read_backend_handshake(MySQLProtocol *conn) { conn->state = MYSQL_AUTH_SENT; // consume all the data here - head = gwbuf_consume(head, gwbuf_length(head)); + head = gwbuf_consume(head, GWBUF_LENGTH(head)); return 0; } @@ -337,7 +337,7 @@ int gw_receive_backend_auth( /*< * Remove data from buffer. */ - head = gwbuf_consume(head, gwbuf_length(head)); + head = gwbuf_consume(head, GWBUF_LENGTH(head)); } else if (n == 0) { diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index a70b7c8d9..1548eac57 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -98,6 +98,7 @@ static void mysql_sescmd_done( static mysql_sescmd_t* mysql_sescmd_init ( rses_property_t* rses_prop, GWBUF* sescmd_buf, + unsigned char packet_type, ROUTER_CLIENT_SES* rses); static rses_property_t* mysql_sescmd_get_property( @@ -590,9 +591,7 @@ static int routeQuery( } else { - /** - * Lock router client session for secure read of DCBs - */ + /*< Lock router client session for secure read of DCBs */ rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses)); } @@ -678,6 +677,43 @@ static int routeQuery( switch (qtype) { case QUERY_TYPE_WRITE: +#if 0 + /** + * Running this block cause deadlock because read mutex is + * on hold. This doesn't serialize subsequent session commands + * and queries if there are multiple session commands and other + * backend starts to lag behind. vraa : 14.3.13 + */ + /** + * Wait until master has executed all its session commands. + * TODO: if master fails it needs to be detected in the loop. + */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + + while (sescmd_cursor_is_active(rses_get_sescmd_cursor( + router_cli_ses, + BE_MASTER))) + { + rses_end_locked_router_action(router_cli_ses); + + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [routeQuery:rwsplit] Session command is " + "active in MASTER. Waiting in loop.", + pthread_self()))); + + usleep(10); + + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + } + rses_end_locked_router_action(router_cli_ses); +#endif LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "%lu [routeQuery:rwsplit] Query type\t%s, " @@ -702,84 +738,65 @@ static int routeQuery( "%lu [routeQuery:rwsplit] Query type\t%s, " "routing to Slave.", pthread_self(), - STRQTYPE(qtype)))); - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) + STRQTYPE(qtype)))); + while (true) { - /** Log error to debug */ - goto return_ret; - } - /** - * If session command is being executed in slave - * route to master - */ - if (sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_SLAVE))) - { - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); - } - else - { - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(querybuf))); - - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); - } - rses_end_locked_router_action(router_cli_ses); + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + /** + * If session command is being executed in slave + * route to master. + */ + if (!sescmd_cursor_is_active(rses_get_sescmd_cursor( + router_cli_ses, + BE_SLAVE))) + { + rses_end_locked_router_action(router_cli_ses); + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(querybuf))); + + ret = slave_dcb->func.write(slave_dcb, querybuf); + atomic_add(&inst->stats.n_slave, 1); + break; + } + else if (!sescmd_cursor_is_active(rses_get_sescmd_cursor( + router_cli_ses, + BE_MASTER))) + + { + rses_end_locked_router_action(router_cli_ses); + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(querybuf))); + + ret = slave_dcb->func.write(master_dcb, querybuf); + atomic_add(&inst->stats.n_master, 1); + break; + } + rses_end_locked_router_action(router_cli_ses); + } /*< while (true) */ goto return_ret; break; case QUERY_TYPE_SESSION_WRITE: /** - * Execute in backends used by current router session. - * Save session variable commands to router session property - * struct. Thus, they - * can be replayed in backends which are started and joined later. - * - * Suppress OK packets sent to MaxScale by slaves. - * - * DOES THIS ALL APPLY TO COM_QUIT AS WELL?? - * - */ - - /** - * Update connections which are used in this session. - * - * For each connection updated, add a flag which indicates that - * OK Packet must arrive for this command before server - * in question is allowed to be used by router. That is, - * maintain a queue of pending OK packets and remove item - * from queue by FIFO. - * - * Return when the master responds OK Packet. Send that - * OK packet back to client. - * - * Suppress OK packets sent to MaxScale by slaves. - * - * Open questions: - * How to handle interleaving session write - * and queries? It would be simple if OK must be received - * from all/both servers before continuing query execution. - * How to maintain the order of operations? Execution queue - * would solve the problem. In the queue some things must be - * executed in serialized manner while some could be executed - * in parallel. Queries mostly. - * - * Instead of waiting for the OK packet from the master, the - * first OK packet could also be sent to client. TBD. - * vraa 9.12.13 - * - */ + * Execute in backends used by current router session. + * Save session variable commands to router session property + * struct. Thus, they can be replayed in backends which are + * started and joined later. + * + * Suppress redundant OK packets sent by backends. + * + * DOES THIS ALL APPLY TO COM_QUIT AS WELL?? + * + * The first OK packet is replied to the client. + * + */ LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "%lu [routeQuery:rwsplit] DCB M:%p s:%p, " @@ -791,106 +808,45 @@ static int routeQuery( STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); - switch(packet_type) { - case COM_CHANGE_USER: - - LOGIF(LT, tracelog_routed_query( - router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - - master_dcb->func.auth( - master_dcb, - NULL, - master_dcb->session, - gwbuf_clone(querybuf)); - - LOGIF(LT, tracelog_routed_query( - router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(querybuf))); - - slave_dcb->func.auth( - slave_dcb, - NULL, - master_dcb->session, - querybuf); - break; - - case COM_QUIT: - case COM_QUERY: - /** - * 1. Create new property of type RSES_PROP_TYPE_SESCMD. - * 2. Add property to the ROUTER_CLIENT_SES struct of - * this router session. - * 3. For each backend, and for each non-executed - * sescmd: - * call execution of current sescmd in - * all backends as long as both have executed - * them all. - * Execution call is dcb->func.session. - * All sescmds are executed when its return value is - * NULL, otherwise it is a pointer to next property. - */ - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); - /** - * Additional reference is created to querybuf to - * prevent it from being released before properties - * are cleaned up as a part of router sessionclean-up. - */ - mysql_sescmd_init(prop, querybuf, router_cli_ses); - - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - rses_property_done(prop); - goto return_ret; - } - /** Add sescmd property to router client session */ - rses_property_add(router_cli_ses, prop); - - /** Execute session command in master */ - if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) - { - ret = 1; - } - else - { - /** Log error */ - } - /** Execute session command in slave */ - if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) - { - ret = 1; - } - else - { - /** Log error */ - } - - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); - break; - - default: - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - ret = master_dcb->func.write(master_dcb, - (void *)gwbuf_clone(querybuf)); - - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(querybuf))); - - slave_dcb->func.write(slave_dcb, (void *)querybuf); - break; - } /**< switch by packet type */ - + prop = rses_property_init(RSES_PROP_TYPE_SESCMD); + /** + * Additional reference is created to querybuf to + * prevent it from being released before properties + * are cleaned up as a part of router sessionclean-up. + */ + mysql_sescmd_init(prop, querybuf, packet_type, router_cli_ses); + + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + rses_property_done(prop); + goto return_ret; + } + /** Add sescmd property to router client session */ + rses_property_add(router_cli_ses, prop); + + /** Execute session command in master */ + if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) + { + ret = 1; + } + else + { + /** Log error */ + } + /** Execute session command in slave */ + if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) + { + ret = 1; + } + else + { + /** Log error */ + } + + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + atomic_add(&inst->stats.n_all, 1); goto return_ret; break; @@ -1066,8 +1022,9 @@ static void clientReply( */ if (!rses_begin_locked_router_action(router_cli_ses)) { - /** is this needed ??*/ - gwbuf_consume(writebuf, gwbuf_length(writebuf)); + while ((writebuf = gwbuf_consume( + writebuf, + GWBUF_LENGTH(writebuf))) != NULL); goto lock_failed; } master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; @@ -1091,7 +1048,9 @@ static void clientReply( */ if (client_dcb == NULL) { - gwbuf_consume(writebuf, gwbuf_length(writebuf)); + while ((writebuf = gwbuf_consume( + writebuf, + GWBUF_LENGTH(writebuf))) != NULL); /** Log that client was closed before reply */ return; } @@ -1111,7 +1070,7 @@ static void clientReply( goto lock_failed; } scur = rses_get_sescmd_cursor(router_cli_ses, be_type); - /** + /** * Active cursor means that reply is from session command * execution. */ @@ -1134,6 +1093,7 @@ static void clientReply( } /** Set cursor passive. */ sescmd_cursor_set_active(scur, false); + ss_dassert(!sescmd_cursor_is_active(scur)); /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); @@ -1142,6 +1102,7 @@ static void clientReply( { /** Write reply to client DCB */ client_dcb->func.write(client_dcb, writebuf); + ss_dassert(!sescmd_cursor_is_active(scur)); /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); LOGIF(LT, (skygw_log_write_flush( @@ -1150,12 +1111,10 @@ static void clientReply( "backend dcb %p. End of normal reply.", pthread_self(), client_dcb, - backend_dcb))); + backend_dcb))); } - return; /*< succeed */ lock_failed: - /** log that router session couldn't be locked */ - return; + return; } /** @@ -1443,6 +1402,7 @@ static void rses_end_locked_property_action( static mysql_sescmd_t* mysql_sescmd_init ( rses_property_t* rses_prop, GWBUF* sescmd_buf, + unsigned char packet_type, ROUTER_CLIENT_SES* rses) { mysql_sescmd_t* sescmd; @@ -1453,11 +1413,12 @@ static mysql_sescmd_t* mysql_sescmd_init ( sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ // sescmd->my_sescmd_rsession = rses; #if defined(SS_DEBUG) - sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; + sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; #endif /** Set session command buffer */ - sescmd->my_sescmd_buf = sescmd_buf; + sescmd->my_sescmd_buf = sescmd_buf; + sescmd->my_sescmd_packet_type = packet_type; return sescmd; } @@ -1624,8 +1585,25 @@ static bool execute_sescmd_in_backend( "execute_sescmd_in_backend", dcb, sescmd_cursor_clone_querybuf(scur))); - rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); - + switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { + case COM_CHANGE_USER: + rc = dcb->func.auth( + dcb, + NULL, + dcb->session, + sescmd_cursor_clone_querybuf(scur)); + break; + + case COM_QUIT: + case COM_QUERY: + case COM_INIT_DB: + default: + rc = dcb->func.write( + dcb, + sescmd_cursor_clone_querybuf(scur)); + break; + } + if (rc != 1) { succp = false; @@ -1685,7 +1663,6 @@ static bool cont_exec_sescmd_in_backend( dcb, sescmd_cursor_clone_querybuf(scur))); -// rc = dcb->func.session(dcb, sescmd_cursor_clone_querybuf(scur)); rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); if (rc != 1) {