diff --git a/server/core/dcb.c b/server/core/dcb.c index d1b5a4ffb..7c648ccd6 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -768,6 +768,8 @@ dcb_write(DCB *dcb, GWBUF *queue) saved_errno = errno; errno = 0; + if (LOG_IS_ENABLED(LOGFILE_DEBUG)) + { if (saved_errno == EPIPE) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -780,7 +782,12 @@ dcb_write(DCB *dcb, GWBUF *queue) dcb->fd, saved_errno, strerror(saved_errno)))); - } else if (saved_errno != EAGAIN && + } + } + if (LOG_IS_ENABLED(LOGFILE_ERROR)) + { + if (saved_errno != EPIPE && + saved_errno != EAGAIN && saved_errno != EWOULDBLOCK) { LOGIF(LE, (skygw_log_write_flush( @@ -794,6 +801,7 @@ dcb_write(DCB *dcb, GWBUF *queue) saved_errno, strerror(saved_errno)))); } + } break; } /* diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index e23519da3..7dd6517e0 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -296,3 +296,4 @@ char *gw_strend(register const char *s); int setnonblocking(int fd); void setipaddress(struct in_addr *a, char *p); int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b); +GWBUF* gw_MySQL_get_next_stmt(GWBUF** p_readbuf); diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 78216d62f..45457e665 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -567,9 +567,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) /*< * Now we set the last command received, from the current queue */ -// memcpy(&dcb->command, &queue->command, sizeof(dcb->command)); spinlock_release(&dcb->authlock); -// LOGIF(LD, debuglog_statements(dcb, gwbuf_clone(queue))); rc = dcb_write(dcb, queue); return rc; } @@ -847,7 +845,6 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB unsigned int auth_token_len = 0; uint8_t *auth_token = NULL; int rv = -1; - int len = 0; int auth_ret = 1; current_session = (MYSQL_session *)in_session->client->data; @@ -919,7 +916,6 @@ static int gw_session(DCB *backend_dcb, void *data) { GWBUF *queue = NULL; queue = (GWBUF *) data; -// queue->command = ROUTER_CHANGE_SESSION; backend_dcb->func.write(backend_dcb, queue); return 1; diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index ecea34972..fbac8853b 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -57,7 +57,6 @@ static int route_by_statement( ROUTER_OBJECT* router, void* rsession, GWBUF* read_buf); -static GWBUF* gw_MySQL_get_next_stmt(GWBUF** buffer); /* * The "module object" for the mysqld client protocol module. @@ -1248,51 +1247,6 @@ return_rc: return rc; } -/** - * Remove the first mysql statement from buffer. Return pointer to the removed - * statement or NULL if buffer is empty. - * - * Clone buf, calculate the length of included mysql stmt, and point the - * statement with cloned buffer. Move the start pointer of buf accordingly - * so that it only cover the remaining buffer. - * - */ -static GWBUF* gw_MySQL_get_next_stmt( - GWBUF** p_readbuf) -{ - GWBUF* stmtbuf; - size_t buflen; - size_t strlen; - uint8_t* packet; - - if (*p_readbuf == NULL) - { - stmtbuf = NULL; - goto return_stmtbuf; - } - CHK_GWBUF(*p_readbuf); - - if (GWBUF_EMPTY(*p_readbuf)) - { - stmtbuf = NULL; - goto return_stmtbuf; - } - buflen = GWBUF_LENGTH((*p_readbuf)); - packet = GWBUF_DATA((*p_readbuf)); - strlen = MYSQL_GET_PACKET_LEN(packet); - - /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ - if (strlen-1 > buflen-5) - { - stmtbuf = NULL; - goto return_stmtbuf; - } - stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4); - *p_readbuf = gwbuf_consume(*p_readbuf, strlen+4); - -return_stmtbuf: - return stmtbuf; -} /** * Detect if buffer includes partial mysql packet or multiple packets. @@ -1318,13 +1272,11 @@ static int route_by_statement( CHK_GWBUF(stmtbuf); payload = (uint8_t *)GWBUF_DATA(stmtbuf); - len += MYSQL_GET_PACKET_LEN(payload); /** * If message is longer than read data, suspend routing and * add statement buffer to wait queue. */ rc = router->routeQuery(router_instance, rsession, stmtbuf); - len = 0; /*< if routed, reset the length indicator */ } while (readbuf != NULL); diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 7bfb3666c..b208da9c7 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -1209,3 +1209,57 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const return sizeof(mysql_packet_header) + mysql_payload_size; } + + +/** + * Remove the first mysql statement from buffer. Return pointer to the removed + * statement or NULL if buffer is empty. + * + * Clone buf, calculate the length of included mysql stmt, and point the + * statement with cloned buffer. Move the start pointer of buf accordingly + * so that it only cover the remaining buffer. + * + */ +GWBUF* gw_MySQL_get_next_stmt( + GWBUF** p_readbuf) +{ + GWBUF* stmtbuf; + size_t buflen; + size_t strlen; + uint8_t* packet; + + if (*p_readbuf == NULL) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + CHK_GWBUF(*p_readbuf); + + if (GWBUF_EMPTY(*p_readbuf)) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + buflen = GWBUF_LENGTH((*p_readbuf)); + packet = GWBUF_DATA((*p_readbuf)); + strlen = MYSQL_GET_PACKET_LEN(packet); + + if (strlen+4 == buflen) + { + stmtbuf = *p_readbuf; + *p_readbuf = NULL; + goto return_stmtbuf; + } + /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ + if (strlen-1 > buflen-5) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4); + *p_readbuf = gwbuf_consume(*p_readbuf, strlen+4); + +return_stmtbuf: + return stmtbuf; +} + diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 1548eac57..42de6a8fa 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -677,43 +677,6 @@ static int routeQuery( switch (qtype) { case QUERY_TYPE_WRITE: -#if 0 - /** - * Running this block cause deadlock because read mutex is - * on hold. This doesn't serialize subsequent session commands - * and queries if there are multiple session commands and other - * backend starts to lag behind. vraa : 14.3.13 - */ - /** - * Wait until master has executed all its session commands. - * TODO: if master fails it needs to be detected in the loop. - */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto return_ret; - } - - while (sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_MASTER))) - { - rses_end_locked_router_action(router_cli_ses); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [routeQuery:rwsplit] Session command is " - "active in MASTER. Waiting in loop.", - pthread_self()))); - - usleep(10); - - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto return_ret; - } - } - rses_end_locked_router_action(router_cli_ses); -#endif LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "%lu [routeQuery:rwsplit] Query type\t%s, " @@ -733,53 +696,24 @@ static int routeQuery( break; case QUERY_TYPE_READ: - LOGIF(LT, (skygw_log_write( + LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [routeQuery:rwsplit] Query type\t%s, " "routing to Slave.", pthread_self(), STRQTYPE(qtype)))); - while (true) - { - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto return_ret; - } - /** - * If session command is being executed in slave - * route to master. - */ - if (!sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_SLAVE))) - { - rses_end_locked_router_action(router_cli_ses); - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(querybuf))); - - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); - break; - } - else if (!sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_MASTER))) - - { - rses_end_locked_router_action(router_cli_ses); - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - - ret = slave_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); - break; - } - rses_end_locked_router_action(router_cli_ses); - } /*< while (true) */ + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(querybuf))); + ret = slave_dcb->func.write(slave_dcb, querybuf); + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Routed.", + pthread_self()))); + + + atomic_add(&inst->stats.n_slave, 1); goto return_ret; break; @@ -807,7 +741,26 @@ static int routeQuery( slave_dcb, STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); + /** + * COM_QUIT is one-way message. Server doesn't respond to that. + * Therefore reply processing is unnecessary and session + * command property is not needed. It is just routed to both + * backends. + */ + if (packet_type == COM_QUIT) + { + int rc; + int rc2; + rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); + rc2 = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf)); + + if (rc == 1 && rc == rc2) + { + ret = 1; + } + goto return_ret; + } prop = rses_property_init(RSES_PROP_TYPE_SESCMD); /** * Additional reference is created to querybuf to @@ -1063,6 +1016,11 @@ static void clientReply( { be_type = BE_SLAVE; } + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "reply_by_statement", + backend_dcb, + gwbuf_clone(writebuf))); + /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { @@ -1076,27 +1034,15 @@ static void clientReply( */ if (sescmd_cursor_is_active(scur)) { - mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); - sescmd_reply_to_client(client_dcb, scmd, writebuf); - - /** Read next sescmd property */ - while (sescmd_cursor_next(scur)) - { - if (!cont_exec_sescmd_in_backend(router_cli_ses, be_type)) - { - /** Log error */ - } - else - { - /** Log execution of pending sescmd */ - } - } - /** Set cursor passive. */ - sescmd_cursor_set_active(scur, false); - ss_dassert(!sescmd_cursor_is_active(scur)); - - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); + mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); + sescmd_reply_to_client(client_dcb, scmd, writebuf); + /** When sescmd list is empty set cursor passive. */ + if (!sescmd_cursor_next(scur)) + { + sescmd_cursor_set_active(scur, false); + } + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); } else if (client_dcb != NULL) { @@ -1411,7 +1357,6 @@ static mysql_sescmd_t* mysql_sescmd_init ( /** Can't call rses_property_get_sescmd with uninitialized sescmd */ sescmd = &rses_prop->rses_prop_data.sescmd; sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ -// sescmd->my_sescmd_rsession = rses; #if defined(SS_DEBUG) sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; @@ -1452,21 +1397,43 @@ static bool sescmd_reply_to_client( CHK_GWBUF(writebuf); ss_dassert(SPINLOCK_IS_LOCKED( &scmd->my_sescmd_prop->rses_prop_rsession->rses_lock)); - - if (!scmd->my_sescmd_is_replied) - { - client_dcb->func.write(client_dcb, writebuf); - scmd->my_sescmd_is_replied = true; - succp = true; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [sescmd_reply_to_client] Replied to client dcb %p.", - pthread_self(), - client_dcb))); - } - else + /** + * This depends on MySQL protoocl and doesn't work with others. + * TODO: write a function to MySQL protocol module and add + * a new protocol function 'discard n first messages'. + */ + if (scmd->my_sescmd_is_replied) { + size_t len; + uint8_t* packet; + /** + * Skip reply message because it is duplicate of alredy + * replied message. + */ + packet = (uint8_t*)writebuf->start; + len = packet[0]; + len += packet[1]*256; + len += packet[2]*256*256; + writebuf = gwbuf_consume(writebuf, len+4); + + if (writebuf == NULL || GWBUF_EMPTY(writebuf)) + { + succp = true; + goto return_succp; + } } + + client_dcb->func.write(client_dcb, writebuf); + scmd->my_sescmd_is_replied = true; + succp = true; + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [sescmd_reply_to_client] Replied cmd %p to client dcb %p.", + pthread_self(), + scmd, + client_dcb))); + +return_succp: return succp; } @@ -1575,52 +1542,44 @@ static bool execute_sescmd_in_backend( succp = false; goto return_succp; } - - if (!sescmd_cursor_is_active(scur)) - { - /** Cursor is left active when function returns. */ - sescmd_cursor_set_active(scur, true); - - LOGIF(LT, tracelog_routed_query(rses, - "execute_sescmd_in_backend", - dcb, - sescmd_cursor_clone_querybuf(scur))); - switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { - case COM_CHANGE_USER: - rc = dcb->func.auth( - dcb, - NULL, - dcb->session, - sescmd_cursor_clone_querybuf(scur)); - break; - - case COM_QUIT: - case COM_QUERY: - case COM_INIT_DB: - default: - rc = dcb->func.write( - dcb, - sescmd_cursor_clone_querybuf(scur)); - break; - } - - if (rc != 1) - { - succp = false; - } - } - else + if (!sescmd_cursor_is_active(scur)) { - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [routeQuery] Couldn't directly send SESSION " - "WRITER command to dcb %p because session command " - "cursor was executing previous command. Added " - "command to the queue.", - pthread_self(), - dcb))); + /** Cursor is left active when function returns. */ + sescmd_cursor_set_active(scur, true); } + LOGIF(LT, tracelog_routed_query(rses, + "execute_sescmd_in_backend", + dcb, + sescmd_cursor_clone_querybuf(scur))); + switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { + case COM_CHANGE_USER: + rc = dcb->func.auth( + dcb, + NULL, + dcb->session, + sescmd_cursor_clone_querybuf(scur)); + break; + case COM_QUIT: + case COM_QUERY: + case COM_INIT_DB: + default: + rc = dcb->func.write( + dcb, + sescmd_cursor_clone_querybuf(scur)); + break; + } + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [execute_sescmd_in_backend] Routed %s cmd %p.", + pthread_self(), + STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type), + scur->scmd_cur_cmd))); + + if (rc != 1) + { + succp = false; + } return_succp: return succp; } @@ -1664,7 +1623,8 @@ static bool cont_exec_sescmd_in_backend( sescmd_cursor_clone_querybuf(scur))); rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); - if (rc != 1) + + if (rc != 1) { succp = false; }