diff --git a/log_manager/log_manager.cc b/log_manager/log_manager.cc index c8f96266e..733e9fffe 100644 --- a/log_manager/log_manager.cc +++ b/log_manager/log_manager.cc @@ -255,11 +255,7 @@ static int logmanager_write_log( static blockbuf_t* blockbuf_init(logfile_id_t id); static void blockbuf_node_done(void* bb_data); static char* blockbuf_get_writepos( -#if 0 - int** refcount, -#else blockbuf_t** p_bb, -#endif logfile_id_t id, size_t str_len, bool flush); @@ -653,7 +649,16 @@ static int logmanager_write_log( int safe_str_len; timestamp_len = get_timestamp_len(); - safe_str_len = MIN(timestamp_len-1+str_len, lf->lf_buf_size); + + /** Findout how much can be safely written with current block size */ + if (timestamp_len-1+str_len > lf->lf_buf_size) + { + safe_str_len = lf->lf_buf_size; + } + else + { + safe_str_len = timestamp_len-1+str_len; + } /** * Seek write position and register to block buffer. * Then print formatted string to write position. @@ -673,9 +678,9 @@ static int logmanager_write_log( * of the timestamp string. */ if (use_valist) { - vsnprintf(wp+timestamp_len, safe_str_len, str, valist); + vsnprintf(wp+timestamp_len, safe_str_len-timestamp_len, str, valist); } else { - snprintf(wp+timestamp_len, safe_str_len, "%s", str); + snprintf(wp+timestamp_len, safe_str_len-timestamp_len, "%s", str); } /** write to syslog */ @@ -694,12 +699,7 @@ static int logmanager_write_log( break; } } - - /** remove double line feed */ - if (wp[timestamp_len+str_len-2] == '\n') { - wp[timestamp_len+str_len-2]=' '; - } - wp[timestamp_len+str_len-1]='\n'; + wp[safe_str_len-1] = '\n'; blockbuf_unregister(bb); /** diff --git a/server/include/dcb.h b/server/include/dcb.h index 88964fc31..4f8be99d4 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -145,9 +145,9 @@ typedef enum { DCB_STATE_POLLING, /*< Waiting in the poll loop */ DCB_STATE_LISTENING, /*< The DCB is for a listening socket */ DCB_STATE_DISCONNECTED, /*< The socket is now closed */ - DCB_STATE_FREED, /*< Memory freed */ DCB_STATE_NOPOLLING, /*< Removed from poll mask */ - DCB_STATE_ZOMBIE /*< DCB is no longer active, waiting to free it */ + DCB_STATE_ZOMBIE, /*< DCB is no longer active, waiting to free it */ + DCB_STATE_FREED /*< Memory freed */ } dcb_state_t; typedef enum { diff --git a/server/modules/include/mysql_client_server_protocol.h b/server/modules/include/mysql_client_server_protocol.h index 9e2147d05..c875a1232 100644 --- a/server/modules/include/mysql_client_server_protocol.h +++ b/server/modules/include/mysql_client_server_protocol.h @@ -102,6 +102,12 @@ typedef enum { MYSQL_IDLE } mysql_auth_state_t; +typedef enum { + MYSQL_PROTOCOL_ALLOC, + MYSQL_PROTOCOL_ACTIVE, + MYSQL_PROTOCOL_DONE +} mysql_protocol_state_t; + /* * MySQL session specific data @@ -270,6 +276,7 @@ typedef struct { server_command_t protocol_command; /*< session command list */ server_command_t* protocol_cmd_history; /*< session command history */ mysql_auth_state_t protocol_auth_state; /*< Authentication status */ + mysql_protocol_state_t protocol_state; /*< Protocol struct status */ uint8_t scramble[MYSQL_SCRAMBLE_LEN]; /*< server scramble, * created or received */ uint32_t server_capabilities; /*< server capabilities, diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index 5d8088d4e..903c17022 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -762,12 +762,13 @@ return_rc: */ static int gw_error_backend_event(DCB *dcb) { - SESSION* session; - void* rsession; - ROUTER_OBJECT* router; - ROUTER* router_instance; - GWBUF* errbuf; - bool succp; + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + GWBUF* errbuf; + bool succp; + session_state_t ses_state; CHK_DCB(dcb); session = dcb->session; @@ -787,7 +788,7 @@ static int gw_error_backend_event(DCB *dcb) * closed by router and COM_QUIT sent or there was an error which * have already been handled. */ - if (dcb->session != DCB_STATE_POLLING) + if (dcb->state != DCB_STATE_POLLING) { return 1; } @@ -796,6 +797,29 @@ static int gw_error_backend_event(DCB *dcb) 0, "Lost connection to backend server."); + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + + /** + * Session might be initialized when DCB already is in the poll set. + * Thus hangup can occur in the middle of session initialization. + * Only complete and successfully initialized sessions allow for + * calling error handler. + */ + while (ses_state == SESSION_STATE_READY) + { + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + } + + if (ses_state != SESSION_STATE_ROUTER_READY) + { + gwbuf_free(errbuf); + goto retblock; + } + router->handleError(router_instance, rsession, errbuf, @@ -811,6 +835,7 @@ static int gw_error_backend_event(DCB *dcb) } dcb_close(dcb); +retblock: return 1; } @@ -924,12 +949,13 @@ return_fd: static int gw_backend_hangup(DCB *dcb) { - SESSION* session; - void* rsession; - ROUTER_OBJECT* router; - ROUTER* router_instance; - bool succp; - GWBUF* errbuf; + SESSION* session; + void* rsession; + ROUTER_OBJECT* router; + ROUTER* router_instance; + bool succp; + GWBUF* errbuf; + session_state_t ses_state; CHK_DCB(dcb); session = dcb->session; @@ -950,7 +976,29 @@ gw_backend_hangup(DCB *dcb) 1, 0, "Lost connection to backend server."); - + + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + + /** + * Session might be initialized when DCB already is in the poll set. + * Thus hangup can occur in the middle of session initialization. + * Only complete and successfully initialized sessions allow for + * calling error handler. + */ + while (ses_state == SESSION_STATE_READY) + { + spinlock_acquire(&session->ses_lock); + ses_state = session->state; + spinlock_release(&session->ses_lock); + } + + if (ses_state != SESSION_STATE_ROUTER_READY) + { + gwbuf_free(errbuf); + goto retblock; + } router->handleError(router_instance, rsession, errbuf, @@ -972,7 +1020,8 @@ gw_backend_hangup(DCB *dcb) } dcb_close(dcb); - return 1; +retblock: + return 1; } /** diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index 6cc9027de..fbe0589a5 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -65,7 +65,7 @@ static int gw_client_hangup_event(DCB *dcb); int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message); int MySQLSendHandshake(DCB* dcb); static int gw_mysql_do_authentication(DCB *dcb, GWBUF *queue); -static int route_by_statement(SESSION *, GWBUF *); +static int route_by_statement(SESSION *, GWBUF **); /* * The "module object" for the mysqld client protocol module. @@ -534,7 +534,7 @@ int gw_read_client_event( if (nbytes_read == 0) { goto return_rc; - } + } /** * if read queue existed appent read to it. * if length of read buffer is less than 3 or less than mysql packet @@ -783,7 +783,7 @@ int gw_read_client_event( * Feed each statement completely and separately * to router. */ - rc = route_by_statement(session, read_buffer); + rc = route_by_statement(session, &read_buffer); if (read_buffer != NULL) { @@ -1383,7 +1383,7 @@ gw_client_hangup_event(DCB *dcb) #if defined(SS_DEBUG) LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR, - "Client hangup error handling."))); + "Client hangup error handling."))); #endif dcb_close(dcb); return 1; @@ -1399,7 +1399,9 @@ gw_client_hangup_event(DCB *dcb) * Return 1 in success. If the last packet is incomplete return success but * leave incomplete packet to readbuf. */ -static int route_by_statement(SESSION *session, GWBUF *readbuf) +static int route_by_statement( + SESSION* session, + GWBUF** p_readbuf) { int rc = -1; GWBUF* packetbuf; @@ -1407,7 +1409,7 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf) gwbuf_type_t prevtype; GWBUF* tmpbuf; - tmpbuf = readbuf; + tmpbuf = *p_readbuf; while (tmpbuf != NULL) { ss_dassert(GWBUF_IS_TYPE_MYSQL(tmpbuf)); @@ -1416,9 +1418,9 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf) #endif do { - ss_dassert(GWBUF_IS_TYPE_MYSQL(readbuf)); + ss_dassert(GWBUF_IS_TYPE_MYSQL((*p_readbuf))); - packetbuf = gw_MySQL_get_next_packet(&readbuf); + packetbuf = gw_MySQL_get_next_packet(p_readbuf); ss_dassert(GWBUF_IS_TYPE_MYSQL(packetbuf)); @@ -1447,7 +1449,7 @@ static int route_by_statement(SESSION *session, GWBUF *readbuf) goto return_rc; } } - while (readbuf != NULL); + while (*p_readbuf != NULL); return_rc: return rc; diff --git a/server/modules/protocol/mysql_common.c b/server/modules/protocol/mysql_common.c index 70050a13c..ef9991554 100644 --- a/server/modules/protocol/mysql_common.c +++ b/server/modules/protocol/mysql_common.c @@ -79,6 +79,7 @@ MySQLProtocol* mysql_protocol_init( strerror(eno)))); goto return_p; } + p->protocol_state = MYSQL_PROTOCOL_ALLOC; p->protocol_auth_state = MYSQL_ALLOC; p->protocol_command.scom_cmd = MYSQL_COM_UNDEFINED; p->protocol_command.scom_nresponse_packets = 0; @@ -90,6 +91,7 @@ MySQLProtocol* mysql_protocol_init( /*< Assign fd with protocol */ p->fd = fd; p->owner_dcb = dcb; + p->protocol_state = MYSQL_PROTOCOL_ACTIVE; CHK_PROTOCOL(p); return_p: return p; @@ -107,15 +109,25 @@ return_p: void mysql_protocol_done ( DCB* dcb) { - server_command_t* scmd = ((MySQLProtocol *)dcb->protocol)->protocol_cmd_history; + MySQLProtocol* p; + server_command_t* scmd; server_command_t* scmd2; + p = (MySQLProtocol *)dcb->protocol; + + spinlock_acquire(&p->protocol_lock); + + scmd = p->protocol_cmd_history; + while (scmd != NULL) { scmd2 = scmd->scom_next; free(scmd); scmd = scmd2; } + p->protocol_state = MYSQL_PROTOCOL_DONE; + + spinlock_release(&p->protocol_lock); } @@ -886,7 +898,7 @@ int mysql_send_com_quit( if (buf == NULL) { return 0; - } + } nbytes = dcb->func.write(dcb, buf); return nbytes; @@ -1542,16 +1554,18 @@ GWBUF* gw_MySQL_get_next_packet( packetbuf = gwbuf_alloc(packetlen); target = GWBUF_DATA(packetbuf); - + packetbuf->gwbuf_type = readbuf->gwbuf_type; /*< Copy the type too */ + while (nbytes_copied < packetlen) { uint8_t* src = GWBUF_DATA(readbuf); size_t buflen = GWBUF_LENGTH(readbuf); - + memcpy(target+nbytes_copied, src, buflen); - *p_readbuf = gwbuf_consume(readbuf, buflen); + readbuf = gwbuf_consume(readbuf, buflen); nbytes_copied += buflen; } + *p_readbuf = readbuf; ss_dassert(nbytes_copied == packetlen); } else @@ -1625,11 +1639,16 @@ void protocol_archive_srv_command( MySQLProtocol* p) { server_command_t* s1; - server_command_t** s2; + server_command_t* h1; int len = 0; spinlock_acquire(&p->protocol_lock); + if (p->protocol_state != MYSQL_PROTOCOL_ACTIVE) + { + goto retblock; + } + s1 = &p->protocol_command; LOGIF(LT, (skygw_log_write( @@ -1639,18 +1658,19 @@ void protocol_archive_srv_command( p->owner_dcb->fd))); /** Copy to history list */ - s2 = &p->protocol_cmd_history; - - if (*s2 != NULL) - { - while ((*s2)->scom_next != NULL) - { - *s2 = (*s2)->scom_next; - len += 1; - } + if ((h1 = p->protocol_cmd_history) == NULL) + { + p->protocol_cmd_history = server_command_copy(s1); } - *s2 = server_command_copy(s1); - + else + { + while (h1->scom_next != NULL) + { + h1 = h1->scom_next; + } + h1->scom_next = server_command_copy(s1); + } + /** Keep history limits, remove oldest */ if (len > MAX_CMD_HISTORY) { @@ -1669,6 +1689,8 @@ void protocol_archive_srv_command( p->protocol_command = *(s1->scom_next); free(s1->scom_next); } + +retblock: spinlock_release(&p->protocol_lock); } diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 9d4b1b8ff..c10aca134 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -1549,7 +1549,7 @@ static void clientReply ( size_t len = MYSQL_GET_PACKET_LEN(buf); char* cmdstr = (char *)malloc(len+1); - snprintf(cmdstr, len+1, "%s", &buf[5]); + snprintf(cmdstr, len, "%s", &buf[5]); LOGIF(LE, (skygw_log_write_flush( LOGFILE_ERROR,