diff --git a/server/core/buffer.c b/server/core/buffer.c index d46ad475e..e3c649f69 100644 --- a/server/core/buffer.c +++ b/server/core/buffer.c @@ -128,10 +128,37 @@ GWBUF *rval; rval->start = buf->start; rval->end = buf->end; rval->next = NULL; - rval->command = buf->command; +// rval->command = buf->command; CHK_GWBUF(rval); return rval; } + + +GWBUF *gwbuf_clone_portion( + GWBUF *buf, + size_t start_offset, + size_t length) +{ + GWBUF* clonebuf; + + CHK_GWBUF(buf); + ss_dassert(start_offset+length <= GWBUF_LENGTH(buf)); + + if ((clonebuf = (GWBUF *)malloc(sizeof(GWBUF))) == NULL) + { + return NULL; + } + atomic_add(&buf->sbuf->refcount, 1); + clonebuf->sbuf = buf->sbuf; + clonebuf->start = (void *)((char*)buf->start)+start_offset; + clonebuf->end = (void *)((char *)clonebuf->start)+length; + clonebuf->next = NULL; + CHK_GWBUF(clonebuf); + return clonebuf; + +} + + /** * Append a buffer onto a linked list of buffer structures. * diff --git a/server/core/dcb.c b/server/core/dcb.c index 306a3b103..fcf2e0cf2 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -302,6 +302,8 @@ dcb_final_free(DCB *dcb) if (dcb->remote) free(dcb->remote); bitmask_free(&dcb->memdata.bitmask); + simple_mutex_done(&dcb->dcb_read_lock); + simple_mutex_done(&dcb->dcb_write_lock); free(dcb); } @@ -520,6 +522,8 @@ int rc; * Successfully connected to backend. Assign file descriptor to dcb */ dcb->fd = fd; + /** Copy status field to DCB */ + dcb->dcb_server_status = server->status; /*< * backend_dcb is connected to backend server, and once backend_dcb @@ -683,6 +687,15 @@ dcb_write(DCB *dcb, GWBUF *queue) dcb->state != DCB_STATE_LISTENING && dcb->state != DCB_STATE_NOPOLLING)) { + LOGIF(LD, (skygw_log_write( + LOGFILE_DEBUG, + "%lu [dcb_write] Write aborted to dcb %p because " + "it is in state %s", + pthread_self(), + dcb->stats.n_buffered, + dcb, + STRDCBSTATE(dcb->state), + dcb->fd))); return 0; } @@ -743,7 +756,11 @@ dcb_write(DCB *dcb, GWBUF *queue) #endif /* SS_DEBUG */ len = GWBUF_LENGTH(queue); GW_NOINTR_CALL( - w = gw_write(dcb->fd, GWBUF_DATA(queue), len); + w = gw_write( +#if defined(SS_DEBUG) + dcb, +#endif + dcb->fd, GWBUF_DATA(queue), len); dcb->stats.n_writes++; ); @@ -855,9 +872,13 @@ int saved_errno = 0; while (dcb->writeq != NULL) { len = GWBUF_LENGTH(dcb->writeq); - GW_NOINTR_CALL(w = gw_write(dcb->fd, - GWBUF_DATA(dcb->writeq), - len);); + GW_NOINTR_CALL(w = gw_write( +#if defined(SS_DEBUG) + dcb, +#endif + dcb->fd, + GWBUF_DATA(dcb->writeq), + len);); saved_errno = errno; errno = 0; @@ -1312,12 +1333,15 @@ static bool dcb_set_state_nomutex( } int gw_write( +#if defined(SS_DEBUG) + DCB* dcb, +#endif int fd, const void* buf, size_t nbytes) { int w; -#if defined(SS_DEBUG) +#if defined(SS_DEBUG) if (dcb_fake_write_errno[fd] != 0) { ss_dassert(dcb_fake_write_ev[fd] != 0); w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */ @@ -1332,6 +1356,56 @@ int gw_write( #else w = write(fd, buf, nbytes); #endif /* SS_DEBUG && SS_TEST */ + +#if defined(SS_DEBUG) + { + size_t len; + unsigned char* packet = (unsigned char *)buf; + char* str; + + /** Print only MySQL packets */ + if (w > 5) + { + str = (char *)&packet[5]; + len = packet[0]; + len += 255*packet[1]; + len += 255*255*packet[2]; + + if (strncmp(str, "insert", 6) == 0 || + strncmp(str, "create", 6) == 0 || + strncmp(str, "drop", 4) == 0) + { + ss_dassert((dcb->dcb_server_status & (SERVER_RUNNING|SERVER_MASTER|SERVER_SLAVE))==(SERVER_RUNNING|SERVER_MASTER)); + } + + if (strncmp(str, "set autocommit", 14) == 0 && nbytes > 17) + { + char* s = (char *)calloc(1, nbytes+1); + + if (nbytes-5 > len) + { + size_t len2 = packet[4+len]; + len2 += 255*packet[4+len+1]; + len2 += 255*255*packet[4+len+2]; + + char* str2 = (char *)&packet[4+len+5]; + snprintf(s, 5+len+len2, "long %s %s", (char *)str, (char *)str2); + } + else + { + snprintf(s, len, "%s", (char *)str); + } + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [gw_write] Wrote %d bytes : %s ", + pthread_self(), + w, + s))); + free(s); + } + } + } +#endif return w; } diff --git a/server/include/buffer.h b/server/include/buffer.h index ca9669ecd..de5a13148 100644 --- a/server/include/buffer.h +++ b/server/include/buffer.h @@ -92,6 +92,6 @@ extern GWBUF *gwbuf_clone(GWBUF *buf); extern GWBUF *gwbuf_append(GWBUF *head, GWBUF *tail); extern GWBUF *gwbuf_consume(GWBUF *head, unsigned int length); extern unsigned int gwbuf_length(GWBUF *head); - +extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len); #endif diff --git a/server/include/dcb.h b/server/include/dcb.h index e46784b27..cacf6527e 100644 --- a/server/include/dcb.h +++ b/server/include/dcb.h @@ -87,6 +87,7 @@ typedef struct gw_protocol { int (*listen)(struct dcb *, char *); int (*auth)(struct dcb *, struct server *, struct session *, GWBUF *); int (*session)(struct dcb *, void *); + void* (*getstmt)(void* buf); } GWPROTOCOL; /** @@ -176,7 +177,7 @@ typedef struct dcb { SPINLOCK authlock; /**< Generic Authorization spinlock */ DCBSTATS stats; /**< DCB related statistics */ - + unsigned int dcb_server_status; /*< the server role indicator from SERVER */ struct dcb *next; /**< Next DCB in the chain of allocated DCB's */ struct service *service; /**< The related service */ void *data; /**< Specific client data */ @@ -202,7 +203,13 @@ int fail_accept_errno; #define DCB_ISZOMBIE(x) ((x)->state == DCB_STATE_ZOMBIE) DCB *dcb_get_zombies(void); -int gw_write(int fd, const void* buf, size_t nbytes); +int gw_write( +#if defined(SS_DEBUG) + DCB* dcb, +#endif + int fd, + const void* buf, + size_t nbytes); int dcb_write(DCB *, GWBUF *); DCB *dcb_alloc(dcb_role_t); void dcb_free(DCB *); diff --git a/server/include/gw.h b/server/include/gw.h index 98e48c60a..17911d13a 100644 --- a/server/include/gw.h +++ b/server/include/gw.h @@ -56,6 +56,12 @@ int MySQLWrite(DCB *dcb, GWBUF *queue); int gw_write_backend_event(DCB *dcb); int gw_read_backend_event(DCB *dcb); int setnonblocking(int fd); -int gw_write(int fd, const void* buf, size_t nbytes); +int gw_write( +#if defined(SS_DEBUG) + DCB* dcb, +#endif + int fd, + const void* buf, + size_t nbytes); int gw_getsockerrno(int fd); int parse_bindconfig(char *, unsigned short, struct sockaddr_in *); diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 6abec7563..95b414958 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -46,12 +46,19 @@ typedef struct router_client_session ROUTER_CLIENT_SES; typedef enum rses_property_type_t { RSES_PROP_TYPE_UNDEFINED=0, - RSES_PROP_TYPE_FIRST, - RSES_PROP_TYPE_SESCMD=RSES_PROP_TYPE_FIRST, + RSES_PROP_TYPE_SESCMD, + RSES_PROP_TYPE_FIRST = RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_LAST=RSES_PROP_TYPE_SESCMD, RSES_PROP_TYPE_COUNT=RSES_PROP_TYPE_LAST+1 } rses_property_type_t; +typedef enum backend_type_t { + BE_UNDEFINED=-1, + BE_MASTER, + BE_SLAVE, + BE_COUNT +} backend_type_t; + /** * Session variable command */ @@ -59,9 +66,9 @@ typedef struct mysql_sescmd_st { #if defined(SS_DEBUG) skygw_chk_t my_sescmd_chk_top; #endif - ROUTER_CLIENT_SES* my_sescmd_rsession; /*< parent router session */ +// ROUTER_CLIENT_SES* my_sescmd_rsession; /*< parent router session */ rses_property_t* my_sescmd_prop; /*< parent property */ - GWBUF* my_sescmd_buf; /*< client query reference */ + GWBUF* my_sescmd_buf; /*< query buffer */ bool my_sescmd_is_replied; /*< is cmd replied to client */ #if defined(SS_DEBUG) skygw_chk_t my_sescmd_chk_tail; @@ -76,7 +83,8 @@ struct rses_property_st { #if defined(SS_DEBUG) skygw_chk_t rses_prop_chk_top; #endif - SPINLOCK rses_prop_lock; /*< protect property content */ + ROUTER_CLIENT_SES* rses_prop_rsession; /*< parent router session */ +// SPINLOCK rses_prop_lock; /*< protect property content */ int rses_prop_refcount; rses_property_type_t rses_prop_type; union rses_prop_data { @@ -90,18 +98,13 @@ struct rses_property_st { }; typedef struct sescmd_cursor_st { - rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ - mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ - bool scmd_cur_active; /*< true if command is being executed */ + ROUTER_CLIENT_SES* scmd_cur_rses; /*< pointer to owning router session */ + rses_property_t** scmd_cur_ptr_property; /*< address of pointer to owner property */ + mysql_sescmd_t* scmd_cur_cmd; /*< pointer to current session command */ + bool scmd_cur_active; /*< true if command is being executed */ + backend_type_t scmd_cur_be_type; /*< BE_MASTER or BE_SLAVE */ } sescmd_cursor_t; -typedef enum backend_type_t { - BE_UNDEFINED=-1, - BE_MASTER, - BE_SLAVE, - BE_COUNT -} backend_type_t; - /** * The client session structure used within this router. */ diff --git a/server/modules/protocol/mysql_backend.c b/server/modules/protocol/mysql_backend.c index a031648c7..06d6e3a85 100644 --- a/server/modules/protocol/mysql_backend.c +++ b/server/modules/protocol/mysql_backend.c @@ -521,7 +521,7 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) /*< * Don't write to backend if backend_dcb is not in poll set anymore. */ - spinlock_acquire(&dcb->authlock); + spinlock_acquire(&dcb->dcb_initlock); if (dcb->state != DCB_STATE_POLLING) { /*< vraa : errorHandle */ /*< Free buffer memory */ @@ -536,10 +536,12 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb->fd, STRDCBSTATE(dcb->state)))); - spinlock_release(&dcb->authlock); + spinlock_release(&dcb->dcb_initlock); return 0; } - + spinlock_release(&dcb->dcb_initlock); + + spinlock_acquire(&dcb->authlock); /*< * Now put the incoming data to the delay queue unless backend is * connected with auth ok @@ -553,7 +555,6 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) dcb, dcb->fd, STRPROTOCOLSTATE(backend_protocol->state)))); - backend_set_delayqueue(dcb, queue); spinlock_release(&dcb->authlock); return 1; @@ -562,9 +563,9 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue) /*< * Now we set the last command received, from the current queue */ - memcpy(&dcb->command, &queue->command, sizeof(dcb->command)); - +// memcpy(&dcb->command, &queue->command, sizeof(dcb->command)); spinlock_release(&dcb->authlock); +// LOGIF(LD, debuglog_statements(dcb, gwbuf_clone(queue))); rc = dcb_write(dcb, queue); return rc; } @@ -805,7 +806,7 @@ static int backend_write_delayqueue(DCB *dcb) * Now we set the last command received, from the delayed queue */ - memcpy(&dcb->command, &localq->command, sizeof(dcb->command)); +// memcpy(&dcb->command, &localq->command, sizeof(dcb->command)); spinlock_release(&dcb->delayqlock); rc = dcb_write(dcb, localq); @@ -911,7 +912,6 @@ static int gw_change_user(DCB *backend, SERVER *server, SESSION *in_session, GWB strcpy(current_session->user, username); strcpy(current_session->db, database); memcpy(current_session->client_sha1, client_sha1, sizeof(current_session->client_sha1)); - } // consume all the data received from client diff --git a/server/modules/protocol/mysql_client.c b/server/modules/protocol/mysql_client.c index a8cde7337..c2f50a76a 100644 --- a/server/modules/protocol/mysql_client.c +++ b/server/modules/protocol/mysql_client.c @@ -48,6 +48,7 @@ static int gw_MySQLWrite_client(DCB *dcb, GWBUF *queue); static int gw_error_client_event(DCB *dcb); static int gw_client_close(DCB *dcb); static int gw_client_hangup_event(DCB *dcb); +static void* gw_MySQL_get_next_stmt(void* buffer); int mysql_send_ok(DCB *dcb, int packet_number, int in_affected_rows, const char* mysql_message); int MySQLSendHandshake(DCB* dcb); @@ -67,8 +68,9 @@ static GWPROTOCOL MyObject = { gw_client_close, /* Close */ gw_MySQLListener, /* Listen */ NULL, /* Authentication */ - NULL /* Session */ - }; + NULL, /* Session */ + gw_MySQL_get_next_stmt /* get single stmt from read buf */ +}; /** * Implementation of the mandatory version entry point @@ -607,8 +609,7 @@ int gw_read_client_event(DCB* dcb) { */ { int len = -1; - GWBUF *queue = NULL; - GWBUF *gw_buffer = NULL; + GWBUF *read_buffer = NULL; uint8_t *ptr_buff = NULL; int mysql_command = -1; @@ -626,16 +627,15 @@ int gw_read_client_event(DCB* dcb) { ////////////////////////////////////////////////////// // read and handle errors & close, or return if busy ////////////////////////////////////////////////////// - rc = gw_read_gwbuff(dcb, &gw_buffer, b); + rc = gw_read_gwbuff(dcb, &read_buffer, b); if (rc != 0) { goto return_rc; } /* Now, we are assuming in the first buffer there is * the information form mysql command */ - queue = gw_buffer; - len = GWBUF_LENGTH(queue); - ptr_buff = GWBUF_DATA(queue); + len = GWBUF_LENGTH(read_buffer); + ptr_buff = GWBUF_DATA(read_buffer); /* get mysql commang at fifth byte */ if (ptr_buff) { @@ -669,12 +669,12 @@ int gw_read_client_event(DCB* dcb) { } rc = 1; /** Free buffer */ - queue = gwbuf_consume(queue, len); + read_buffer = gwbuf_consume(read_buffer, len); goto return_rc; } /** Route COM_QUIT to backend */ if (mysql_command == '\x01') { - router->routeQuery(router_instance, rsession, queue); + router->routeQuery(router_instance, rsession, read_buffer); LOGIF(LD, (skygw_log_write_flush( LOGFILE_DEBUG, "%lu [gw_read_client_event] Routed COM_QUIT to " @@ -693,7 +693,7 @@ int gw_read_client_event(DCB* dcb) { /** Route other commands to backend */ rc = router->routeQuery(router_instance, rsession, - queue); + read_buffer); /** succeed */ if (rc == 1) { rc = 0; /**< here '0' means success */ @@ -1203,3 +1203,47 @@ gw_client_hangup_event(DCB *dcb) return_rc: return rc; } + +/** + * Remove the first mysql statement from buffer. Return pointer to the removed + * statement or NULL if buffer is empty. + * + * Clone buf, calculate the length of included mysql stmt, and point the + * statement with cloned buffer. Move the start pointer of buf accordingly + * so that it only cover the remaining buffer. + * + */ +static void* gw_MySQL_get_next_stmt( + void* buffer) +{ + GWBUF* readbuf = (GWBUF *)buffer; + GWBUF* stmtbuf; + unsigned char* packet; + size_t len; + + CHK_GWBUF(readbuf); + + if (GWBUF_EMPTY(readbuf)) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + packet = GWBUF_DATA(readbuf); + len = packet[0]; + len += 255*packet[1]; + len += 255*255*packet[2]; + + /** vraa :Multi-packet stmt is not supported as of 7.3.14 */ + if (len+4 > GWBUF_LENGTH(readbuf)) + { + stmtbuf = NULL; + goto return_stmtbuf; + } + stmtbuf = gwbuf_clone_portion(readbuf, 0, 4+len); + gwbuf_consume(readbuf, 4+len); + +return_stmtbuf: + return (void *)stmtbuf; +} + + diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 538c9502f..8f38c79c3 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -109,19 +109,25 @@ static void rses_property_add( static void rses_property_done( rses_property_t* prop); +static mysql_sescmd_t* rses_property_get_sescmd( + rses_property_t* prop); + static sescmd_cursor_t* rses_get_sescmd_cursor( ROUTER_CLIENT_SES* rses, backend_type_t be_type); - static bool execute_sescmd_in_backend( ROUTER_CLIENT_SES* rses, backend_type_t be_type); +static void sescmd_cursor_set_active( + sescmd_cursor_t* sescmd_cursor, + bool value); + static bool sescmd_cursor_is_active( sescmd_cursor_t* sescmd_cursor); -static GWBUF* sescmd_cursor_get_querybuf( +static GWBUF* sescmd_cursor_clone_querybuf( sescmd_cursor_t* scur); static mysql_sescmd_t* sescmd_cursor_get_command( @@ -130,13 +136,20 @@ static mysql_sescmd_t* sescmd_cursor_get_command( static bool sescmd_cursor_next( sescmd_cursor_t* scur); -static void sescmd_reply_to_client( +static bool sescmd_reply_to_client( DCB* client_dcb, - mysql_sescmd_t* scmd); + mysql_sescmd_t* scmd, + GWBUF* writebuf); static bool cont_exec_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type); + ROUTER_CLIENT_SES* rses, + backend_type_t be_type); + +static void tracelog_routed_query( + ROUTER_CLIENT_SES* rses, + char* funcname, + DCB* dcb, + GWBUF* buf); static SPINLOCK instlock; static ROUTER_INSTANCE* instances; @@ -337,16 +350,20 @@ static void* newSession( client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES; #endif /** store pointers to sescmd list to both cursors */ + client_rses->rses_cursor[BE_MASTER].scmd_cur_rses = client_rses; client_rses->rses_cursor[BE_MASTER].scmd_cur_active = false; - client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL; client_rses->rses_cursor[BE_MASTER].scmd_cur_ptr_property = &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - - client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false; - client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL; + client_rses->rses_cursor[BE_MASTER].scmd_cur_cmd = NULL; + client_rses->rses_cursor[BE_MASTER].scmd_cur_be_type = BE_MASTER; + + client_rses->rses_cursor[BE_SLAVE].scmd_cur_rses = client_rses; + client_rses->rses_cursor[BE_SLAVE].scmd_cur_active = false; client_rses->rses_cursor[BE_SLAVE].scmd_cur_ptr_property = &client_rses->rses_properties[RSES_PROP_TYPE_SESCMD]; - + client_rses->rses_cursor[BE_SLAVE].scmd_cur_cmd = NULL; + client_rses->rses_cursor[BE_SLAVE].scmd_cur_be_type = BE_SLAVE; + /** * Find a backend server to connect to. This is the extent of the * load balancing algorithm we need to implement for this simple @@ -544,6 +561,7 @@ static int routeQuery( GWBUF* querybuf) { skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; + GWBUF* stmtbuf; char* querystr = NULL; char* startpos; size_t len; @@ -552,7 +570,7 @@ static int routeQuery( int ret = 0; DCB* master_dcb = NULL; DCB* slave_dcb = NULL; - GWBUF* bufcopy = NULL; +// GWBUF* bufcopy = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; bool rses_is_closed; @@ -561,284 +579,364 @@ static int routeQuery( CHK_CLIENT_RSES(router_cli_ses); inst->stats.n_queries++; - - packet = GWBUF_DATA(querybuf); - packet_type = packet[4]; - startpos = (char *)&packet[5]; - len = packet[0]; - len += 255*packet[1]; - len += 255*255*packet[2]; - - switch(packet_type) { - case COM_QUIT: /**< 1 QUIT will close all sessions */ - case COM_INIT_DB: /**< 2 DDL must go to the master */ - case COM_REFRESH: /**< 7 - I guess this is session but not sure */ - case COM_DEBUG: /**< 0d all servers dump debug info to stdout */ - case COM_PING: /**< 0e all servers are pinged */ - case COM_CHANGE_USER: /**< 11 all servers change it accordingly */ - qtype = QUERY_TYPE_SESSION_WRITE; - break; - - case COM_CREATE_DB: /**< 5 DDL must go to the master */ - case COM_DROP_DB: /**< 6 DDL must go to the master */ - qtype = QUERY_TYPE_WRITE; - break; - - case COM_QUERY: - querystr = (char *)malloc(len); - memcpy(querystr, startpos, len-1); - memset(&querystr[len-1], 0, 1); - qtype = skygw_query_classifier_get_type(querystr, 0); - break; - - case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ - case COM_STATISTICS: /**< 9 ? */ - case COM_PROCESS_INFO: /**< 0a ? */ - case COM_CONNECT: /**< 0b ? */ - case COM_PROCESS_KILL: /**< 0c ? */ - case COM_TIME: /**< 0f should this be run in gateway ? */ - case COM_DELAYED_INSERT: /**< 10 ? */ - case COM_DAEMON: /**< 1d ? */ - default: - break; - } /**< switch by packet type */ - - /** Dirty read for quick check if router is closed. */ - if (router_cli_ses->rses_closed) + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) { - rses_is_closed = true; - } - else - { - /** - * Lock router client session for secure read of DCBs - */ - rses_is_closed = !(rses_begin_locked_router_action(router_cli_ses)); - } - - if (!rses_is_closed) - { - master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - /** unlock */ - rses_end_locked_router_action(router_cli_ses); - } - - if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) - { - LOGIF(LE, (skygw_log_write_flush( - LOGFILE_ERROR, - "Error: Failed to route %s:%s:\"%s\" to backend server. " - "%s.", - STRPACKETTYPE(packet_type), - STRQTYPE(qtype), - (querystr == NULL ? "(empty)" : querystr), - (rses_is_closed ? "Router was closed" : - "Router has no backend servers where to route to")))); - goto return_ret; } + master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "String\t\"%s\"", - querystr == NULL ? "(empty)" : querystr))); - LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, - "Packet type\t%s", - STRPACKETTYPE(packet_type)))); + /** stmtbuf is clone of querybuf, and only covers one stmt */ + stmtbuf = (GWBUF *)master_dcb->session->client->func.getstmt((void *)querybuf); + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); - switch (qtype) { - case QUERY_TYPE_WRITE: - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Master.", - pthread_self(), - STRQTYPE(qtype)))); - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); - - goto return_ret; - break; - - case QUERY_TYPE_READ: - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Slave.", - pthread_self(), - STRQTYPE(qtype)))); - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - /** Log error to debug */ - goto return_ret; - } - /** If session command is being executed, route to master */ - if (sescmd_cursor_is_active(rses_get_sescmd_cursor( - router_cli_ses, - BE_MASTER))) - { - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); - } - else{ - ret = slave_dcb->func.write(slave_dcb, querybuf); - atomic_add(&inst->stats.n_slave, 1); - } - rses_end_locked_router_action(router_cli_ses); - goto return_ret; - break; - - case QUERY_TYPE_SESSION_WRITE: - /** - * Execute in backends used by current router session. - * Save session variable commands to router session property - * struct. Thus, they - * can be replayed in backends which are started and joined later. - * - * Suppress OK packets sent to MaxScale by slaves. - * - * DOES THIS ALL APPLY TO COM_QUIT AS WELL?? - * - */ - - /** - * Update connections which are used in this session. - * - * For each connection updated, add a flag which indicates that - * OK Packet must arrive for this command before server - * in question is allowed to be used by router. That is, - * maintain a queue of pending OK packets and remove item - * from queue by FIFO. - * - * Return when the master responds OK Packet. Send that - * OK packet back to client. - * - * Suppress OK packets sent to MaxScale by slaves. - * - * Open questions: - * How to handle interleaving session write - * and queries? It would be simple if OK must be received - * from all/both servers before continuing query execution. - * How to maintain the order of operations? Execution queue - * would solve the problem. In the queue some things must be - * executed in serialized manner while some could be executed - * in parallel. Queries mostly. - * - * Instead of waiting for the OK packet from the master, the - * first OK packet could also be sent to client. TBD. - * vraa 9.12.13 - * - */ - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] DCB M:%p s:%p, " - "Query type\t%s, " - "packet type %s, routing to all servers.", - pthread_self(), - master_dcb, - slave_dcb, - STRQTYPE(qtype), - STRPACKETTYPE(packet_type)))); - - bufcopy = gwbuf_clone(querybuf); + while (stmtbuf != NULL) + { + packet = GWBUF_DATA(stmtbuf); + packet_type = packet[4]; + startpos = (char *)&packet[5]; + len = packet[0]; + len += 255*packet[1]; + len += 255*255*packet[2]; switch(packet_type) { - case COM_QUIT: - ret = master_dcb->func.write(master_dcb, querybuf); - slave_dcb->func.write(slave_dcb, bufcopy); + case COM_QUIT: /**< 1 QUIT will close all sessions */ + case COM_INIT_DB: /**< 2 DDL must go to the master */ + case COM_REFRESH: /**< 7 - I guess this is session but not sure */ + case COM_DEBUG: /**< 0d all servers dump debug info to stdout */ + case COM_PING: /**< 0e all servers are pinged */ + case COM_CHANGE_USER: /**< 11 all servers change it accordingly */ + qtype = QUERY_TYPE_SESSION_WRITE; + break; + + case COM_CREATE_DB: /**< 5 DDL must go to the master */ + case COM_DROP_DB: /**< 6 DDL must go to the master */ + qtype = QUERY_TYPE_WRITE; + break; + + case COM_QUERY: + querystr = (char *)malloc(len); + memcpy(querystr, startpos, len-1); + memset(&querystr[len-1], 0, 1); + qtype = skygw_query_classifier_get_type(querystr, 0); + break; + + case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ + case COM_STATISTICS: /**< 9 ? */ + case COM_PROCESS_INFO: /**< 0a ? */ + case COM_CONNECT: /**< 0b ? */ + case COM_PROCESS_KILL: /**< 0c ? */ + case COM_TIME: /**< 0f should this be run in gateway ? */ + case COM_DELAYED_INSERT: /**< 10 ? */ + case COM_DAEMON: /**< 1d ? */ + default: + break; + } /**< switch by packet type */ + + /** Dirty read for quick check if router is closed. */ + if (router_cli_ses->rses_closed) + { + rses_is_closed = true; + } + else + { + /** + * Lock router client session for secure read of DCBs + */ + rses_is_closed = + !(rses_begin_locked_router_action(router_cli_ses)); + } + + if (!rses_is_closed) + { + master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; + slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; + /** unlock */ + rses_end_locked_router_action(router_cli_ses); + } + + if (rses_is_closed || (master_dcb == NULL && slave_dcb == NULL)) + { + LOGIF(LE, (skygw_log_write_flush( + LOGFILE_ERROR, + "Error: Failed to route %s:%s:\"%s\" to " + "backend server. %s.", + STRPACKETTYPE(packet_type), + STRQTYPE(qtype), + (querystr == NULL ? "(empty)" : querystr), + (rses_is_closed ? "Router was closed" : + "Router has no backend servers where to " + "route to")))); + goto return_ret; + } + + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "String\t\"%s\"", + querystr == NULL ? "(empty)" : querystr))); + LOGIF(LT, (skygw_log_write(LOGFILE_TRACE, + "Packet type\t%s", + STRPACKETTYPE(packet_type)))); + + switch (qtype) { + case QUERY_TYPE_WRITE: + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to Master.", + pthread_self(), + STRQTYPE(qtype)))); + + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(stmtbuf))); + + ret = master_dcb->func.write(master_dcb, stmtbuf); + atomic_add(&inst->stats.n_master, 1); + + goto return_ret; + break; + + case QUERY_TYPE_READ: + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to Slave.", + pthread_self(), + STRQTYPE(qtype)))); + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + /** Log error to debug */ + goto return_ret; + } + /** + * If session command is being executed in slave + * route to master + */ + if (sescmd_cursor_is_active(rses_get_sescmd_cursor( + router_cli_ses, + BE_SLAVE))) + { + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(stmtbuf))); + + ret = master_dcb->func.write(master_dcb, stmtbuf); + atomic_add(&inst->stats.n_master, 1); + } + else + { + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(stmtbuf))); + + ret = slave_dcb->func.write(slave_dcb, stmtbuf); + atomic_add(&inst->stats.n_slave, 1); + } + rses_end_locked_router_action(router_cli_ses); + goto return_ret; break; - case COM_CHANGE_USER: - master_dcb->func.auth( - master_dcb, - NULL, - master_dcb->session, - querybuf); - slave_dcb->func.auth( - slave_dcb, - NULL, - master_dcb->session, - bufcopy); - break; + case QUERY_TYPE_SESSION_WRITE: + /** + * Execute in backends used by current router session. + * Save session variable commands to router session property + * struct. Thus, they + * can be replayed in backends which are started and joined later. + * + * Suppress OK packets sent to MaxScale by slaves. + * + * DOES THIS ALL APPLY TO COM_QUIT AS WELL?? + * + */ - case COM_QUERY: - /** - * 1. Create new property of type RSES_PROP_TYPE_SESCMD. - * 2. Add property to the ROUTER_CLIENT_SES struct of - * this router session. - * 3. For each backend, and for each non-executed - * sescmd: - * call execution of current sescmd in - * all backends as long as both have executed - * them all. - * Execution call is dcb->func.session. - * All sescmds are executed when its return value is - * NULL, otherwise it is a pointer to next property. - */ - prop = rses_property_init(RSES_PROP_TYPE_SESCMD); - /** - * Additional reference is created to querybuf to - * prevent it from being released before properties - * are cleaned up as a part of router sessionclean-up. - */ - mysql_sescmd_init(prop, - gwbuf_clone(querybuf), - router_cli_ses); - - /** Lock router session */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - rses_property_done(prop); - goto return_ret; - } - /** Add sescmd property to router client session */ - rses_property_add(router_cli_ses, prop); - - /** Execute session command in master */ - if (!execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) - { - /** Log error */ - } - /** Execute session command in slave */ - if (!execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) - { - /** Log error */ - } - - /** Unlock router session */ - rses_end_locked_router_action(router_cli_ses); + /** + * Update connections which are used in this session. + * + * For each connection updated, add a flag which indicates that + * OK Packet must arrive for this command before server + * in question is allowed to be used by router. That is, + * maintain a queue of pending OK packets and remove item + * from queue by FIFO. + * + * Return when the master responds OK Packet. Send that + * OK packet back to client. + * + * Suppress OK packets sent to MaxScale by slaves. + * + * Open questions: + * How to handle interleaving session write + * and queries? It would be simple if OK must be received + * from all/both servers before continuing query execution. + * How to maintain the order of operations? Execution queue + * would solve the problem. In the queue some things must be + * executed in serialized manner while some could be executed + * in parallel. Queries mostly. + * + * Instead of waiting for the OK packet from the master, the + * first OK packet could also be sent to client. TBD. + * vraa 9.12.13 + * + */ + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] DCB M:%p s:%p, " + "Query type\t%s, " + "packet type %s, routing to all servers.", + pthread_self(), + master_dcb, + slave_dcb, + STRQTYPE(qtype), + STRPACKETTYPE(packet_type)))); + + switch(packet_type) { + /** + case COM_QUIT: + ret = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf)); + slave_dcb->func.write(slave_dcb, querybuf); + break; + */ + case COM_CHANGE_USER: + + LOGIF(LT, tracelog_routed_query( + router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(stmtbuf))); + + master_dcb->func.auth( + master_dcb, + NULL, + master_dcb->session, + gwbuf_clone(stmtbuf)); + + LOGIF(LT, tracelog_routed_query( + router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(stmtbuf))); + + slave_dcb->func.auth( + slave_dcb, + NULL, + master_dcb->session, + stmtbuf); + break; + + case COM_QUIT: + case COM_QUERY: + /** + * 1. Create new property of type RSES_PROP_TYPE_SESCMD. + * 2. Add property to the ROUTER_CLIENT_SES struct of + * this router session. + * 3. For each backend, and for each non-executed + * sescmd: + * call execution of current sescmd in + * all backends as long as both have executed + * them all. + * Execution call is dcb->func.session. + * All sescmds are executed when its return value is + * NULL, otherwise it is a pointer to next property. + */ + prop = rses_property_init(RSES_PROP_TYPE_SESCMD); + /** + * Additional reference is created to querybuf to + * prevent it from being released before properties + * are cleaned up as a part of router sessionclean-up. + */ + mysql_sescmd_init(prop, stmtbuf, router_cli_ses); + + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + rses_property_done(prop); + goto return_ret; + } + /** Add sescmd property to router client session */ + rses_property_add(router_cli_ses, prop); + + /** Execute session command in master */ + if (execute_sescmd_in_backend(router_cli_ses, BE_MASTER)) + { + ret = 1; + } + else + { + /** Log error */ + } + /** Execute session command in slave */ + if (execute_sescmd_in_backend(router_cli_ses, BE_SLAVE)) + { + ret = 1; + } + else + { + /** Log error */ + } + + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + break; + + default: + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(stmtbuf))); + ret = master_dcb->func.write(master_dcb, + (void *)gwbuf_clone(stmtbuf)); + + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + slave_dcb, + gwbuf_clone(stmtbuf))); + + slave_dcb->func.write(slave_dcb, (void *)stmtbuf); + break; + } /**< switch by packet type */ + + atomic_add(&inst->stats.n_all, 1); + goto return_ret; break; default: - ret = master_dcb->func.session(master_dcb, - (void *)querybuf); - slave_dcb->func.session(slave_dcb, (void *)bufcopy); + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to Master by default.", + pthread_self(), + STRQTYPE(qtype)))); + + /** + * Is this really ok? + * What is not known is routed to master. + */ + LOGIF(LT, tracelog_routed_query(router_cli_ses, + "routeQuery", + master_dcb, + gwbuf_clone(stmtbuf))); + + ret = master_dcb->func.write(master_dcb, stmtbuf); + atomic_add(&inst->stats.n_master, 1); + goto return_ret; break; - } /**< switch by packet type */ + } /*< switch by query type */ - atomic_add(&inst->stats.n_all, 1); - goto return_ret; - break; - - default: - LOGIF(LT, (skygw_log_write( - LOGFILE_TRACE, - "%lu [routeQuery:rwsplit] Query type\t%s, " - "routing to Master by default.", - pthread_self(), - STRQTYPE(qtype)))); + /** get next stmt */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + goto return_ret; + } + master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; - /** - * Is this really ok? - * What is not known is routed to master. - */ - ret = master_dcb->func.write(master_dcb, querybuf); - atomic_add(&inst->stats.n_master, 1); - goto return_ret; - break; - } /**< switch by query type */ - + /** stmtbuf is clone of querybuf, and only covers one stmt */ + stmtbuf = (GWBUF *)master_dcb->session->client->func.getstmt((void *)querybuf); + rses_end_locked_router_action(router_cli_ses); + } /* while (stmtbuf != NULL) */ return_ret: free(querystr); return ret; @@ -984,16 +1082,20 @@ static void clientReply( */ if (!rses_begin_locked_router_action(router_cli_ses)) { - goto lock_failed; + /** is this needed ??*/ + gwbuf_consume(writebuf, gwbuf_length(writebuf)); + goto lock_failed; } master_dcb = router_cli_ses->rses_dcb[BE_MASTER]; slave_dcb = router_cli_ses->rses_dcb[BE_SLAVE]; - - /** Unlock */ - rses_end_locked_router_action(router_cli_ses); - + + /** Holding lock ensures that router session remains open */ + ss_dassert(backend_dcb->session != NULL); client_dcb = backend_dcb->session->client; + /** Unlock */ + rses_end_locked_router_action(router_cli_ses); + /** * 1. Check if backend received reply to sescmd. * 2. Check sescmd's state whether OK_PACKET has been @@ -1018,27 +1120,23 @@ static void clientReply( { be_type = BE_SLAVE; } - scur = rses_get_sescmd_cursor(router_cli_ses, be_type); - ss_dassert(writebuf == sescmd_cursor_get_querybuf(scur)); - + /** Lock router session */ + if (!rses_begin_locked_router_action(router_cli_ses)) + { + /** Log to debug that router was closed */ + goto lock_failed; + } + scur = rses_get_sescmd_cursor(router_cli_ses, be_type); /** * Active cursor means that reply is from session command * execution. */ if (sescmd_cursor_is_active(scur)) { - mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); - - sescmd_reply_to_client(client_dcb, scmd); - - /** - * If there is a pending sescmd, start its execution. - */ - if (!rses_begin_locked_router_action(router_cli_ses)) - { - goto lock_failed; - } - /** Start execution of all pending ses commands. */ + mysql_sescmd_t* scmd = sescmd_cursor_get_command(scur); + sescmd_reply_to_client(client_dcb, scmd, writebuf); + + /** Read next sescmd property */ while (sescmd_cursor_next(scur)) { if (!cont_exec_sescmd_in_backend(router_cli_ses, be_type)) @@ -1050,8 +1148,26 @@ static void clientReply( /** Log execution of pending sescmd */ } } + /** Set cursor passive. */ + sescmd_cursor_set_active(scur, false); + + /** Unlock router session */ rses_end_locked_router_action(router_cli_ses); } + else if (client_dcb != NULL) + { + /** Write reply to client DCB */ + client_dcb->func.write(client_dcb, writebuf); + /** Unlock router session */ + rses_end_locked_router_action(router_cli_ses); + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [clientReply:rwsplit] client dcb %p, " + "backend dcb %p. End of normal reply.", + pthread_self(), + client_dcb, + backend_dcb))); + } return; /*< succeed */ lock_failed: /** log that router session couldn't be locked */ @@ -1230,7 +1346,7 @@ static rses_property_t* rses_property_init( { goto return_prop; } - spinlock_init(&prop->rses_prop_lock); +// spinlock_init(&prop->rses_prop_lock); prop->rses_prop_type = prop_type; #if defined(SS_DEBUG) prop->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY; @@ -1242,29 +1358,6 @@ return_prop: return prop; } -/** - * Create session command property. - */ -static mysql_sescmd_t* mysql_sescmd_init ( - rses_property_t* rses_prop, - GWBUF* sescmd_buf, - ROUTER_CLIENT_SES* rses) -{ - mysql_sescmd_t* sescmd; - - CHK_RSES_PROP(rses_prop); - sescmd = &rses_prop->rses_prop_data.sescmd; - sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ -#if defined(SS_DEBUG) - sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; - sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; -#endif - sescmd->my_sescmd_buf = sescmd_buf; /*< session command query */ - ss_dassert(sescmd_buf->sbuf->refcount > 0); - - return sescmd; -} - /** * Property is freed at the end of router client session. */ @@ -1275,7 +1368,7 @@ static void rses_property_done( switch (prop->rses_prop_type) { case RSES_PROP_TYPE_SESCMD: - mysql_sescmd_done(&prop->rses_prop_data.sescmd); + mysql_sescmd_done(&prop->rses_prop_data.sescmd); break; default: LOGIF(LD, (skygw_log_write_flush( @@ -1292,15 +1385,6 @@ static void rses_property_done( free(prop); } - -static void mysql_sescmd_done( - mysql_sescmd_t* sescmd) -{ - CHK_RSES_PROP(sescmd->my_sescmd_prop); - gwbuf_free(sescmd->my_sescmd_buf); - memset(sescmd, 0, sizeof(mysql_sescmd_t)); -} - /** * Add property to the router_client_ses structure's rses_properties * array. The slot is determined by the type of property. @@ -1309,86 +1393,165 @@ static void mysql_sescmd_done( * Router client session must be locked. */ static void rses_property_add( - ROUTER_CLIENT_SES* rses, - rses_property_t* prop) + ROUTER_CLIENT_SES* rses, + rses_property_t* prop) { - rses_property_t* p; - - CHK_CLIENT_RSES(rses); - CHK_RSES_PROP(prop); - ss_dassert(rses->rses_lock.lock != 0); - p = rses->rses_properties[prop->rses_prop_type]; - - if (p == NULL) - { - rses->rses_properties[prop->rses_prop_type] = prop; - } - else - { - while (p->rses_prop_next != NULL) - { - p = p->rses_prop_next; - } - p->rses_prop_next = prop; - } + rses_property_t* p; + + CHK_CLIENT_RSES(rses); + CHK_RSES_PROP(prop); + ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + + prop->rses_prop_rsession = rses; + p = rses->rses_properties[prop->rses_prop_type]; + + if (p == NULL) + { + rses->rses_properties[prop->rses_prop_type] = prop; + } + else + { + while (p->rses_prop_next != NULL) + { + p = p->rses_prop_next; + } + p->rses_prop_next = prop; + } } -static void rses_begin_locked_property_action( - rses_property_t* prop) +/** Router sessiosn must be locked */ +static mysql_sescmd_t* rses_property_get_sescmd( + rses_property_t* prop) { - CHK_RSES_PROP(prop); - spinlock_acquire(&prop->rses_prop_lock); + mysql_sescmd_t* sescmd; + + CHK_RSES_PROP(prop); + ss_dassert(prop->rses_prop_rsession == NULL || + SPINLOCK_IS_LOCKED(&prop->rses_prop_rsession->rses_lock)); + + sescmd = &prop->rses_prop_data.sescmd; + + if (sescmd != NULL) + { + CHK_MYSQL_SESCMD(sescmd); + } + return sescmd; +} + +// static rses_property_t* rses_property_get_ptr_next( + +/** +static void rses_begin_locked_property_action( + rses_property_t* prop) +{ + CHK_RSES_PROP(prop); + spinlock_acquire(&prop->rses_prop_lock); } static void rses_end_locked_property_action( - rses_property_t* prop) + rses_property_t* prop) { - CHK_RSES_PROP(prop); - spinlock_release(&prop->rses_prop_lock); + CHK_RSES_PROP(prop); + spinlock_release(&prop->rses_prop_lock); +} +*/ + +/** + * Create session command property. + */ +static mysql_sescmd_t* mysql_sescmd_init ( + rses_property_t* rses_prop, + GWBUF* sescmd_buf, + ROUTER_CLIENT_SES* rses) +{ + mysql_sescmd_t* sescmd; + + CHK_RSES_PROP(rses_prop); + /** Can't call rses_property_get_sescmd with uninitialized sescmd */ + sescmd = &rses_prop->rses_prop_data.sescmd; + sescmd->my_sescmd_prop = rses_prop; /*< reference to owning property */ +// sescmd->my_sescmd_rsession = rses; +#if defined(SS_DEBUG) + sescmd->my_sescmd_chk_top = CHK_NUM_MY_SESCMD; + sescmd->my_sescmd_chk_tail = CHK_NUM_MY_SESCMD; +#endif + /** Set session command buffer */ + sescmd->my_sescmd_buf = sescmd_buf; + + return sescmd; } -/** router must be locked */ -static void sescmd_cursor_set_active( - sescmd_cursor_t* sescmd_cursor, - bool value) +static void mysql_sescmd_done( + mysql_sescmd_t* sescmd) { - ss_dassert(SPINLOCK_IS_LOCKED(&(*sescmd_cursor->scmd_cur_ptr_property)->rses_prop_lock)); - /** avoid calling unnecessarily */ - ss_dassert(sescmd_cursor->scmd_cur_active != value); - sescmd_cursor->scmd_cur_active = value; + CHK_RSES_PROP(sescmd->my_sescmd_prop); + gwbuf_free(sescmd->my_sescmd_buf); + memset(sescmd, 0, sizeof(mysql_sescmd_t)); } -static void sescmd_reply_to_client( + +/** + * Write session command reply from backend to client if command haven't yet + * been replied. + * Return true if succeed, false if command was already replied. + * + * Router session must be locked */ +static bool sescmd_reply_to_client( DCB* client_dcb, - mysql_sescmd_t* scmd) + mysql_sescmd_t* scmd, + GWBUF* writebuf) { - rses_property_t* prop; + bool succp = false; + + // rses_property_t* prop; CHK_DCB(client_dcb); CHK_MYSQL_SESCMD(scmd); - CHK_GWBUF(scmd->my_sescmd_buf); + CHK_GWBUF(writebuf); + ss_dassert(SPINLOCK_IS_LOCKED( + &scmd->my_sescmd_prop->rses_prop_rsession->rses_lock)); - prop = mysql_sescmd_get_property(scmd); +// prop = mysql_sescmd_get_property(scmd); - rses_begin_locked_property_action(prop); +// rses_begin_locked_property_action(prop); if (!scmd->my_sescmd_is_replied) { - CHK_GWBUF(scmd->my_sescmd_buf); - client_dcb->func.write(client_dcb, scmd->my_sescmd_buf); + client_dcb->func.write(client_dcb, writebuf); scmd->my_sescmd_is_replied = true; + succp = true; + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "%lu [sescmd_reply_to_client] Replied to client dcb %p.", + pthread_self(), + client_dcb))); } - rses_end_locked_property_action(prop); + else + { + } +// rses_end_locked_property_action(prop); + return succp; } +/** + * Get the address of current session command. + * + * Router session must be locked */ static mysql_sescmd_t* sescmd_cursor_get_command( sescmd_cursor_t* scur) { - mysql_sescmd_t* scmd = scur->scmd_cur_cmd; - CHK_MYSQL_SESCMD(scmd); - + mysql_sescmd_t* scmd; + + ss_dassert(SPINLOCK_IS_LOCKED(&scur->scmd_cur_rses->rses_lock)); + + scur->scmd_cur_cmd = rses_property_get_sescmd(*scur->scmd_cur_ptr_property); + + CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); + + scmd = scur->scmd_cur_cmd; + return scmd; } @@ -1398,6 +1561,8 @@ static sescmd_cursor_t* rses_get_sescmd_cursor( backend_type_t be_type) { CHK_CLIENT_RSES(rses); + ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + return &rses->rses_cursor[be_type]; } @@ -1405,18 +1570,35 @@ static sescmd_cursor_t* rses_get_sescmd_cursor( static bool sescmd_cursor_is_active( sescmd_cursor_t* sescmd_cursor) { - bool succp = sescmd_cursor->scmd_cur_active; + bool succp; + ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock)); + + succp = sescmd_cursor->scmd_cur_active; return succp; } -/** Router session must be locked */ -static GWBUF* sescmd_cursor_get_querybuf( +/** router must be locked */ +static void sescmd_cursor_set_active( + sescmd_cursor_t* sescmd_cursor, + bool value) +{ + ss_dassert(SPINLOCK_IS_LOCKED(&sescmd_cursor->scmd_cur_rses->rses_lock)); + /** avoid calling unnecessarily */ + ss_dassert(sescmd_cursor->scmd_cur_active != value); + sescmd_cursor->scmd_cur_active = value; +} + +/** + * Clone session command's command buffer. + * Router session must be locked + */ +static GWBUF* sescmd_cursor_clone_querybuf( sescmd_cursor_t* scur) { GWBUF* buf; ss_dassert(scur->scmd_cur_cmd != NULL); - buf = scur->scmd_cur_cmd->my_sescmd_buf; + buf = gwbuf_clone(scur->scmd_cur_cmd->my_sescmd_buf); CHK_GWBUF(buf); return buf; @@ -1447,24 +1629,48 @@ static bool execute_sescmd_in_backend( CHK_CLIENT_RSES(rses); ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + /** + * Get cursor pointer and copy of command buffer to cursor. + */ scur = rses_get_sescmd_cursor(rses, be_type); /** Return if there are no pending ses commands */ - if (scur->scmd_cur_cmd == NULL) + if (sescmd_cursor_get_command(scur) == NULL) { succp = false; + goto return_succp; } if (!sescmd_cursor_is_active(scur)) { + /** Cursor is left active when function returns. */ sescmd_cursor_set_active(scur, true); - rc = dcb->func.session(dcb, sescmd_cursor_get_querybuf(scur)); + + LOGIF(LT, tracelog_routed_query(rses, + "execute_sescmd_in_backend", + dcb, + sescmd_cursor_clone_querybuf(scur))); + rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); +// rc = dcb->func.session(dcb, sescmd_cursor_clone_querybuf(scur)); - if (rc != 0) + if (rc != 1) { succp = false; } } + else + { + LOGIF(LD, (skygw_log_write_flush( + LOGFILE_DEBUG, + "%lu [routeQuery] Couldn't directly send SESSION " + "WRITER command to dcb %p because session command " + "cursor was executing previous command. Added " + "command to the queue.", + pthread_self(), + dcb))); + } + +return_succp: return succp; } @@ -1477,36 +1683,42 @@ static bool execute_sescmd_in_backend( * backend server succeed. Otherwise false. */ static bool cont_exec_sescmd_in_backend( - ROUTER_CLIENT_SES* rses, - backend_type_t be_type) + ROUTER_CLIENT_SES* rses, + backend_type_t be_type) { - bool succp = true; - DCB* dcb; - sescmd_cursor_t* scur; - int rc; - - CHK_CLIENT_RSES(rses); - - dcb = rses->rses_dcb[be_type]; - CHK_DCB(dcb); - - ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); - - scur = rses_get_sescmd_cursor(rses, be_type); - - ss_dassert(sescmd_cursor_is_active(scur)); + DCB* dcb; + bool succp = true; + int rc = 0; + sescmd_cursor_t* scur; + + dcb = rses->rses_dcb[be_type]; + + CHK_DCB(dcb); + CHK_CLIENT_RSES(rses); + ss_dassert(SPINLOCK_IS_LOCKED(&rses->rses_lock)); + scur = rses_get_sescmd_cursor(rses, be_type); + ss_dassert(sescmd_cursor_is_active(scur)); + + /** Return if there are no pending ses commands */ if (scur->scmd_cur_cmd == NULL) { succp = false; + goto return_succp; } - rc = dcb->func.session(dcb, sescmd_cursor_get_querybuf(scur)); - - if (rc != 0) + + LOGIF(LT, tracelog_routed_query(rses, + "cont_exec_sescmd_in_backend", + dcb, + sescmd_cursor_clone_querybuf(scur))); + +// rc = dcb->func.session(dcb, sescmd_cursor_clone_querybuf(scur)); + rc = dcb->func.write(dcb, sescmd_cursor_clone_querybuf(scur)); + if (rc != 1) { succp = false; } - +return_succp: return succp; } @@ -1522,46 +1734,59 @@ static bool sescmd_cursor_next( bool succp = false; rses_property_t* prop_curr; rses_property_t* prop_next; - - ss_dassert(SPINLOCK_IS_LOCKED(&(*scur->scmd_cur_ptr_property)->rses_prop_lock)); - + + ss_dassert(scur != NULL); + ss_dassert(*(scur->scmd_cur_ptr_property) != NULL); + ss_dassert(SPINLOCK_IS_LOCKED( + &(*(scur->scmd_cur_ptr_property))->rses_prop_rsession->rses_lock)); + + /** Illegal situation */ if (scur == NULL || - *(scur->scmd_cur_ptr_property) == NULL || + *scur->scmd_cur_ptr_property == NULL || scur->scmd_cur_cmd == NULL) { - /** Log error to debug */ + /** Log error */ goto return_succp; } -#if defined(SS_DEBUG) prop_curr = *(scur->scmd_cur_ptr_property); - prop_next = prop_curr->rses_prop_next; -#endif - CHK_RSES_PROP(prop_curr); - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop); - ss_dassert(scur->scmd_cur_cmd->my_sescmd_prop == prop_curr); + CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); + ss_dassert(prop_curr == mysql_sescmd_get_property(scur->scmd_cur_cmd)); + CHK_RSES_PROP(prop_curr); + + /** Copy address of pointer to next property */ + scur->scmd_cur_ptr_property = &(prop_curr->rses_prop_next); + prop_next = *scur->scmd_cur_ptr_property; + ss_dassert(prop_next == *(scur->scmd_cur_ptr_property)); + + /** If there is a next property move forward */ - if ((*scur->scmd_cur_ptr_property)->rses_prop_next != NULL) + if (prop_next != NULL) { - scur->scmd_cur_ptr_property = - &((*scur->scmd_cur_ptr_property)->rses_prop_next); - scur->scmd_cur_cmd = - &(*scur->scmd_cur_ptr_property)->rses_prop_data.sescmd; + CHK_RSES_PROP(prop_next); + CHK_RSES_PROP((*(scur->scmd_cur_ptr_property))); + + /** Get pointer to next property's sescmd */ + scur->scmd_cur_cmd = rses_property_get_sescmd(prop_next); + + ss_dassert(prop_next == scur->scmd_cur_cmd->my_sescmd_prop); + CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); + CHK_RSES_PROP(scur->scmd_cur_cmd->my_sescmd_prop); } else { /** No more properties, can't proceed. */ goto return_succp; } - CHK_RSES_PROP((*(scur->scmd_cur_ptr_property))); - CHK_MYSQL_SESCMD(scur->scmd_cur_cmd); - ss_dassert(prop_next == *(scur->scmd_cur_ptr_property)); if (scur->scmd_cur_cmd != NULL) { - succp = true; - } + succp = true; + } + else + { + /** Log error, sescmd shouldn't be NULL */ + } return_succp: return succp; } @@ -1572,3 +1797,63 @@ static rses_property_t* mysql_sescmd_get_property( CHK_MYSQL_SESCMD(scmd); return scmd->my_sescmd_prop; } + + +static void tracelog_routed_query( + ROUTER_CLIENT_SES* rses, + char* funcname, + DCB* dcb, + GWBUF* buf) +{ + unsigned char* packet = GWBUF_DATA(buf); + unsigned char packet_type = packet[4]; + size_t len; + size_t buflen = GWBUF_LENGTH(buf); + char* querystr; + char* startpos = (char *)&packet[5]; + backend_type_t be_type; + + len = packet[0]; + len += 255*packet[1]; + len += 255*255*packet[2]; + + if (rses->rses_dcb[BE_MASTER] == dcb) + { + be_type = BE_MASTER; + } + else if (rses->rses_dcb[BE_SLAVE] == dcb) + { + be_type = BE_SLAVE; + } + else + { + be_type = BE_UNDEFINED; + } + + if (packet_type == '\x03') + { + querystr = (char *)malloc(len); + memcpy(querystr, startpos, len-1); + querystr[len-1] = '\0'; + LOGIF(LT, (skygw_log_write_flush( + LOGFILE_TRACE, + "%lu [%s] %d bytes long buf, \"%s\" -> %s:%d %s dcb %p", + pthread_self(), + funcname, + buflen, + querystr, + (be_type == BE_MASTER ? + rses->rses_backend[BE_MASTER]->backend_server->name : + (be_type == BE_SLAVE ? + rses->rses_backend[BE_SLAVE]->backend_server->name : + "Target DCB is neither of the backends. This is error")), + (be_type == BE_MASTER ? + rses->rses_backend[BE_MASTER]->backend_server->port : + (be_type == BE_SLAVE ? + rses->rses_backend[BE_SLAVE]->backend_server->port : + -1)), + STRBETYPE(be_type), + dcb))); + } + gwbuf_free(buf); +} \ No newline at end of file diff --git a/utils/skygw_debug.h b/utils/skygw_debug.h index 508e0f28d..8c11d9782 100644 --- a/utils/skygw_debug.h +++ b/utils/skygw_debug.h @@ -216,7 +216,11 @@ typedef enum skygw_chk_t { ((r) == DCB_ROLE_REQUEST_HANDLER ? "DCB_ROLE_REQUEST_HANDLER" : \ "UNKNOWN DCB ROLE")) - +#define STRBETYPE(t) ((t) == BE_MASTER ? "BE_MASTER" : \ + ((t) == BE_SLAVE ? "BE_SLAVE" : \ + ((t) == BE_UNDEFINED ? "BE_UNDEFINED" : \ + "Unknown backend tpe"))) + #define CHK_MLIST(l) { \ ss_info_dassert((l->mlist_chk_top == CHK_NUM_MLIST && \ l->mlist_chk_tail == CHK_NUM_MLIST), \