Reindent server/core/dcb.c

This commit is contained in:
Johan Wikman
2015-11-30 12:25:59 +02:00
parent 17760bb3e6
commit 2afe60dd0e
2 changed files with 1648 additions and 1519 deletions

View File

@ -172,7 +172,7 @@ dcb_get_zombies(void)
DCB * DCB *
dcb_alloc(dcb_role_t role) dcb_alloc(dcb_role_t role)
{ {
DCB *newdcb; DCB *newdcb;
if ((newdcb = calloc(1, sizeof(DCB))) == NULL) if ((newdcb = calloc(1, sizeof(DCB))) == NULL)
{ {
@ -223,7 +223,9 @@ DCB *newdcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
if (allDCBs == NULL) if (allDCBs == NULL)
{
allDCBs = newdcb; allDCBs = newdcb;
}
else else
{ {
DCB *ptr = allDCBs; DCB *ptr = allDCBs;
@ -232,7 +234,10 @@ DCB *newdcb;
ptr->next = newdcb; ptr->next = newdcb;
} }
nDCBs++; nDCBs++;
if (nDCBs > maxDCBs) maxDCBs = nDCBs; if (nDCBs > maxDCBs)
{
maxDCBs = nDCBs;
}
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
return newdcb; return newdcb;
} }
@ -260,7 +265,7 @@ dcb_free(DCB *dcb)
DCB * DCB *
dcb_clone(DCB *orig) dcb_clone(DCB *orig)
{ {
DCB *clonedcb; DCB *clonedcb;
if ((clonedcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER))) if ((clonedcb = dcb_alloc(DCB_ROLE_REQUEST_HANDLER)))
{ {
@ -269,9 +274,13 @@ DCB *clonedcb;
clonedcb->state = orig->state; clonedcb->state = orig->state;
clonedcb->data = orig->data; clonedcb->data = orig->data;
if (orig->remote) if (orig->remote)
{
clonedcb->remote = strdup(orig->remote); clonedcb->remote = strdup(orig->remote);
}
if (orig->user) if (orig->user)
{
clonedcb->user = strdup(orig->user); clonedcb->user = strdup(orig->user);
}
clonedcb->protocol = orig->protocol; clonedcb->protocol = orig->protocol;
clonedcb->func.write = dcb_null_write; clonedcb->func.write = dcb_null_write;
@ -327,10 +336,14 @@ dcb_final_free(DCB *dcb)
*/ */
DCB *ptr = allDCBs; DCB *ptr = allDCBs;
while (ptr && ptr->next != dcb) while (ptr && ptr->next != dcb)
{
ptr = ptr->next; ptr = ptr->next;
}
if (ptr) if (ptr)
{
ptr->next = dcb->next; ptr->next = dcb->next;
} }
}
nDCBs--; nDCBs--;
spinlock_release(&dcbspin); spinlock_release(&dcbspin);
@ -359,29 +372,48 @@ dcb_final_free(DCB *dcb)
} }
if (dcb->protocol && (!DCB_IS_CLONE(dcb))) if (dcb->protocol && (!DCB_IS_CLONE(dcb)))
{
free(dcb->protocol); free(dcb->protocol);
}
if (dcb->protoname) if (dcb->protoname)
{
free(dcb->protoname); free(dcb->protoname);
}
if (dcb->remote) if (dcb->remote)
{
free(dcb->remote); free(dcb->remote);
}
if (dcb->user) if (dcb->user)
{
free(dcb->user); free(dcb->user);
}
/* Clear write and read buffers */ /* Clear write and read buffers */
if (dcb->delayq) { if (dcb->delayq)
{
GWBUF *queue = dcb->delayq; GWBUF *queue = dcb->delayq;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL)
{
;
}
dcb->delayq = NULL; dcb->delayq = NULL;
} }
if (dcb->writeq) { if (dcb->writeq)
{
GWBUF *queue = dcb->writeq; GWBUF *queue = dcb->writeq;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL)
{
;
}
dcb->writeq = NULL; dcb->writeq = NULL;
} }
if (dcb->dcb_readqueue) if (dcb->dcb_readqueue)
{ {
GWBUF* queue = dcb->dcb_readqueue; GWBUF* queue = dcb->dcb_readqueue;
while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL); while ((queue = gwbuf_consume(queue, GWBUF_LENGTH(queue))) != NULL)
{
;
}
dcb->dcb_readqueue = NULL; dcb->dcb_readqueue = NULL;
} }
@ -392,8 +424,10 @@ dcb_final_free(DCB *dcb)
free(cb); free(cb);
} }
spinlock_release(&dcb->cb_lock); spinlock_release(&dcb->cb_lock);
if(dcb->ssl) if (dcb->ssl)
{
SSL_free(dcb->ssl); SSL_free(dcb->ssl);
}
bitmask_free(&dcb->memdata.bitmask); bitmask_free(&dcb->memdata.bitmask);
free(dcb); free(dcb);
} }
@ -413,9 +447,9 @@ dcb_final_free(DCB *dcb)
DCB * DCB *
dcb_process_zombies(int threadid) dcb_process_zombies(int threadid)
{ {
DCB *zombiedcb; DCB *zombiedcb;
DCB *previousdcb = NULL, *nextdcb; DCB *previousdcb = NULL, *nextdcb;
DCB *listofdcb = NULL; DCB *listofdcb = NULL;
/** /**
* Perform a dirty read to see if there is anything in the queue. * Perform a dirty read to see if there is anything in the queue.
@ -550,7 +584,8 @@ dcb_process_victim_queue(DCB *listofdcb)
dcb, dcb,
STRDCBSTATE(dcb->state)); STRDCBSTATE(dcb->state));
} }
else { else
{
/* Must be DCB_STATE_POLLING */ /* Must be DCB_STATE_POLLING */
spinlock_release(&dcb->dcb_initlock); spinlock_release(&dcb->dcb_initlock);
if (0 == dcb->persistentstart && dcb_maybe_add_persistent(dcb)) if (0 == dcb->persistentstart && dcb_maybe_add_persistent(dcb))
@ -569,7 +604,10 @@ dcb_process_victim_queue(DCB *listofdcb)
dcb->memdata.next = zombies; dcb->memdata.next = zombies;
zombies = dcb; zombies = dcb;
nzombies++; nzombies++;
if (nzombies > maxzombies) maxzombies = nzombies; if (nzombies > maxzombies)
{
maxzombies = nzombies;
}
spinlock_release(&zombiespin); spinlock_release(&zombiespin);
dcb = nextdcb; dcb = nextdcb;
continue; continue;
@ -638,7 +676,7 @@ dcb_process_victim_queue(DCB *listofdcb)
* @param dcb The DCB to be processed * @param dcb The DCB to be processed
*/ */
static void static void
dcb_stop_polling_and_shutdown (DCB *dcb) dcb_stop_polling_and_shutdown(DCB *dcb)
{ {
poll_remove_dcb(dcb); poll_remove_dcb(dcb);
/** /**
@ -735,7 +773,8 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
} }
fd = dcb->func.connect(dcb, server, session); fd = dcb->func.connect(dcb, server, session);
if (fd == DCBFD_CLOSED) { if (fd == DCBFD_CLOSED)
{
MXS_DEBUG("%lu [dcb_connect] Failed to connect to server %s:%d, " MXS_DEBUG("%lu [dcb_connect] Failed to connect to server %s:%d, "
"from backend dcb %p, client dcp %p fd %d.", "from backend dcb %p, client dcp %p fd %d.",
pthread_self(), pthread_self(),
@ -747,7 +786,9 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
dcb->state = DCB_STATE_DISCONNECTED; dcb->state = DCB_STATE_DISCONNECTED;
dcb_final_free(dcb); dcb_final_free(dcb);
return NULL; return NULL;
} else { }
else
{
MXS_DEBUG("%lu [dcb_connect] Connected to server %s:%d, " MXS_DEBUG("%lu [dcb_connect] Connected to server %s:%d, "
"from backend dcb %p, client dcp %p fd %d.", "from backend dcb %p, client dcp %p fd %d.",
pthread_self(), pthread_self(),
@ -783,7 +824,8 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
*/ */
rc = poll_add_dcb(dcb); rc = poll_add_dcb(dcb);
if (rc) { if (rc)
{
dcb->state = DCB_STATE_DISCONNECTED; dcb->state = DCB_STATE_DISCONNECTED;
dcb_final_free(dcb); dcb_final_free(dcb);
return NULL; return NULL;
@ -810,8 +852,7 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
* @return -1 on error, otherwise the number of read bytes on * @return -1 on error, otherwise the number of read bytes on
* the last iteration of while loop. 0 is returned if no data available. * the last iteration of while loop. 0 is returned if no data available.
*/ */
int dcb_read( int dcb_read(DCB *dcb,
DCB *dcb,
GWBUF **head, GWBUF **head,
int maxbytes) int maxbytes)
{ {
@ -879,7 +920,10 @@ int dcb_read(
dcb->last_read = hkheartbeat; dcb->last_read = hkheartbeat;
bufsize = MIN(bytesavailable, MAX_BUFFER_SIZE); bufsize = MIN(bytesavailable, MAX_BUFFER_SIZE);
if (maxbytes) bufsize = MIN(bufsize, maxbytes-nreadtotal); if (maxbytes)
{
bufsize = MIN(bufsize, maxbytes-nreadtotal);
}
if ((buffer = gwbuf_alloc(bufsize)) == NULL) if ((buffer = gwbuf_alloc(bufsize)) == NULL)
{ {
@ -949,8 +993,7 @@ int dcb_read(
* @return -1 on error, otherwise the number of read bytes on the last * @return -1 on error, otherwise the number of read bytes on the last
* iteration of while loop. 0 is returned if no data available. * iteration of while loop. 0 is returned if no data available.
*/ */
int dcb_read_SSL(DCB *dcb, int dcb_read_SSL(DCB *dcb, GWBUF **head)
GWBUF **head)
{ {
GWBUF *buffer = NULL; GWBUF *buffer = NULL;
int b, n, nread = 0; int b, n, nread = 0;
@ -1055,12 +1098,15 @@ int dcb_read_SSL(DCB *dcb,
int int
dcb_write(DCB *dcb, GWBUF *queue) dcb_write(DCB *dcb, GWBUF *queue)
{ {
int written; int written;
int below_water; int below_water;
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
// The following guarantees that queue is not NULL // The following guarantees that queue is not NULL
if (!dcb_write_parameter_check(dcb, queue)) return 0; if (!dcb_write_parameter_check(dcb, queue))
{
return 0;
}
spinlock_acquire(&dcb->writeqlock); spinlock_acquire(&dcb->writeqlock);
if (dcb->writeq) if (dcb->writeq)
@ -1119,7 +1165,6 @@ int below_water;
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd); dcb->fd);
} /*< while (queue != NULL) */ } /*< while (queue != NULL) */
} /* if (dcb->writeq) */ } /* if (dcb->writeq) */
dcb_write_tidy_up(dcb, below_water); dcb_write_tidy_up(dcb, below_water);
@ -1137,8 +1182,7 @@ int below_water;
static inline void static inline void
dcb_write_fake_code(DCB *dcb) dcb_write_fake_code(DCB *dcb)
{ {
if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && if (dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER && dcb->session != NULL)
dcb->session != NULL)
{ {
if (dcb_isclient(dcb) && fail_next_client_fd) if (dcb_isclient(dcb) && fail_next_client_fd)
{ {
@ -1146,8 +1190,7 @@ dcb_write_fake_code(DCB *dcb)
dcb_fake_write_ev[dcb->fd] = 29; dcb_fake_write_ev[dcb->fd] = 29;
fail_next_client_fd = false; fail_next_client_fd = false;
} }
else if (!dcb_isclient(dcb) && else if (!dcb_isclient(dcb) && fail_next_backend_fd)
fail_next_backend_fd)
{ {
dcb_fake_write_errno[dcb->fd] = 32; dcb_fake_write_errno[dcb->fd] = 32;
dcb_fake_write_ev[dcb->fd] = 29; dcb_fake_write_ev[dcb->fd] = 29;
@ -1167,7 +1210,10 @@ dcb_write_fake_code(DCB *dcb)
static inline bool static inline bool
dcb_write_parameter_check(DCB *dcb, GWBUF *queue) dcb_write_parameter_check(DCB *dcb, GWBUF *queue)
{ {
if (queue == NULL) return false; if (queue == NULL)
{
return false;
}
if (dcb->fd <= 0) if (dcb->fd <= 0)
{ {
@ -1188,12 +1234,9 @@ dcb_write_parameter_check(DCB *dcb, GWBUF *queue)
* still be writable. * still be writable.
*/ */
if (dcb->state != DCB_STATE_ALLOC && if (dcb->state != DCB_STATE_ALLOC &&
dcb->state != DCB_STATE_POLLING && dcb->state != DCB_STATE_POLLING &&
dcb->state != DCB_STATE_LISTENING && dcb->state != DCB_STATE_LISTENING &&
dcb->state != DCB_STATE_NOPOLLING) dcb->state != DCB_STATE_NOPOLLING)
{ {
MXS_DEBUG("%lu [dcb_write] Write aborted to dcb %p because " MXS_DEBUG("%lu [dcb_write] Write aborted to dcb %p because "
"it is in state %s", "it is in state %s",
@ -1321,7 +1364,7 @@ dcb_log_write_failure(DCB *dcb, GWBUF *queue, int eno)
* @param below_water A boolean * @param below_water A boolean
*/ */
static inline void static inline void
dcb_write_tidy_up (DCB *dcb, bool below_water) dcb_write_tidy_up(DCB *dcb, bool below_water)
{ {
spinlock_release(&dcb->writeqlock); spinlock_release(&dcb->writeqlock);
@ -1349,7 +1392,10 @@ dcb_write_SSL(DCB *dcb, GWBUF *queue)
below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0; below_water = (dcb->high_water && dcb->writeqlen < dcb->high_water) ? 1 : 0;
// The following guarantees that queue is not NULL // The following guarantees that queue is not NULL
if (!dcb_write_parameter_check(dcb, queue)) return 0; if (!dcb_write_parameter_check(dcb, queue))
{
return 0;
}
spinlock_acquire(&dcb->writeqlock); spinlock_acquire(&dcb->writeqlock);
@ -1393,7 +1439,8 @@ dcb_write_SSL(DCB *dcb, GWBUF *queue)
} }
#endif #endif
} }
} while(w <= 0); }
while (w <= 0);
/** Remove written bytes from the queue */ /** Remove written bytes from the queue */
queue = gwbuf_consume(queue, w); queue = gwbuf_consume(queue, w);
@ -1426,7 +1473,7 @@ dcb_write_SSL(DCB *dcb, GWBUF *queue)
* @param ssl_errno The SSL error code * @param ssl_errno The SSL error code
*/ */
static void static void
dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno) dcb_write_SSL_error_report(DCB *dcb, int ret, int ssl_errno)
{ {
char errbuf[STRERROR_BUFLEN]; char errbuf[STRERROR_BUFLEN];
@ -1475,9 +1522,9 @@ dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno)
STRDCBSTATE(dcb->state), STRDCBSTATE(dcb->state),
dcb->fd, dcb->fd,
ssl_errno); ssl_errno);
if(ssl_errno == SSL_ERROR_SSL || ssl_errno == SSL_ERROR_SYSCALL) if (ssl_errno == SSL_ERROR_SSL || ssl_errno == SSL_ERROR_SYSCALL)
{ {
if(ssl_errno == SSL_ERROR_SYSCALL) if (ssl_errno == SSL_ERROR_SYSCALL)
{ {
MXS_ERROR("%d:%s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); MXS_ERROR("%d:%s", errno, strerror_r(errno, errbuf, sizeof(errbuf)));
} }
@ -1486,17 +1533,19 @@ dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno)
char errbuf[SSL_ERRBUF_LEN]; char errbuf[SSL_ERRBUF_LEN];
ERR_error_string_n(ssl_errno,errbuf, sizeof(errbuf)); ERR_error_string_n(ssl_errno,errbuf, sizeof(errbuf));
MXS_ERROR("%d:%s", ssl_errno,errbuf); MXS_ERROR("%d:%s", ssl_errno,errbuf);
} while((ssl_errno = ERR_get_error()) != 0); }
while ((ssl_errno = ERR_get_error()) != 0);
} }
} }
else if(ret == 0) else if (ret == 0)
{ {
do do
{ {
char errbuf[SSL_ERRBUF_LEN]; char errbuf[SSL_ERRBUF_LEN];
ERR_error_string_n(ssl_errno,errbuf,sizeof(errbuf)); ERR_error_string_n(ssl_errno,errbuf,sizeof(errbuf));
MXS_ERROR("%d:%s", ssl_errno,errbuf); MXS_ERROR("%d:%s", ssl_errno,errbuf);
} while((ssl_errno = ERR_get_error()) != 0); }
while ((ssl_errno = ERR_get_error()) != 0);
} }
} }
} }
@ -1512,10 +1561,10 @@ dcb_write_SSL_error_report (DCB *dcb, int ret, int ssl_errno)
int int
dcb_drain_writeq(DCB *dcb) dcb_drain_writeq(DCB *dcb)
{ {
int n = 0; int n = 0;
int w; int w;
int saved_errno = 0; int saved_errno = 0;
int above_water; int above_water;
above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0; above_water = (dcb->low_water && dcb->writeqlen > dcb->low_water) ? 1 : 0;
@ -1579,7 +1628,9 @@ int above_water;
/* The write queue has drained, potentially need to call a callback function */ /* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL) if (dcb->writeq == NULL)
{
dcb_call_callback(dcb, DCB_REASON_DRAINED); dcb_call_callback(dcb, DCB_REASON_DRAINED);
}
if (above_water && dcb->writeqlen < dcb->low_water) if (above_water && dcb->writeqlen < dcb->low_water)
{ {
@ -1628,7 +1679,7 @@ dcb_drain_writeq_SSL(DCB *dcb)
{ {
int ssl_errno = SSL_get_error(dcb->ssl,w); int ssl_errno = SSL_get_error(dcb->ssl,w);
if(ssl_errno == SSL_ERROR_WANT_WRITE || ssl_errno == SSL_ERROR_WANT_READ) if (ssl_errno == SSL_ERROR_WANT_WRITE || ssl_errno == SSL_ERROR_WANT_READ)
{ {
break; break;
} }
@ -1637,13 +1688,13 @@ dcb_drain_writeq_SSL(DCB *dcb)
{ {
case SSL_ERROR_SSL: case SSL_ERROR_SSL:
case SSL_ERROR_SYSCALL: case SSL_ERROR_SYSCALL:
while((ssl_errno = ERR_get_error()) != 0) while ((ssl_errno = ERR_get_error()) != 0)
{ {
char errbuf[SSL_ERRBUF_LEN]; char errbuf[SSL_ERRBUF_LEN];
ERR_error_string_n(ssl_errno,errbuf,sizeof(errbuf)); ERR_error_string_n(ssl_errno,errbuf,sizeof(errbuf));
MXS_ERROR("%s", errbuf); MXS_ERROR("%s", errbuf);
} }
if(errno != 0) if (errno != 0)
{ {
char errbuf[STRERROR_BUFLEN]; char errbuf[STRERROR_BUFLEN];
MXS_ERROR("%d:%s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); MXS_ERROR("%d:%s", errno, strerror_r(errno, errbuf, sizeof(errbuf)));
@ -1658,8 +1709,6 @@ dcb_drain_writeq_SSL(DCB *dcb)
break; break;
} }
break; break;
} }
/* /*
* Pull the number of bytes we have written from * Pull the number of bytes we have written from
@ -1674,7 +1723,9 @@ dcb_drain_writeq_SSL(DCB *dcb)
/* The write queue has drained, potentially need to call a callback function */ /* The write queue has drained, potentially need to call a callback function */
if (dcb->writeq == NULL) if (dcb->writeq == NULL)
{
dcb_call_callback(dcb, DCB_REASON_DRAINED); dcb_call_callback(dcb, DCB_REASON_DRAINED);
}
if (above_water && dcb->writeqlen < dcb->low_water) if (above_water && dcb->writeqlen < dcb->low_water)
{ {
@ -1846,13 +1897,21 @@ printDCB(DCB *dcb)
printf("DCB: %p\n", (void *)dcb); printf("DCB: %p\n", (void *)dcb);
printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); printf("\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->remote) if (dcb->remote)
{
printf("\tConnected to: %s\n", dcb->remote); printf("\tConnected to: %s\n", dcb->remote);
}
if (dcb->user) if (dcb->user)
{
printf("\tUsername: %s\n", dcb->user); printf("\tUsername: %s\n", dcb->user);
}
if (dcb->protoname) if (dcb->protoname)
{
printf("\tProtocol: %s\n", dcb->protoname); printf("\tProtocol: %s\n", dcb->protoname);
}
if (dcb->writeq) if (dcb->writeq)
{
printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq)); printf("\tQueued write data: %d\n",gwbuf_length(dcb->writeq));
}
char *statusname = server_status(dcb->server); char *statusname = server_status(dcb->server);
if (statusname) if (statusname)
{ {
@ -1899,7 +1958,7 @@ spin_reporter(void *dcb, char *desc, int value)
*/ */
void printAllDCBs() void printAllDCBs()
{ {
DCB *dcb; DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
dcb = allDCBs; dcb = allDCBs;
@ -1924,29 +1983,43 @@ dprintOneDCB(DCB *pdcb, DCB *dcb)
dcb_printf(pdcb, "\tDCB state: %s\n", dcb_printf(pdcb, "\tDCB state: %s\n",
gw_dcb_state2string(dcb->state)); gw_dcb_state2string(dcb->state));
if (dcb->session && dcb->session->service) if (dcb->session && dcb->session->service)
{
dcb_printf(pdcb, "\tService: %s\n", dcb_printf(pdcb, "\tService: %s\n",
dcb->session->service->name); dcb->session->service->name);
}
if (dcb->remote) if (dcb->remote)
{
dcb_printf(pdcb, "\tConnected to: %s\n", dcb_printf(pdcb, "\tConnected to: %s\n",
dcb->remote); dcb->remote);
}
if (dcb->server) if (dcb->server)
{ {
if (dcb->server->name) if (dcb->server->name)
{
dcb_printf(pdcb, "\tServer name/IP: %s\n", dcb_printf(pdcb, "\tServer name/IP: %s\n",
dcb->server->name); dcb->server->name);
}
if (dcb->server->port) if (dcb->server->port)
{
dcb_printf(pdcb, "\tPort number: %d\n", dcb_printf(pdcb, "\tPort number: %d\n",
dcb->server->port); dcb->server->port);
} }
}
if (dcb->user) if (dcb->user)
{
dcb_printf(pdcb, "\tUsername: %s\n", dcb_printf(pdcb, "\tUsername: %s\n",
dcb->user); dcb->user);
}
if (dcb->protoname) if (dcb->protoname)
{
dcb_printf(pdcb, "\tProtocol: %s\n", dcb_printf(pdcb, "\tProtocol: %s\n",
dcb->protoname); dcb->protoname);
}
if (dcb->writeq) if (dcb->writeq)
{
dcb_printf(pdcb, "\tQueued write data: %d\n", dcb_printf(pdcb, "\tQueued write data: %d\n",
gwbuf_length(dcb->writeq)); gwbuf_length(dcb->writeq));
}
char *statusname = server_status(dcb->server); char *statusname = server_status(dcb->server);
if (statusname) if (statusname)
{ {
@ -1976,7 +2049,9 @@ dprintOneDCB(DCB *pdcb, DCB *dcb)
dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water); dcb_printf(pdcb, "\t\tNo. of High Water Events: %d\n", dcb->stats.n_high_water);
dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water); dcb_printf(pdcb, "\t\tNo. of Low Water Events: %d\n", dcb->stats.n_low_water);
if (dcb->flags & DCBF_CLONE) if (dcb->flags & DCBF_CLONE)
{
dcb_printf(pdcb, "\t\tDCB is a clone.\n"); dcb_printf(pdcb, "\t\tDCB is a clone.\n");
}
if (dcb->persistentstart) if (dcb->persistentstart)
{ {
char buff[20]; char buff[20];
@ -1994,7 +2069,7 @@ dprintOneDCB(DCB *pdcb, DCB *dcb)
void void
dprintAllDCBs(DCB *pdcb) dprintAllDCBs(DCB *pdcb)
{ {
DCB *dcb; DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
#if SPINLOCK_PROFILE #if SPINLOCK_PROFILE
@ -2020,7 +2095,7 @@ DCB *dcb;
void void
dListDCBs(DCB *pdcb) dListDCBs(DCB *pdcb)
{ {
DCB *dcb; DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
dcb = allDCBs; dcb = allDCBs;
@ -2033,7 +2108,6 @@ DCB *dcb;
{ {
dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n", dcb_printf(pdcb, " %-16p | %-26s | %-18s | %s\n",
dcb, gw_dcb_state2string(dcb->state), dcb, gw_dcb_state2string(dcb->state),
((dcb->session && dcb->session->service) ? dcb->session->service->name : ""), ((dcb->session && dcb->session->service) ? dcb->session->service->name : ""),
(dcb->remote ? dcb->remote : "")); (dcb->remote ? dcb->remote : ""));
dcb = dcb->next; dcb = dcb->next;
@ -2050,7 +2124,7 @@ DCB *dcb;
void void
dListClients(DCB *pdcb) dListClients(DCB *pdcb)
{ {
DCB *dcb; DCB *dcb;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
dcb = allDCBs; dcb = allDCBs;
@ -2061,8 +2135,7 @@ DCB *dcb;
dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n"); dcb_printf(pdcb, "-----------------+------------------+----------------------+------------\n");
while (dcb) while (dcb)
{ {
if (dcb_isclient(dcb) if (dcb_isclient(dcb) && dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER)
&& dcb->dcb_role == DCB_ROLE_REQUEST_HANDLER)
{ {
dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n", dcb_printf(pdcb, " %-15s | %16p | %-20s | %10p\n",
(dcb->remote ? dcb->remote : ""), (dcb->remote ? dcb->remote : ""),
@ -2089,21 +2162,33 @@ dprintDCB(DCB *pdcb, DCB *dcb)
dcb_printf(pdcb, "DCB: %p\n", (void *)dcb); dcb_printf(pdcb, "DCB: %p\n", (void *)dcb);
dcb_printf(pdcb, "\tDCB state: %s\n", gw_dcb_state2string(dcb->state)); dcb_printf(pdcb, "\tDCB state: %s\n", gw_dcb_state2string(dcb->state));
if (dcb->session && dcb->session->service) if (dcb->session && dcb->session->service)
{
dcb_printf(pdcb, "\tService: %s\n", dcb_printf(pdcb, "\tService: %s\n",
dcb->session->service->name); dcb->session->service->name);
}
if (dcb->remote) if (dcb->remote)
{
dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote); dcb_printf(pdcb, "\tConnected to: %s\n", dcb->remote);
}
if (dcb->user) if (dcb->user)
{
dcb_printf(pdcb, "\tUsername: %s\n", dcb_printf(pdcb, "\tUsername: %s\n",
dcb->user); dcb->user);
}
if (dcb->protoname) if (dcb->protoname)
{
dcb_printf(pdcb, "\tProtocol: %s\n", dcb_printf(pdcb, "\tProtocol: %s\n",
dcb->protoname); dcb->protoname);
}
dcb_printf(pdcb, "\tOwning Session: %p\n", dcb->session); dcb_printf(pdcb, "\tOwning Session: %p\n", dcb->session);
if (dcb->writeq) if (dcb->writeq)
{
dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq)); dcb_printf(pdcb, "\tQueued write data: %d\n", gwbuf_length(dcb->writeq));
}
if (dcb->delayq) if (dcb->delayq)
{
dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq)); dcb_printf(pdcb, "\tDelayed write data: %d\n", gwbuf_length(dcb->delayq));
}
char *statusname = server_status(dcb->server); char *statusname = server_status(dcb->server);
if (statusname) if (statusname)
{ {
@ -2133,10 +2218,11 @@ dprintDCB(DCB *pdcb, DCB *dcb)
{ {
dcb_printf(pdcb, "\t\tPending events in the queue: %x %s\n", dcb_printf(pdcb, "\t\tPending events in the queue: %x %s\n",
dcb->evq.pending_events, dcb->evq.processing ? "(processing)" : ""); dcb->evq.pending_events, dcb->evq.processing ? "(processing)" : "");
} }
if (dcb->flags & DCBF_CLONE) if (dcb->flags & DCBF_CLONE)
{
dcb_printf(pdcb, "\t\tDCB is a clone.\n"); dcb_printf(pdcb, "\t\tDCB is a clone.\n");
}
#if SPINLOCK_PROFILE #if SPINLOCK_PROFILE
dcb_printf(pdcb, "\tInitlock Statistics:\n"); dcb_printf(pdcb, "\tInitlock Statistics:\n");
spinlock_stats(&dcb->dcb_initlock, spin_reporter, pdcb); spinlock_stats(&dcb->dcb_initlock, spin_reporter, pdcb);
@ -2169,7 +2255,7 @@ dprintDCB(DCB *pdcb, DCB *dcb)
* *
*/ */
const char * const char *
gw_dcb_state2string (int state) gw_dcb_state2string(int state)
{ {
switch(state) { switch(state) {
case DCB_STATE_ALLOC: case DCB_STATE_ALLOC:
@ -2202,11 +2288,13 @@ gw_dcb_state2string (int state)
void void
dcb_printf(DCB *dcb, const char *fmt, ...) dcb_printf(DCB *dcb, const char *fmt, ...)
{ {
GWBUF *buf; GWBUF *buf;
va_list args; va_list args;
if ((buf = gwbuf_alloc(10240)) == NULL) if ((buf = gwbuf_alloc(10240)) == NULL)
{
return; return;
}
va_start(args, fmt); va_start(args, fmt);
vsnprintf(GWBUF_DATA(buf), 10240, fmt, args); vsnprintf(GWBUF_DATA(buf), 10240, fmt, args);
va_end(args); va_end(args);
@ -2309,11 +2397,13 @@ gw_write(DCB *dcb, const void *buf, size_t nbytes)
ss_dassert(dcb_fake_write_ev[fd] != 0); ss_dassert(dcb_fake_write_ev[fd] != 0);
w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */ w = write(fd, buf, nbytes/2); /*< leave peer to read missing bytes */
if (w > 0) { if (w > 0)
{
w = -1; w = -1;
errno = dcb_fake_write_errno[fd]; errno = dcb_fake_write_errno[fd];
} }
} else if (fd > 0) }
else if (fd > 0)
{ {
w = write(fd, buf, nbytes); w = write(fd, buf, nbytes);
} }
@ -2390,10 +2480,13 @@ gw_write(DCB *dcb, const void *buf, size_t nbytes)
* @return Non-zero (true) if the callback was added * @return Non-zero (true) if the callback was added
*/ */
int int
dcb_add_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) dcb_add_callback(DCB *dcb,
DCB_REASON reason,
int (*callback)(struct dcb *, DCB_REASON, void *),
void *userdata)
{ {
DCB_CALLBACK *cb, *ptr; DCB_CALLBACK *cb, *ptr;
int rval = 1; int rval = 1;
if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL) if ((ptr = (DCB_CALLBACK *)malloc(sizeof(DCB_CALLBACK))) == NULL)
{ {
@ -2446,10 +2539,13 @@ int rval = 1;
* @return Non-zero (true) if the callback was removed * @return Non-zero (true) if the callback was removed
*/ */
int int
dcb_remove_callback(DCB *dcb, DCB_REASON reason, int (*callback)(struct dcb *, DCB_REASON, void *), void *userdata) dcb_remove_callback(DCB *dcb,
DCB_REASON reason,
int (*callback)(struct dcb *, DCB_REASON, void *),
void *userdata)
{ {
DCB_CALLBACK *cb, *pcb = NULL; DCB_CALLBACK *cb, *pcb = NULL;
int rval = 0; int rval = 0;
spinlock_acquire(&dcb->cb_lock); spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks; cb = dcb->callbacks;
@ -2461,13 +2557,18 @@ int rval = 0;
{ {
while (cb) while (cb)
{ {
if (cb->reason == reason && cb->cb == callback if (cb->reason == reason &&
&& cb->userdata == userdata) cb->cb == callback &&
cb->userdata == userdata)
{ {
if (pcb != NULL) if (pcb != NULL)
{
pcb->next = cb->next; pcb->next = cb->next;
}
else else
{
dcb->callbacks = cb->next; dcb->callbacks = cb->next;
}
spinlock_release(&dcb->cb_lock); spinlock_release(&dcb->cb_lock);
free(cb); free(cb);
rval = 1; rval = 1;
@ -2478,7 +2579,9 @@ int rval = 0;
} }
} }
if (!rval) if (!rval)
{
spinlock_release(&dcb->cb_lock); spinlock_release(&dcb->cb_lock);
}
return rval; return rval;
} }
@ -2491,7 +2594,7 @@ int rval = 0;
static void static void
dcb_call_callback(DCB *dcb, DCB_REASON reason) dcb_call_callback(DCB *dcb, DCB_REASON reason)
{ {
DCB_CALLBACK *cb, *nextcb; DCB_CALLBACK *cb, *nextcb;
spinlock_acquire(&dcb->cb_lock); spinlock_acquire(&dcb->cb_lock);
cb = dcb->callbacks; cb = dcb->callbacks;
@ -2511,8 +2614,10 @@ DCB_CALLBACK *cb, *nextcb;
cb = nextcb; cb = nextcb;
} }
else else
{
cb = cb->next; cb = cb->next;
} }
}
spinlock_release(&dcb->cb_lock); spinlock_release(&dcb->cb_lock);
} }
@ -2525,7 +2630,7 @@ DCB_CALLBACK *cb, *nextcb;
int int
dcb_isvalid(DCB *dcb) dcb_isvalid(DCB *dcb)
{ {
int rval = 0; int rval = 0;
if (dcb) if (dcb)
{ {
@ -2579,7 +2684,7 @@ dcb_isvalid_nolock(DCB *dcb)
* @return The pointer to the next DCB or NULL if this is the last * @return The pointer to the next DCB or NULL if this is the last
*/ */
static DCB * static DCB *
dcb_get_next (DCB *dcb) dcb_get_next(DCB *dcb)
{ {
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
if (dcb) { if (dcb) {
@ -2789,8 +2894,8 @@ dcb_persistent_clean_count(DCB *dcb, bool cleanall)
int int
dcb_count_by_usage(DCB_USAGE usage) dcb_count_by_usage(DCB_USAGE usage)
{ {
int rval = 0; int rval = 0;
DCB *ptr; DCB *ptr;
spinlock_acquire(&dcbspin); spinlock_acquire(&dcbspin);
ptr = allDCBs; ptr = allDCBs;
@ -2800,24 +2905,34 @@ DCB *ptr;
{ {
case DCB_USAGE_CLIENT: case DCB_USAGE_CLIENT:
if (dcb_isclient(ptr)) if (dcb_isclient(ptr))
{
rval++; rval++;
}
break; break;
case DCB_USAGE_LISTENER: case DCB_USAGE_LISTENER:
if (ptr->state == DCB_STATE_LISTENING) if (ptr->state == DCB_STATE_LISTENING)
{
rval++; rval++;
}
break; break;
case DCB_USAGE_BACKEND: case DCB_USAGE_BACKEND:
if (dcb_isclient(ptr) == 0 if (dcb_isclient(ptr) == 0
&& ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER) && ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER)
{
rval++; rval++;
}
break; break;
case DCB_USAGE_INTERNAL: case DCB_USAGE_INTERNAL:
if (ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER) if (ptr->dcb_role == DCB_ROLE_REQUEST_HANDLER)
{
rval++; rval++;
}
break; break;
case DCB_USAGE_ZOMBIE: case DCB_USAGE_ZOMBIE:
if (DCB_ISZOMBIE(ptr)) if (DCB_ISZOMBIE(ptr))
{
rval++; rval++;
}
break; break;
case DCB_USAGE_ALL: case DCB_USAGE_ALL:
rval++; rval++;
@ -2839,19 +2954,18 @@ DCB *ptr;
*/ */
int dcb_create_SSL(DCB* dcb) int dcb_create_SSL(DCB* dcb)
{ {
if (serviceInitSSL(dcb->service) != 0)
if(serviceInitSSL(dcb->service) != 0)
{ {
return -1; return -1;
} }
if((dcb->ssl = SSL_new(dcb->service->ctx)) == NULL) if ((dcb->ssl = SSL_new(dcb->service->ctx)) == NULL)
{ {
MXS_ERROR("Failed to initialize SSL for connection."); MXS_ERROR("Failed to initialize SSL for connection.");
return -1; return -1;
} }
if(SSL_set_fd(dcb->ssl,dcb->fd) == 0) if (SSL_set_fd(dcb->ssl,dcb->fd) == 0)
{ {
MXS_ERROR("Failed to set file descriptor for SSL connection."); MXS_ERROR("Failed to set file descriptor for SSL connection.");
return -1; return -1;
@ -2889,10 +3003,10 @@ int dcb_accept_SSL(DCB* dcb)
ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval); ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval);
MXS_ERROR("SSL authentication failed (SSL error %d):", ssl_errnum); MXS_ERROR("SSL authentication failed (SSL error %d):", ssl_errnum);
if(ssl_errnum == SSL_ERROR_SSL || if (ssl_errnum == SSL_ERROR_SSL ||
ssl_errnum == SSL_ERROR_SYSCALL) ssl_errnum == SSL_ERROR_SYSCALL)
{ {
while((err_errnum = ERR_get_error()) != 0) while ((err_errnum = ERR_get_error()) != 0)
{ {
ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf)); ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf));
MXS_ERROR("%s", errbuf); MXS_ERROR("%s", errbuf);
@ -2906,10 +3020,9 @@ int dcb_accept_SSL(DCB* dcb)
return rval; return rval;
case -1: case -1:
ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval); ssl_errnum = SSL_get_error(dcb->ssl,ssl_rval);
if(ssl_errnum == SSL_ERROR_WANT_READ || ssl_errnum == SSL_ERROR_WANT_WRITE) if (ssl_errnum == SSL_ERROR_WANT_READ || ssl_errnum == SSL_ERROR_WANT_WRITE)
{ {
/** Not all of the data has been read. Go back to the poll /** Not all of the data has been read. Go back to the poll
queue and wait for more.*/ queue and wait for more.*/
@ -2924,15 +3037,15 @@ int dcb_accept_SSL(DCB* dcb)
dcb->remote, dcb->remote,
SSL_get_version(dcb->ssl), SSL_get_version(dcb->ssl),
ssl_errnum); ssl_errnum);
if(ssl_errnum == SSL_ERROR_SSL || if (ssl_errnum == SSL_ERROR_SSL ||
ssl_errnum == SSL_ERROR_SYSCALL) ssl_errnum == SSL_ERROR_SYSCALL)
{ {
while((err_errnum = ERR_get_error()) != 0) while ((err_errnum = ERR_get_error()) != 0)
{ {
ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf)); ERR_error_string_n(err_errnum,errbuf,sizeof(errbuf));
MXS_ERROR("%s", errbuf); MXS_ERROR("%s", errbuf);
} }
if(errno) if (errno)
{ {
MXS_ERROR("SSL authentication failed due to system" MXS_ERROR("SSL authentication failed due to system"
" error %d: %s", errno, strerror_r(errno, errbuf, sizeof(errbuf))); " error %d: %s", errno, strerror_r(errno, errbuf, sizeof(errbuf)));
@ -2951,7 +3064,8 @@ int dcb_accept_SSL(DCB* dcb)
#ifdef SS_DEBUG #ifdef SS_DEBUG
MXS_DEBUG("[dcb_accept_SSL] fd %d: %d bytes, %d pending", fd, b, pending); MXS_DEBUG("[dcb_accept_SSL] fd %d: %d bytes, %d pending", fd, b, pending);
#endif #endif
}while((b > 0 || pending > 0) && rval != -1); }
while ((b > 0 || pending > 0) && rval != -1);
return rval; return rval;
} }
@ -2990,7 +3104,7 @@ int dcb_connect_SSL(DCB* dcb)
case -1: case -1:
errnum = SSL_get_error(dcb->ssl,rval); errnum = SSL_get_error(dcb->ssl,rval);
if(errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE) if (errnum == SSL_ERROR_WANT_READ || errnum == SSL_ERROR_WANT_WRITE)
{ {
/** Not all of the data has been read. Go back to the poll /** Not all of the data has been read. Go back to the poll
queue and wait for more.*/ queue and wait for more.*/
@ -3030,20 +3144,28 @@ int dcb_connect_SSL(DCB* dcb)
char * char *
dcb_role_name(DCB *dcb) dcb_role_name(DCB *dcb)
{ {
char *name = NULL; char *name = NULL;
if (NULL != (name = (char *)malloc(64))) if (NULL != (name = (char *)malloc(64)))
{ {
name[0] = 0; name[0] = 0;
if (DCB_ROLE_SERVICE_LISTENER == dcb->dcb_role) if (DCB_ROLE_SERVICE_LISTENER == dcb->dcb_role)
{
strcat(name, "Service Listener"); strcat(name, "Service Listener");
}
else if (DCB_ROLE_REQUEST_HANDLER == dcb->dcb_role) else if (DCB_ROLE_REQUEST_HANDLER == dcb->dcb_role)
{
strcat(name, "Request Handler"); strcat(name, "Request Handler");
}
else if (DCB_ROLE_INTERNAL == dcb->dcb_role) else if (DCB_ROLE_INTERNAL == dcb->dcb_role)
{
strcat(name, "Internal"); strcat(name, "Internal");
}
else else
{
strcat(name, "Unknown"); strcat(name, "Unknown");
} }
}
return name; return name;
} }

View File

@ -65,7 +65,7 @@ struct service;
struct dcb; struct dcb;
/** /**
* @verbatim * @verbatim
* The operations that can be performed on the descriptor * The operations that can be performed on the descriptor
* *
@ -88,7 +88,8 @@ struct dcb;
* *
* @see load_module * @see load_module
*/ */
typedef struct gw_protocol { typedef struct gw_protocol
{
int (*read)(struct dcb *); int (*read)(struct dcb *);
int (*write)(struct dcb *, GWBUF *); int (*write)(struct dcb *, GWBUF *);
int (*write_ready)(struct dcb *); int (*write_ready)(struct dcb *);
@ -115,7 +116,8 @@ typedef struct gw_protocol {
* inserted Insertion time for logging purposes * inserted Insertion time for logging purposes
* started Time that the processign started * started Time that the processign started
*/ */
typedef struct { typedef struct
{
struct dcb *next; struct dcb *next;
struct dcb *prev; struct dcb *prev;
uint32_t pending_events; uint32_t pending_events;
@ -138,7 +140,8 @@ typedef struct {
/** /**
* The statistics gathered on a descriptor control block * The statistics gathered on a descriptor control block
*/ */
typedef struct dcbstats { typedef struct dcbstats
{
int n_reads; /*< Number of reads on this descriptor */ int n_reads; /*< Number of reads on this descriptor */
int n_writes; /*< Number of writes on this descriptor */ int n_writes; /*< Number of writes on this descriptor */
int n_accepts; /*< Number of accepts on this descriptor */ int n_accepts; /*< Number of accepts on this descriptor */
@ -165,13 +168,15 @@ typedef struct dcbstats {
* will clear the bit value that corresponds to the calling thread. Once the bitmask * will clear the bit value that corresponds to the calling thread. Once the bitmask
* is completely cleared the DCB can finally be freed and removed from the zombie list. * is completely cleared the DCB can finally be freed and removed from the zombie list.
*/ */
typedef struct { typedef struct
{
GWBITMASK bitmask; /*< The bitmask of threads */ GWBITMASK bitmask; /*< The bitmask of threads */
struct dcb *next; /*< Next pointer for the zombie list */ struct dcb *next; /*< Next pointer for the zombie list */
} DCBMM; } DCBMM;
/* DCB states */ /* DCB states */
typedef enum { typedef enum
{
DCB_STATE_UNDEFINED, /*< State variable with no state */ DCB_STATE_UNDEFINED, /*< State variable with no state */
DCB_STATE_ALLOC, /*< Memory allocated but not populated */ DCB_STATE_ALLOC, /*< Memory allocated but not populated */
DCB_STATE_POLLING, /*< Waiting in the poll loop */ DCB_STATE_POLLING, /*< Waiting in the poll loop */
@ -181,7 +186,8 @@ typedef enum {
DCB_STATE_ZOMBIE, /*< DCB is no longer active, waiting to free it */ DCB_STATE_ZOMBIE, /*< DCB is no longer active, waiting to free it */
} dcb_state_t; } dcb_state_t;
typedef enum { typedef enum
{
DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */ DCB_ROLE_SERVICE_LISTENER, /*< Receives initial connect requests from clients */
DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */ DCB_ROLE_REQUEST_HANDLER, /*< Serves dedicated client */
DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */ DCB_ROLE_INTERNAL /*< Internal DCB not connected to the outside */
@ -190,7 +196,8 @@ typedef enum {
/** /**
* Callback reasons for the DCB callback mechanism. * Callback reasons for the DCB callback mechanism.
*/ */
typedef enum { typedef enum
{
DCB_REASON_CLOSE, /*< The DCB is closing */ DCB_REASON_CLOSE, /*< The DCB is closing */
DCB_REASON_DRAINED, /*< The write delay queue has drained */ DCB_REASON_DRAINED, /*< The write delay queue has drained */
DCB_REASON_HIGH_WATER, /*< Cross high water mark */ DCB_REASON_HIGH_WATER, /*< Cross high water mark */
@ -203,12 +210,12 @@ typedef enum {
/** /**
* Callback structure - used to track callbacks registered on a DCB * Callback structure - used to track callbacks registered on a DCB
*/ */
typedef struct dcb_callback { typedef struct dcb_callback
{
DCB_REASON reason; /*< The reason for the callback */ DCB_REASON reason; /*< The reason for the callback */
int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata); int (*cb)(struct dcb *dcb, DCB_REASON reason, void *userdata);
void *userdata; /*< User data to be sent in the callback */ void *userdata; /*< User data to be sent in the callback */
struct dcb_callback struct dcb_callback *next; /*< Next callback for this DCB */
*next; /*< Next callback for this DCB */
} DCB_CALLBACK; } DCB_CALLBACK;
@ -223,7 +230,8 @@ typedef struct dcb_callback {
* It is important to hold the state information here such that any thread within the * It is important to hold the state information here such that any thread within the
* gateway may be selected to execute the required actions when a network event occurs. * gateway may be selected to execute the required actions when a network event occurs.
*/ */
typedef struct dcb { typedef struct dcb
{
skygw_chk_t dcb_chk_top; skygw_chk_t dcb_chk_top;
bool dcb_errhandle_called; /*< this can be called only once */ bool dcb_errhandle_called; /*< this can be called only once */
bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */ bool dcb_is_zombie; /**< Whether the DCB is in the zombie list */
@ -278,7 +286,8 @@ typedef struct dcb {
/** /**
* The DCB usage filer used for returning DCB's in use for a certain reason * The DCB usage filer used for returning DCB's in use for a certain reason
*/ */
typedef enum { typedef enum
{
DCB_USAGE_CLIENT, DCB_USAGE_CLIENT,
DCB_USAGE_LISTENER, DCB_USAGE_LISTENER,
DCB_USAGE_BACKEND, DCB_USAGE_BACKEND,
@ -330,10 +339,8 @@ const char *gw_dcb_state2string(int); /* DCB state to string */
void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */ void dcb_printf(DCB *, const char *, ...); /* DCB version of printf */
int dcb_isclient(DCB *); /* the DCB is the client of the session */ int dcb_isclient(DCB *); /* the DCB is the client of the session */
void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */ void dcb_hashtable_stats(DCB *, void *); /**< Print statisitics */
int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *);
void *); int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *);
int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *),
void *);
int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */ int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */
int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */ int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */
int dcb_persistent_clean_count(DCB *, bool); /* Clean persistent and return count */ int dcb_persistent_clean_count(DCB *, bool); /* Clean persistent and return count */