From 90f701be8e980bc41b7b4633d7486a7c406e01c7 Mon Sep 17 00:00:00 2001 From: VilhoRaatikka Date: Sun, 16 Mar 2014 19:43:49 +0200 Subject: [PATCH] readwritesplit.c : router is changed so that it guarantees to keep the execution order of session commands and queries when they are routed to backend servers. In the same way it maintains the order of response packets and discards duplicate responses. For each session command a sescmd property is created and added to the end of list of session commands. List is owned by router client session and it includes all session commands from the beginning of router session. Router maintains an individual session command cursor for each backend. A cursor refers to the first session command which the corresponding backend server haven't yet responded yet. When response message arrives at any time from backend, first it is checked whether backend's cursor is active. Cursor is active if a session command is routed to backend and the backend haven't responded to it yet. If cursor is active, next it is checked whether the current session command property has been responded by other backend. If both are true, then response message is sent to client as is. If session command response is routed to client already, the arrived response is discarded. --- server/core/dcb.c | 10 +- .../include/mysql_client_server_protocol.h | 1 + server/modules/protocol/mysql_backend.c | 4 - server/modules/protocol/mysql_client.c | 48 --- server/modules/protocol/mysql_common.c | 54 ++++ .../routing/readwritesplit/readwritesplit.c | 276 ++++++++---------- 6 files changed, 182 insertions(+), 211 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index d1b5a4ffb..7c648ccd6 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -768,6 +768,8 @@ dcb_write(DCB *dcb, GWBUF *queue) saved_errno = errno; errno = 0; + if (LOG_IS_ENABLED(LOGFILE_DEBUG)) + { if (saved_errno == EPIPE) { LOGIF(LD, (skygw_log_write( LOGFILE_DEBUG, @@ -780,7 +782,12 @@ dcb_write(DCB *dcb, GWBUF *queue) dcb->fd, saved_errno, strerror(saved_errno)))); - } else if (saved_errno != EAGAIN && + } + } + if (LOG_IS_ENABLED(LOGFILE_ERROR)) + { + if (saved_errno != EPIPE && + saved_errno != EAGAIN && saved_errno != EWOULDBLOCK) { LOGIF(LE, (skygw_log_write_flush( @@ -794,6 +801,7 @@ dcb_write(DCB *dcb, GWBUF *queue) saved_errno, strerror(saved_errno)))); } + } break; } /* diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index e23519da3..7dd6517e0 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -296,3 +296,4 @@ char *gw_strend(register const char *s); int setnonblocking(int fd); void setipaddress(struct in_addr *a, char *p); int gw_read_gwbuff(DCB *dcb, GWBUF **head, int b); +GWBUF* gw_MySQL_get_next_stmt(GWBUF** p_readbuf); diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 78216d62f..45457e665 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -567,9 +567,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) /*< * Now we set the last command received, from the current queue */ -// memcpy(&dcb->command, &queue->command, sizeof(dcb->command)); spinlock_release(&dcb->authlock); -// LOGIF(LD, debuglog_statements(dcb, gwbuf_clone(queue))); rc = dcb_write(dcb, queue); return rc; } @@ -847,7 +845,6 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB unsigned int auth_token_len = 0; uint8_t *auth_token = NULL; int rv = -1; - int len = 0; int auth_ret = 1; current_session = (MYSQL_session *)in_session->client->data; @@ -919,7 +916,6 @@ static int gw_session(DCB *backend_dcb, void *data) { GWBUF *queue = NULL; queue = (GWBUF *) data; -// queue->command = ROUTER_CHANGE_SESSION; backend_dcb->func.write(backend_dcb, queue); return 1; diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index ecea34972..fbac8853b 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -57,7 +57,6 @@ static int route_by_statement( ROUTER_OBJECT* router, void* rsession, GWBUF* read_buf); -static GWBUF* gw_MySQL_get_next_stmt(GWBUF** buffer); /* * The "module object" for the mysqld client protocol module. @@ -1248,51 +1247,6 @@ return_rc: return rc; } -/** - * Remove the first mysql statement from buffer. Return pointer to the removed - * statement or NULL if buffer is empty. - * - * Clone buf, calculate the length of included mysql stmt, and point the - * statement with cloned buffer. Move the start pointer of buf accordingly - * so that it only cover the remaining buffer. - * - */ -static GWBUF* gw_MySQL_get_next_stmt( - GWBUF** p_readbuf) -{ - GWBUF* stmtbuf; - size_t buflen; - size_t strlen; - uint8_t* packet; - - if (*p_readbuf == NULL) - { - stmtbuf = NULL; - goto return_stmtbuf; - } - CHK_GWBUF(*p_readbuf); - - if (GWBUF_EMPTY(*p_readbuf)) - { - stmtbuf = NULL; - goto return_stmtbuf; - } - buflen = GWBUF_LENGTH((*p_readbuf)); - packet = GWBUF_DATA((*p_readbuf)); - strlen = MYSQL_GET_PACKET_LEN(packet); - - /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ - if (strlen-1 > buflen-5) - { - stmtbuf = NULL; - goto return_stmtbuf; - } - stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4); - *p_readbuf = gwbuf_consume(*p_readbuf, strlen+4); - -return_stmtbuf: - return stmtbuf; -} /** * Detect if buffer includes partial mysql packet or multiple packets. @@ -1318,13 +1272,11 @@ static int route_by_statement( CHK_GWBUF(stmtbuf); payload = (uint8_t *)GWBUF_DATA(stmtbuf); - len += MYSQL_GET_PACKET_LEN(payload); /** * If message is longer than read data, suspend routing and * add statement buffer to wait queue. */ rc = router->routeQuery(router_instance, rsession, stmtbuf); - len = 0; /*< if routed, reset the length indicator */ } while (readbuf != NULL); diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 7bfb3666c..b208da9c7 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -1209,3 +1209,57 @@ mysql_send_auth_error (DCB *dcb, int packet_number, int in_affected_rows, const return sizeof(mysql_packet_header) + mysql_payload_size; } + + +/** + * Remove the first mysql statement from buffer. Return pointer to the removed + * statement or NULL if buffer is empty. + * + * Clone buf, calculate the length of included mysql stmt, and point the + * statement with cloned buffer. Move the start pointer of buf accordingly + * so that it only cover the remaining buffer. + * + */ +GWBUF* gw_MySQL_get_next_stmt( + GWBUF** p_readbuf) +{ + GWBUF* stmtbuf; + size_t buflen; + size_t strlen; + uint8_t* packet; + + if (*p_readbuf == NULL) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + CHK_GWBUF(*p_readbuf); + + if (GWBUF_EMPTY(*p_readbuf)) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + buflen = GWBUF_LENGTH((*p_readbuf)); + packet = GWBUF_DATA((*p_readbuf)); + strlen = MYSQL_GET_PACKET_LEN(packet); + + if (strlen+4 == buflen) + { + stmtbuf = *p_readbuf; + *p_readbuf = NULL; + goto return_stmtbuf; + } + /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ + if (strlen-1 > buflen-5) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + stmtbuf = gwbuf_clone_portion(*p_readbuf, 0, strlen+4); + *p_readbuf = gwbuf_consume(*p_readbuf, strlen+4); + +return_stmtbuf: + return stmtbuf; +} + diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 1548eac57..42de6a8fa 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -677,43 +677,6 @@ static int routeQuery( switch (qtype) { case QUERY_TYPE_WRITE: -#if 0 - /** - * Running this block cause deadlock because read mutex is - * on hold. This doesn't serialize subsequent session commands - * and queries if there are multiple session commands and other - * backend starts to lag behind. vraa : 14.3.13 - */ - /** - * Wait until master has executed all its session commands. - * TODO: if master fails it needs to be detected in the loop. - */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto return_ret; - } - - while (sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_MASTER))) - { - rses_end_locked_router_action(router_cli_ses); - - LOGIF(LD, (skygw_log_write( - LOGFILE_DEBUG, - "%lu [routeQuery:rwsplit] Session command is " - "active in MASTER. Waiting in loop.", - pthread_self()))); - - usleep(10); - - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto return_ret; - } - } - rses_end_locked_router_action(router_cli_ses); -#endif LOGIF(LT, (skygw_log_write( LOGFILE_TRACE, "%lu [routeQuery:rwsplit] Query type\t%s, " @@ -733,53 +696,24 @@ static int routeQuery( break; case QUERY_TYPE_READ: - LOGIF(LT, (skygw_log_write( + LOGIF(LT, (skygw_log_write_flush( LOGFILE_TRACE, "%lu [routeQuery:rwsplit] Query type\t%s, " "routing to Slave.", pthread_self(), STRQTYPE(qtype)))); - while (true) - { - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto return_ret; - } - /** - * If session command is being executed in slave - * route to master. - */ - if (!sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_SLAVE))) - { - rses_end_locked_router_action(router_cli_ses); - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - slave_dcb, - gwbuf_clone(querybuf))); - - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); - break; - } - else if (!sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_MASTER))) - - { - rses_end_locked_router_action(router_cli_ses); - LOGIF(LT, tracelog_routed_query(router_cli_ses, - "routeQuery", - master_dcb, - gwbuf_clone(querybuf))); - - ret = slave_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); - break; - } - rses_end_locked_router_action(router_cli_ses); - } /*< while (true) */ + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(querybuf))); + ret = slave_dcb->func.write(slave_dcb, querybuf); + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Routed.", + pthread_self()))); + + + atomic_add(&inst->stats.n_slave, 1); goto return_ret; break; @@ -807,7 +741,26 @@ static int routeQuery( slave_dcb, STRQTYPE(qtype), STRPACKETTYPE(packet_type)))); + /** + * COM_QUIT is one-way message. Server doesn't respond to that. + * Therefore reply processing is unnecessary and session + * command property is not needed. It is just routed to both + * backends. + */ + if (packet_type == COM_QUIT) + { + int rc; + int rc2; + rc = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); + rc2 = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf)); + + if (rc == 1 && rc == rc2) + { + ret = 1; + } + goto return_ret; + } prop = rses_property_init(RSES_PROP_TYPE_SESCMD); /** * Additional reference is created to querybuf to @@ -1063,6 +1016,11 @@ static void clientReply( { be_type = BE_SLAVE; } + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "reply_by_statement", + backend_dcb, + gwbuf_clone(writebuf))); + /** Lock router session */ if (!rses_begin_locked_router_action(router_cli_ses)) { @@ -1076,27 +1034,15 @@ static void clientReply( */ if (sescmd_cursor_is_active(scur)) { - mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); - sescmd_reply_to_client(client_dcb, scmd, writebuf); - - /** Read next sescmd property */ - while (sescmd_cursor_next(scur)) - { - if (!cont_exec_sescmd_in_backend(router_cli_ses, be_type)) - { - /** Log error */ - } - else - { - /** Log execution of pending sescmd */ - } - } - /** Set cursor passive. */ - sescmd_cursor_set_active(scur, false); - ss_dassert(!sescmd_cursor_is_active(scur)); - - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); + mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); + sescmd_reply_to_client(client_dcb, scmd, writebuf); + /** When sescmd list is empty set cursor passive. */ + if (!sescmd_cursor_next(scur)) + { + sescmd_cursor_set_active(scur, false); + } + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); } else if (client_dcb != NULL) { @@ -1411,7 +1357,6 @@ static mysql_sescmd_t* mysql_sescmd_init ( /** Can't call rses_property_get_sescmd with uninitialized sescmd */ sescmd = &rses_prop->rses_prop_data.sescmd; sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ -// sescmd->my_sescmd_rsession = rses; #if defined(SS_DEBUG) sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; @@ -1452,21 +1397,43 @@ static bool sescmd_reply_to_client( CHK_GWBUF(writebuf); ss_dassert(SPINLOCK_IS_LOCKED( &scmd->my_sescmd_prop->rses_prop_rsession->rses_lock)); - - if (!scmd->my_sescmd_is_replied) - { - client_dcb->func.write(client_dcb, writebuf); - scmd->my_sescmd_is_replied = true; - succp = true; - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [sescmd_reply_to_client] Replied to client dcb %p.", - pthread_self(), - client_dcb))); - } - else + /** + * This depends on MySQL protoocl and doesn't work with others. + * TODO: write a function to MySQL protocol module and add + * a new protocol function 'discard n first messages'. + */ + if (scmd->my_sescmd_is_replied) { + size_t len; + uint8_t* packet; + /** + * Skip reply message because it is duplicate of alredy + * replied message. + */ + packet = (uint8_t*)writebuf->start; + len = packet[0]; + len += packet[1]*256; + len += packet[2]*256*256; + writebuf = gwbuf_consume(writebuf, len+4); + + if (writebuf == NULL || GWBUF_EMPTY(writebuf)) + { + succp = true; + goto return_succp; + } } + + client_dcb->func.write(client_dcb, writebuf); + scmd->my_sescmd_is_replied = true; + succp = true; + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [sescmd_reply_to_client] Replied cmd %p to client dcb %p.", + pthread_self(), + scmd, + client_dcb))); + +return_succp: return succp; } @@ -1575,52 +1542,44 @@ static bool execute_sescmd_in_backend( succp = false; goto return_succp; } - - if (!sescmd_cursor_is_active(scur)) - { - /** Cursor is left active when function returns. */ - sescmd_cursor_set_active(scur, true); - - LOGIF(LT, tracelog_routed_query(rses, - "execute_sescmd_in_backend", - dcb, - sescmd_cursor_clone_querybuf(scur))); - switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { - case COM_CHANGE_USER: - rc = dcb->func.auth( - dcb, - NULL, - dcb->session, - sescmd_cursor_clone_querybuf(scur)); - break; - - case COM_QUIT: - case COM_QUERY: - case COM_INIT_DB: - default: - rc = dcb->func.write( - dcb, - sescmd_cursor_clone_querybuf(scur)); - break; - } - - if (rc != 1) - { - succp = false; - } - } - else + if (!sescmd_cursor_is_active(scur)) { - LOGIF(LD, (skygw_log_write_flush( - LOGFILE_DEBUG, - "%lu [routeQuery] Couldn't directly send SESSION " - "WRITER command to dcb %p because session command " - "cursor was executing previous command. Added " - "command to the queue.", - pthread_self(), - dcb))); + /** Cursor is left active when function returns. */ + sescmd_cursor_set_active(scur, true); } + LOGIF(LT, tracelog_routed_query(rses, + "execute_sescmd_in_backend", + dcb, + sescmd_cursor_clone_querybuf(scur))); + switch (scur->scmd_cur_cmd->my_sescmd_packet_type) { + case COM_CHANGE_USER: + rc = dcb->func.auth( + dcb, + NULL, + dcb->session, + sescmd_cursor_clone_querybuf(scur)); + break; + case COM_QUIT: + case COM_QUERY: + case COM_INIT_DB: + default: + rc = dcb->func.write( + dcb, + sescmd_cursor_clone_querybuf(scur)); + break; + } + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [execute_sescmd_in_backend] Routed %s cmd %p.", + pthread_self(), + STRPACKETTYPE(scur->scmd_cur_cmd->my_sescmd_packet_type), + scur->scmd_cur_cmd))); + + if (rc != 1) + { + succp = false; + } return_succp: return succp; } @@ -1664,7 +1623,8 @@ static bool cont_exec_sescmd_in_backend( sescmd_cursor_clone_querybuf(scur))); rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); - if (rc != 1) + + if (rc != 1) { succp = false; }