From 94c37799b1e493310c8b3ca86b5ae406ea8854d8 Mon Sep 17 00:00:00 2001 From: vraatikka Date: Wed, 16 Oct 2013 08:29:22 +0300 Subject: [PATCH] dcb.c dcb_write earlier returned an error (== 0) if errno was not set and other conditions were satisfied. In practice, if write was done in write queue, queue pointer was not updated and dcb_write returned and error. Changed the error detection condition so that it requires errno being set, at least. readwritesplit.c Clean up. --- server/core/dcb.c | 28 +- .../routing/readwritesplit/readwritesplit.c | 247 ++++++++---------- 2 files changed, 130 insertions(+), 145 deletions(-) diff --git a/server/core/dcb.c b/server/core/dcb.c index 0551b4aaf..b7d0fbcd7 100644 --- a/server/core/dcb.c +++ b/server/core/dcb.c @@ -632,9 +632,15 @@ return_n: int dcb_write(DCB *dcb, GWBUF *queue) { -int w, saved_errno = 0; + int w; + int saved_errno = 0; ss_dassert(queue != NULL); + + if (queue == NULL) { + return 0; + } + spinlock_acquire(&dcb->writeqlock); if (dcb->writeq != NULL) @@ -694,11 +700,12 @@ int w, saved_errno = 0; w = gw_write(dcb->fd, GWBUF_DATA(queue), len); dcb->stats.n_writes++; ); - saved_errno = errno; - errno = 0; if (w < 0) { + saved_errno = errno; + errno = 0; + if (saved_errno == EPIPE) { skygw_log_write( LOGFILE_TRACE, @@ -740,16 +747,23 @@ int w, saved_errno = 0; STRDCBSTATE(dcb->state), dcb->fd); } - /* Buffer the balance of any data */ - dcb->writeq = queue; - if (queue) + /** + * What wasn't successfully written is stored to write queue + * for suspended write. + */ + dcb->writeq = queue; + + if (queue != NULL) { dcb->stats.n_buffered++; } } /* if (dcb->writeq) */ spinlock_release(&dcb->writeqlock); - if (queue && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK)) + if (saved_errno != 0 && + queue != NULL && + saved_errno != EAGAIN && + saved_errno != EWOULDBLOCK) { queue = gwbuf_consume(queue, gwbuf_length(queue)); skygw_log_write_flush( diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index 6de5710ba..53f398214 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -80,7 +80,7 @@ static ROUTER_OBJECT MyObject = { clientReply }; -static SPINLOCK instlock; +static SPINLOCK instlock; static ROUTER_INSTANCE* instances; /** @@ -101,8 +101,9 @@ version() void ModuleInit() { - skygw_log_write_flush(LOGFILE_MESSAGE, - "Initialize read/write split router module.\n"); + skygw_log_write_flush( + LOGFILE_MESSAGE, + "Initializing statemend-based read/write split router module."); spinlock_init(&instlock); instances = NULL; } @@ -115,28 +116,19 @@ ModuleInit() * * @return The module object */ -ROUTER_OBJECT* GetModuleObject() { - skygw_log_write( - LOGFILE_TRACE, - "Returning readwritesplit router module object."); +ROUTER_OBJECT* GetModuleObject() +{ return &MyObject; } /** - * Create an instance of the router for a particular service - * within the gateway. + * Create an instance of read/write statemtn router within the MaxScale. * - * The job of ths entry point is to create the service wide data needed - * for the query router. This is information needed to route queries that - * is not related to any individual client session, exmaples of data that - * might be stored in the ROUTER object for a particular query router are - * connections counts, last used connection etc so that balancing may - * take place. * * @param service The service this router is being create for * @param options The options for this query router * - * @return The instance data for this new instance + * @return NULL in failure, pointer to router in success. */ static ROUTER* createInstance( SERVICE* service, @@ -160,7 +152,6 @@ static ROUTER* createInstance( for (n=0; server != NULL; server=server->nextdb) { n++; } - router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); if (router->servers == NULL) @@ -173,7 +164,7 @@ static ROUTER* createInstance( { skygw_log_write_flush( LOGFILE_MESSAGE, - "Router options supplied to read/write split router " + "Router options supplied to read/write statement router " "module but none are supported. The options will be " "ignored."); } @@ -281,8 +272,7 @@ static void* newSession( if (client_rses == NULL) { return NULL; - } - + } /** * Find a backend server to connect to. This is the extent of the * load balancing algorithm we need to implement for this simple @@ -353,78 +343,44 @@ static void closeSession( ROUTER* instance, void* router_session) { -#if 0 - ROUTER_INSTANCE* router; -#endif - ROUTER_CLIENT_SES* rsession; + ROUTER_CLIENT_SES* router_cli_ses; - rsession = (ROUTER_CLIENT_SES *)router_session; -#if 0 - router = (ROUTER_INSTANCE *)instance; - atomic_add(&rsession->be_slave->backend_conn_count, -1); - atomic_add(&rsession->be_master->backend_conn_count, -1); - atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1); - atomic_add(&rsession->be_master->backend_server->stats.n_current, -1); -#endif + router_cli_ses = (ROUTER_CLIENT_SES *)router_session; /** * Close the connection to the backend servers */ - rsession->slave_dcb->func.close(rsession->slave_dcb); - rsession->master_dcb->func.close(rsession->master_dcb); -#if 0 - spinlock_acquire(&router->lock); - if (router->connections == rsession) { - router->connections = rsession->next; - } else { - ROUTER_CLIENT_SES* ptr = router->connections; - - while (ptr && ptr->next != rsession) { - ptr = ptr->next; - } - - if (ptr) { - ptr->next = rsession->next; - } - } - spinlock_release(&router->lock); - - /* - * We are no longer in the linked list, free - * all the memory and other resources associated - * to the client session. - */ - free(rsession); -#endif + router_cli_ses->slave_dcb->func.close(router_cli_ses->slave_dcb); + router_cli_ses->master_dcb->func.close(router_cli_ses->master_dcb); } static void freeSession( ROUTER* router_instance, void* router_client_session) { - ROUTER_CLIENT_SES* rsession; + ROUTER_CLIENT_SES* router_cli_ses; ROUTER_INSTANCE* router; - rsession = (ROUTER_CLIENT_SES *)router_client_session; + router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session; router = (ROUTER_INSTANCE *)router_instance; - atomic_add(&rsession->be_slave->backend_conn_count, -1); - atomic_add(&rsession->be_master->backend_conn_count, -1); - atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1); - atomic_add(&rsession->be_master->backend_server->stats.n_current, -1); + atomic_add(&router_cli_ses->be_slave->backend_conn_count, -1); + atomic_add(&router_cli_ses->be_master->backend_conn_count, -1); + atomic_add(&router_cli_ses->be_slave->backend_server->stats.n_current, -1); + atomic_add(&router_cli_ses->be_master->backend_server->stats.n_current, -1); spinlock_acquire(&router->lock); - if (router->connections == rsession) { - router->connections = rsession->next; + if (router->connections == router_cli_ses) { + router->connections = router_cli_ses->next; } else { ROUTER_CLIENT_SES* ptr = router->connections; - while (ptr && ptr->next != rsession) { + while (ptr && ptr->next != router_cli_ses) { ptr = ptr->next; } if (ptr) { - ptr->next = rsession->next; + ptr->next = router_cli_ses->next; } } spinlock_release(&router->lock); @@ -434,7 +390,7 @@ static void freeSession( * all the memory and other resources associated * to the client session. */ - free(rsession); + free(router_cli_ses); return; } @@ -469,7 +425,7 @@ static int routeQuery( GWBUF *cq = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; - ROUTER_CLIENT_SES* rsession = (ROUTER_CLIENT_SES *)router_session; + ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; inst->stats.n_queries++; packet = GWBUF_DATA(queue); @@ -480,80 +436,80 @@ static int routeQuery( len += 255*255*packet[2]; switch(packet_type) { - case COM_QUIT: /**< 1 QUIT will close all sessions */ - case COM_INIT_DB: /**< 2 DDL must go to the master */ - case COM_REFRESH: /**< 7 - I guess this is session but not sure */ - case COM_DEBUG: /**< 0d all servers dump debug info to stdout */ - case COM_PING: /**< 0e all servers are pinged */ - case COM_CHANGE_USER: /**< 11 all servers change it accordingly */ + case COM_QUIT: /**< 1 QUIT will close all sessions */ + case COM_INIT_DB: /**< 2 DDL must go to the master */ + case COM_REFRESH: /**< 7 - I guess this is session but not sure */ + case COM_DEBUG: /**< 0d all servers dump debug info to stdout */ + case COM_PING: /**< 0e all servers are pinged */ + case COM_CHANGE_USER: /**< 11 all servers change it accordingly */ qtype = QUERY_TYPE_SESSION_WRITE; break; - - case COM_CREATE_DB: /**< 5 DDL must go to the master */ - case COM_DROP_DB: /**< 6 DDL must go to the master */ + + case COM_CREATE_DB: /**< 5 DDL must go to the master */ + case COM_DROP_DB: /**< 6 DDL must go to the master */ qtype = QUERY_TYPE_WRITE; break; - case COM_QUERY: + case COM_QUERY: querystr = (char *)malloc(len); memcpy(querystr, startpos, len-1); memset(&querystr[len-1], 0, 1); qtype = skygw_query_classifier_get_type(querystr, 0); break; - - default: - case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ - case COM_STATISTICS: /**< 9 ? */ - case COM_PROCESS_INFO: /**< 0a ? */ - case COM_CONNECT: /**< 0b ? */ - case COM_PROCESS_KILL: /**< 0c ? */ - case COM_TIME: /**< 0f should this be run in gateway ? */ - case COM_DELAYED_INSERT: /**< 10 ? */ - case COM_DAEMON: /**< 1d ? */ + + case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ + case COM_STATISTICS: /**< 9 ? */ + case COM_PROCESS_INFO: /**< 0a ? */ + case COM_CONNECT: /**< 0b ? */ + case COM_PROCESS_KILL: /**< 0c ? */ + case COM_TIME: /**< 0f should this be run in gateway ? */ + case COM_DELAYED_INSERT: /**< 10 ? */ + case COM_DAEMON: /**< 1d ? */ + default: break; - } - -#if defined(SS_DEBUG_) - skygw_log_write(NULL, LOGFILE_TRACE, "String\t\"%s\"", querystr); - skygw_log_write(NULL, - LOGFILE_TRACE, + } /**< switch by packet type */ + + skygw_log_write(LOGFILE_TRACE, "String\t\"%s\"", querystr); + skygw_log_write(LOGFILE_TRACE, "Packet type\t%s", STRPACKETTYPE(packet_type)); -#endif switch (qtype) { case QUERY_TYPE_WRITE: -#if defined(SS_DEBUG_) - skygw_log_write(NULL, - LOGFILE_TRACE, - "Query type\t%s, routing to Master.", - STRQTYPE(qtype)); -#endif - ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); + skygw_log_write(LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, routing to " + "Master.", + pthread_self(), + STRQTYPE(qtype)); + + ret = router_cli_ses->master_dcb->func.write( + router_cli_ses->master_dcb, + queue); atomic_add(&inst->stats.n_master, 1); goto return_ret; break; case QUERY_TYPE_READ: -#if defined(SS_DEBUG_) - skygw_log_write(NULL, - LOGFILE_TRACE, - "Query type\t%s, routing to Slave.", + skygw_log_write(LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, routing " + "to Slave.", + pthread_self(), STRQTYPE(qtype)); -#endif - ret = rsession->slave_dcb->func.write(rsession->slave_dcb, queue); + + ret = router_cli_ses->slave_dcb->func.write( + router_cli_ses->slave_dcb, + queue); atomic_add(&inst->stats.n_slave, 1); goto return_ret; break; case QUERY_TYPE_SESSION_WRITE: -#if defined(SS_DEBUG_) - skygw_log_write(NULL, - LOGFILE_TRACE, - "Query type\t%s, routing to All servers.", + skygw_log_write(LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to All servers.", + pthread_self(), STRQTYPE(qtype)); -#endif /** * TODO! Connection to all servers must be established, and * the command must be executed in them. @@ -563,40 +519,54 @@ static int routeQuery( switch(packet_type) { case COM_QUIT: - ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); - rsession->slave_dcb->func.write(rsession->slave_dcb, cq); + ret = router_cli_ses->master_dcb->func.write( + router_cli_ses->master_dcb, + queue); + router_cli_ses->slave_dcb->func.write( + router_cli_ses->slave_dcb, + cq); break; case COM_CHANGE_USER: - rsession->master_dcb->func.auth(rsession->master_dcb, NULL, rsession->master_dcb->session, queue); - rsession->slave_dcb->func.auth(rsession->slave_dcb, NULL, rsession->master_dcb->session, cq); + router_cli_ses->master_dcb->func.auth( + router_cli_ses->master_dcb, + NULL, + router_cli_ses->master_dcb->session, + queue); + router_cli_ses->slave_dcb->func.auth( + router_cli_ses->slave_dcb, + NULL, + router_cli_ses->master_dcb->session, + cq); break; default: - ret = rsession->master_dcb->func.session( - rsession->master_dcb, + ret = router_cli_ses->master_dcb->func.session( + router_cli_ses->master_dcb, (void *)queue); - rsession->slave_dcb->func.session( - rsession->slave_dcb, + router_cli_ses->slave_dcb->func.session( + router_cli_ses->slave_dcb, (void *)cq); break; - } + } /**< switch by packet type */ atomic_add(&inst->stats.n_all, 1); goto return_ret; break; default: -#if defined(SS_DEBUG_) - skygw_log_write(NULL, - LOGFILE_TRACE, - "Query type\t%s, routing to Master by default.", + skygw_log_write(LOGFILE_TRACE, + "%lu [routeQuery:rwsplit] Query type\t%s, " + "routing to Master by default.", + pthread_self(), STRQTYPE(qtype)); -#endif + /** Is this really ok? */ - ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); + ret = router_cli_ses->master_dcb->func.write( + router_cli_ses->master_dcb, + queue); atomic_add(&inst->stats.n_master, 1); goto return_ret; break; - } + } /**< switch by query type */ return_ret: free(querystr); @@ -614,16 +584,16 @@ return_ret: static void diagnostic(ROUTER *instance, DCB *dcb) { -ROUTER_CLIENT_SES *rsession; +ROUTER_CLIENT_SES *router_cli_ses; ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; int i = 0; spinlock_acquire(&router->lock); - rsession = router->connections; - while (rsession) + router_cli_ses = router->connections; + while (router_cli_ses) { i++; - rsession = rsession->next; + router_cli_ses = router_cli_ses->next; } spinlock_release(&router->lock); @@ -665,10 +635,11 @@ static void clientReply( { DCB* master_dcb; DCB* client_dcb; - ROUTER_CLIENT_SES* rsession; + ROUTER_CLIENT_SES* router_cli_ses; - rsession = (ROUTER_CLIENT_SES *)router_session; - master_dcb = rsession->master_dcb; + router_cli_ses = (ROUTER_CLIENT_SES *)router_session; + ss_dassert(router_cli_ses != NULL); + master_dcb = router_cli_ses->master_dcb; client_dcb = backend_dcb->session->client; if (backend_dcb->command == ROUTER_CHANGE_SESSION) {