dcb_write earlier returned an error (== 0) if errno was not set and other conditions were satisfied. In practice, if write was done in write queue, queue pointer was not updated and dcb_write returned and error. Changed the error detection condition so that it requires errno being set, at least.

readwritesplit.c
	Clean up.
This commit is contained in:
vraatikka
2013-10-16 08:29:22 +03:00
parent 4db366290d
commit 94c37799b1
2 changed files with 130 additions and 145 deletions

View File

@ -632,9 +632,15 @@ return_n:
int int
dcb_write(DCB *dcb, GWBUF *queue) dcb_write(DCB *dcb, GWBUF *queue)
{ {
int w, saved_errno = 0; int w;
int saved_errno = 0;
ss_dassert(queue != NULL); ss_dassert(queue != NULL);
if (queue == NULL) {
return 0;
}
spinlock_acquire(&dcb->writeqlock); spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq != NULL) if (dcb->writeq != NULL)
@ -694,11 +700,12 @@ int w, saved_errno = 0;
w = gw_write(dcb->fd, GWBUF_DATA(queue), len); w = gw_write(dcb->fd, GWBUF_DATA(queue), len);
dcb->stats.n_writes++; dcb->stats.n_writes++;
); );
saved_errno = errno;
errno = 0;
if (w < 0) if (w < 0)
{ {
saved_errno = errno;
errno = 0;
if (saved_errno == EPIPE) { if (saved_errno == EPIPE) {
skygw_log_write( skygw_log_write(
LOGFILE_TRACE, LOGFILE_TRACE,
@ -740,16 +747,23 @@ int w, saved_errno = 0;
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd); dcb->fd);
} }
/* Buffer the balance of any data */ /**
dcb->writeq = queue; * What wasn't successfully written is stored to write queue
if (queue) * for suspended write.
*/
dcb->writeq = queue;
if (queue != NULL)
{ {
dcb->stats.n_buffered++; dcb->stats.n_buffered++;
} }
} /* if (dcb->writeq) */ } /* if (dcb->writeq) */
spinlock_release(&dcb->writeqlock); spinlock_release(&dcb->writeqlock);
if (queue && (saved_errno != EAGAIN || saved_errno != EWOULDBLOCK)) if (saved_errno != 0 &&
queue != NULL &&
saved_errno != EAGAIN &&
saved_errno != EWOULDBLOCK)
{ {
queue = gwbuf_consume(queue, gwbuf_length(queue)); queue = gwbuf_consume(queue, gwbuf_length(queue));
skygw_log_write_flush( skygw_log_write_flush(

View File

@ -80,7 +80,7 @@ static ROUTER_OBJECT MyObject = {
clientReply clientReply
}; };
static SPINLOCK instlock; static SPINLOCK instlock;
static ROUTER_INSTANCE* instances; static ROUTER_INSTANCE* instances;
/** /**
@ -101,8 +101,9 @@ version()
void void
ModuleInit() ModuleInit()
{ {
skygw_log_write_flush(LOGFILE_MESSAGE, skygw_log_write_flush(
"Initialize read/write split router module.\n"); LOGFILE_MESSAGE,
"Initializing statemend-based read/write split router module.");
spinlock_init(&instlock); spinlock_init(&instlock);
instances = NULL; instances = NULL;
} }
@ -115,28 +116,19 @@ ModuleInit()
* *
* @return The module object * @return The module object
*/ */
ROUTER_OBJECT* GetModuleObject() { ROUTER_OBJECT* GetModuleObject()
skygw_log_write( {
LOGFILE_TRACE,
"Returning readwritesplit router module object.");
return &MyObject; return &MyObject;
} }
/** /**
* Create an instance of the router for a particular service * Create an instance of read/write statemtn router within the MaxScale.
* within the gateway.
* *
* The job of ths entry point is to create the service wide data needed
* for the query router. This is information needed to route queries that
* is not related to any individual client session, exmaples of data that
* might be stored in the ROUTER object for a particular query router are
* connections counts, last used connection etc so that balancing may
* take place.
* *
* @param service The service this router is being create for * @param service The service this router is being create for
* @param options The options for this query router * @param options The options for this query router
* *
* @return The instance data for this new instance * @return NULL in failure, pointer to router in success.
*/ */
static ROUTER* createInstance( static ROUTER* createInstance(
SERVICE* service, SERVICE* service,
@ -160,7 +152,6 @@ static ROUTER* createInstance(
for (n=0; server != NULL; server=server->nextdb) { for (n=0; server != NULL; server=server->nextdb) {
n++; n++;
} }
router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *)); router->servers = (BACKEND **)calloc(n + 1, sizeof(BACKEND *));
if (router->servers == NULL) if (router->servers == NULL)
@ -173,7 +164,7 @@ static ROUTER* createInstance(
{ {
skygw_log_write_flush( skygw_log_write_flush(
LOGFILE_MESSAGE, LOGFILE_MESSAGE,
"Router options supplied to read/write split router " "Router options supplied to read/write statement router "
"module but none are supported. The options will be " "module but none are supported. The options will be "
"ignored."); "ignored.");
} }
@ -281,8 +272,7 @@ static void* newSession(
if (client_rses == NULL) if (client_rses == NULL)
{ {
return NULL; return NULL;
} }
/** /**
* Find a backend server to connect to. This is the extent of the * Find a backend server to connect to. This is the extent of the
* load balancing algorithm we need to implement for this simple * load balancing algorithm we need to implement for this simple
@ -353,78 +343,44 @@ static void closeSession(
ROUTER* instance, ROUTER* instance,
void* router_session) void* router_session)
{ {
#if 0 ROUTER_CLIENT_SES* router_cli_ses;
ROUTER_INSTANCE* router;
#endif
ROUTER_CLIENT_SES* rsession;
rsession = (ROUTER_CLIENT_SES *)router_session; router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
#if 0
router = (ROUTER_INSTANCE *)instance;
atomic_add(&rsession->be_slave->backend_conn_count, -1);
atomic_add(&rsession->be_master->backend_conn_count, -1);
atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1);
atomic_add(&rsession->be_master->backend_server->stats.n_current, -1);
#endif
/** /**
* Close the connection to the backend servers * Close the connection to the backend servers
*/ */
rsession->slave_dcb->func.close(rsession->slave_dcb); router_cli_ses->slave_dcb->func.close(router_cli_ses->slave_dcb);
rsession->master_dcb->func.close(rsession->master_dcb); router_cli_ses->master_dcb->func.close(router_cli_ses->master_dcb);
#if 0
spinlock_acquire(&router->lock);
if (router->connections == rsession) {
router->connections = rsession->next;
} else {
ROUTER_CLIENT_SES* ptr = router->connections;
while (ptr && ptr->next != rsession) {
ptr = ptr->next;
}
if (ptr) {
ptr->next = rsession->next;
}
}
spinlock_release(&router->lock);
/*
* We are no longer in the linked list, free
* all the memory and other resources associated
* to the client session.
*/
free(rsession);
#endif
} }
static void freeSession( static void freeSession(
ROUTER* router_instance, ROUTER* router_instance,
void* router_client_session) void* router_client_session)
{ {
ROUTER_CLIENT_SES* rsession; ROUTER_CLIENT_SES* router_cli_ses;
ROUTER_INSTANCE* router; ROUTER_INSTANCE* router;
rsession = (ROUTER_CLIENT_SES *)router_client_session; router_cli_ses = (ROUTER_CLIENT_SES *)router_client_session;
router = (ROUTER_INSTANCE *)router_instance; router = (ROUTER_INSTANCE *)router_instance;
atomic_add(&rsession->be_slave->backend_conn_count, -1); atomic_add(&router_cli_ses->be_slave->backend_conn_count, -1);
atomic_add(&rsession->be_master->backend_conn_count, -1); atomic_add(&router_cli_ses->be_master->backend_conn_count, -1);
atomic_add(&rsession->be_slave->backend_server->stats.n_current, -1); atomic_add(&router_cli_ses->be_slave->backend_server->stats.n_current, -1);
atomic_add(&rsession->be_master->backend_server->stats.n_current, -1); atomic_add(&router_cli_ses->be_master->backend_server->stats.n_current, -1);
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
if (router->connections == rsession) { if (router->connections == router_cli_ses) {
router->connections = rsession->next; router->connections = router_cli_ses->next;
} else { } else {
ROUTER_CLIENT_SES* ptr = router->connections; ROUTER_CLIENT_SES* ptr = router->connections;
while (ptr && ptr->next != rsession) { while (ptr && ptr->next != router_cli_ses) {
ptr = ptr->next; ptr = ptr->next;
} }
if (ptr) { if (ptr) {
ptr->next = rsession->next; ptr->next = router_cli_ses->next;
} }
} }
spinlock_release(&router->lock); spinlock_release(&router->lock);
@ -434,7 +390,7 @@ static void freeSession(
* all the memory and other resources associated * all the memory and other resources associated
* to the client session. * to the client session.
*/ */
free(rsession); free(router_cli_ses);
return; return;
} }
@ -469,7 +425,7 @@ static int routeQuery(
GWBUF *cq = NULL; GWBUF *cq = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rsession = (ROUTER_CLIENT_SES *)router_session; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
inst->stats.n_queries++; inst->stats.n_queries++;
packet = GWBUF_DATA(queue); packet = GWBUF_DATA(queue);
@ -480,80 +436,80 @@ static int routeQuery(
len += 255*255*packet[2]; len += 255*255*packet[2];
switch(packet_type) { switch(packet_type) {
case COM_QUIT: /**< 1 QUIT will close all sessions */ case COM_QUIT: /**< 1 QUIT will close all sessions */
case COM_INIT_DB: /**< 2 DDL must go to the master */ case COM_INIT_DB: /**< 2 DDL must go to the master */
case COM_REFRESH: /**< 7 - I guess this is session but not sure */ case COM_REFRESH: /**< 7 - I guess this is session but not sure */
case COM_DEBUG: /**< 0d all servers dump debug info to stdout */ case COM_DEBUG: /**< 0d all servers dump debug info to stdout */
case COM_PING: /**< 0e all servers are pinged */ case COM_PING: /**< 0e all servers are pinged */
case COM_CHANGE_USER: /**< 11 all servers change it accordingly */ case COM_CHANGE_USER: /**< 11 all servers change it accordingly */
qtype = QUERY_TYPE_SESSION_WRITE; qtype = QUERY_TYPE_SESSION_WRITE;
break; break;
case COM_CREATE_DB: /**< 5 DDL must go to the master */ case COM_CREATE_DB: /**< 5 DDL must go to the master */
case COM_DROP_DB: /**< 6 DDL must go to the master */ case COM_DROP_DB: /**< 6 DDL must go to the master */
qtype = QUERY_TYPE_WRITE; qtype = QUERY_TYPE_WRITE;
break; break;
case COM_QUERY: case COM_QUERY:
querystr = (char *)malloc(len); querystr = (char *)malloc(len);
memcpy(querystr, startpos, len-1); memcpy(querystr, startpos, len-1);
memset(&querystr[len-1], 0, 1); memset(&querystr[len-1], 0, 1);
qtype = skygw_query_classifier_get_type(querystr, 0); qtype = skygw_query_classifier_get_type(querystr, 0);
break; break;
default: case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ case COM_STATISTICS: /**< 9 ? */
case COM_STATISTICS: /**< 9 ? */ case COM_PROCESS_INFO: /**< 0a ? */
case COM_PROCESS_INFO: /**< 0a ? */ case COM_CONNECT: /**< 0b ? */
case COM_CONNECT: /**< 0b ? */ case COM_PROCESS_KILL: /**< 0c ? */
case COM_PROCESS_KILL: /**< 0c ? */ case COM_TIME: /**< 0f should this be run in gateway ? */
case COM_TIME: /**< 0f should this be run in gateway ? */ case COM_DELAYED_INSERT: /**< 10 ? */
case COM_DELAYED_INSERT: /**< 10 ? */ case COM_DAEMON: /**< 1d ? */
case COM_DAEMON: /**< 1d ? */ default:
break; break;
} } /**< switch by packet type */
#if defined(SS_DEBUG_) skygw_log_write(LOGFILE_TRACE, "String\t\"%s\"", querystr);
skygw_log_write(NULL, LOGFILE_TRACE, "String\t\"%s\"", querystr); skygw_log_write(LOGFILE_TRACE,
skygw_log_write(NULL,
LOGFILE_TRACE,
"Packet type\t%s", "Packet type\t%s",
STRPACKETTYPE(packet_type)); STRPACKETTYPE(packet_type));
#endif
switch (qtype) { switch (qtype) {
case QUERY_TYPE_WRITE: case QUERY_TYPE_WRITE:
#if defined(SS_DEBUG_) skygw_log_write(LOGFILE_TRACE,
skygw_log_write(NULL, "%lu [routeQuery:rwsplit] Query type\t%s, routing to "
LOGFILE_TRACE, "Master.",
"Query type\t%s, routing to Master.", pthread_self(),
STRQTYPE(qtype)); STRQTYPE(qtype));
#endif
ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); ret = router_cli_ses->master_dcb->func.write(
router_cli_ses->master_dcb,
queue);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
goto return_ret; goto return_ret;
break; break;
case QUERY_TYPE_READ: case QUERY_TYPE_READ:
#if defined(SS_DEBUG_) skygw_log_write(LOGFILE_TRACE,
skygw_log_write(NULL, "%lu [routeQuery:rwsplit] Query type\t%s, routing "
LOGFILE_TRACE, "to Slave.",
"Query type\t%s, routing to Slave.", pthread_self(),
STRQTYPE(qtype)); STRQTYPE(qtype));
#endif
ret = rsession->slave_dcb->func.write(rsession->slave_dcb, queue); ret = router_cli_ses->slave_dcb->func.write(
router_cli_ses->slave_dcb,
queue);
atomic_add(&inst->stats.n_slave, 1); atomic_add(&inst->stats.n_slave, 1);
goto return_ret; goto return_ret;
break; break;
case QUERY_TYPE_SESSION_WRITE: case QUERY_TYPE_SESSION_WRITE:
#if defined(SS_DEBUG_) skygw_log_write(LOGFILE_TRACE,
skygw_log_write(NULL, "%lu [routeQuery:rwsplit] Query type\t%s, "
LOGFILE_TRACE, "routing to All servers.",
"Query type\t%s, routing to All servers.", pthread_self(),
STRQTYPE(qtype)); STRQTYPE(qtype));
#endif
/** /**
* TODO! Connection to all servers must be established, and * TODO! Connection to all servers must be established, and
* the command must be executed in them. * the command must be executed in them.
@ -563,40 +519,54 @@ static int routeQuery(
switch(packet_type) { switch(packet_type) {
case COM_QUIT: case COM_QUIT:
ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); ret = router_cli_ses->master_dcb->func.write(
rsession->slave_dcb->func.write(rsession->slave_dcb, cq); router_cli_ses->master_dcb,
queue);
router_cli_ses->slave_dcb->func.write(
router_cli_ses->slave_dcb,
cq);
break; break;
case COM_CHANGE_USER: case COM_CHANGE_USER:
rsession->master_dcb->func.auth(rsession->master_dcb, NULL, rsession->master_dcb->session, queue); router_cli_ses->master_dcb->func.auth(
rsession->slave_dcb->func.auth(rsession->slave_dcb, NULL, rsession->master_dcb->session, cq); router_cli_ses->master_dcb,
NULL,
router_cli_ses->master_dcb->session,
queue);
router_cli_ses->slave_dcb->func.auth(
router_cli_ses->slave_dcb,
NULL,
router_cli_ses->master_dcb->session,
cq);
break; break;
default: default:
ret = rsession->master_dcb->func.session( ret = router_cli_ses->master_dcb->func.session(
rsession->master_dcb, router_cli_ses->master_dcb,
(void *)queue); (void *)queue);
rsession->slave_dcb->func.session( router_cli_ses->slave_dcb->func.session(
rsession->slave_dcb, router_cli_ses->slave_dcb,
(void *)cq); (void *)cq);
break; break;
} } /**< switch by packet type */
atomic_add(&inst->stats.n_all, 1); atomic_add(&inst->stats.n_all, 1);
goto return_ret; goto return_ret;
break; break;
default: default:
#if defined(SS_DEBUG_) skygw_log_write(LOGFILE_TRACE,
skygw_log_write(NULL, "%lu [routeQuery:rwsplit] Query type\t%s, "
LOGFILE_TRACE, "routing to Master by default.",
"Query type\t%s, routing to Master by default.", pthread_self(),
STRQTYPE(qtype)); STRQTYPE(qtype));
#endif
/** Is this really ok? */ /** Is this really ok? */
ret = rsession->master_dcb->func.write(rsession->master_dcb, queue); ret = router_cli_ses->master_dcb->func.write(
router_cli_ses->master_dcb,
queue);
atomic_add(&inst->stats.n_master, 1); atomic_add(&inst->stats.n_master, 1);
goto return_ret; goto return_ret;
break; break;
} } /**< switch by query type */
return_ret: return_ret:
free(querystr); free(querystr);
@ -614,16 +584,16 @@ return_ret:
static void static void
diagnostic(ROUTER *instance, DCB *dcb) diagnostic(ROUTER *instance, DCB *dcb)
{ {
ROUTER_CLIENT_SES *rsession; ROUTER_CLIENT_SES *router_cli_ses;
ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance; ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int i = 0; int i = 0;
spinlock_acquire(&router->lock); spinlock_acquire(&router->lock);
rsession = router->connections; router_cli_ses = router->connections;
while (rsession) while (router_cli_ses)
{ {
i++; i++;
rsession = rsession->next; router_cli_ses = router_cli_ses->next;
} }
spinlock_release(&router->lock); spinlock_release(&router->lock);
@ -665,10 +635,11 @@ static void clientReply(
{ {
DCB* master_dcb; DCB* master_dcb;
DCB* client_dcb; DCB* client_dcb;
ROUTER_CLIENT_SES* rsession; ROUTER_CLIENT_SES* router_cli_ses;
rsession = (ROUTER_CLIENT_SES *)router_session; router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
master_dcb = rsession->master_dcb; ss_dassert(router_cli_ses != NULL);
master_dcb = router_cli_ses->master_dcb;
client_dcb = backend_dcb->session->client; client_dcb = backend_dcb->session->client;
if (backend_dcb->command == ROUTER_CHANGE_SESSION) { if (backend_dcb->command == ROUTER_CHANGE_SESSION) {