Move persistent connections to thread specific lists

Making the lists of persistent DCBs thread specific is both a bug fix and
a performance enhancement. There was a small window where a non-owner
thread could receive events for a DCB. By partitioning the DCBs into
thread specific lists, this is avoided by removing the possibility of DCBs
moving between threads.
This commit is contained in:
Markus Makela
2016-11-26 05:52:26 +02:00
parent bcbff604b0
commit 5aa791d16e
5 changed files with 44 additions and 42 deletions

View File

@ -368,7 +368,7 @@ int dcb_add_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *
int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *); int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, void *), void *);
int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */ int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */
int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */ int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */
int dcb_persistent_clean_count(DCB *, bool); /* Clean persistent and return count */ int dcb_persistent_clean_count(DCB *, int, bool); /* Clean persistent and return count */
void dcb_hangup_foreach (struct server* server); void dcb_hangup_foreach (struct server* server);
size_t dcb_get_session_id(DCB* dcb); size_t dcb_get_session_id(DCB* dcb);
bool dcb_get_ses_log_info(DCB* dcb, size_t* sesid, int* enabled_logs); bool dcb_get_ses_log_info(DCB* dcb, size_t* sesid, int* enabled_logs);

View File

@ -110,8 +110,7 @@ typedef struct server
int depth; /**< Replication level in the tree */ int depth; /**< Replication level in the tree */
long slaves[MAX_NUM_SLAVES]; /**< Slaves of this node */ long slaves[MAX_NUM_SLAVES]; /**< Slaves of this node */
bool master_err_is_logged; /*< If node failed, this indicates whether it is logged */ bool master_err_is_logged; /*< If node failed, this indicates whether it is logged */
DCB *persistent; /**< List of unused persistent connections to the server */ DCB **persistent; /**< List of unused persistent connections to the server */
SPINLOCK persistlock; /**< Lock for adjusting the persistent connections list */
long persistpoolmax; /**< Maximum size of persistent connections pool */ long persistpoolmax; /**< Maximum size of persistent connections pool */
long persistmaxtime; /**< Maximum number of seconds connection can live */ long persistmaxtime; /**< Maximum number of seconds connection can live */
int persistmax; /**< Maximum pool size actually achieved since startup */ int persistmax; /**< Maximum pool size actually achieved since startup */
@ -272,7 +271,7 @@ extern void serverAddMonUser(SERVER *, char *, char *);
extern void serverAddParameter(SERVER *, char *, char *); extern void serverAddParameter(SERVER *, char *, char *);
extern char *serverGetParameter(SERVER *, char *); extern char *serverGetParameter(SERVER *, char *);
extern void server_update_credentials(SERVER *, char *, char *); extern void server_update_credentials(SERVER *, char *, char *);
extern DCB *server_get_persistent(SERVER *, char *, const char *); extern DCB *server_get_persistent(SERVER *, char *, const char *, int);
extern void server_update_address(SERVER *, char *); extern void server_update_address(SERVER *, char *);
extern void server_update_port(SERVER *, unsigned short); extern void server_update_port(SERVER *, unsigned short);
extern RESULTSET *serverGetList(); extern RESULTSET *serverGetList();

View File

@ -646,7 +646,7 @@ dcb_connect(SERVER *server, SESSION *session, const char *protocol)
{ {
MXS_DEBUG("%lu [dcb_connect] Looking for persistent connection DCB " MXS_DEBUG("%lu [dcb_connect] Looking for persistent connection DCB "
"user %s protocol %s\n", pthread_self(), user, protocol); "user %s protocol %s\n", pthread_self(), user, protocol);
dcb = server_get_persistent(server, user, protocol); dcb = server_get_persistent(server, user, protocol, session->client_dcb->thread.id);
if (dcb) if (dcb)
{ {
/** /**
@ -1695,7 +1695,7 @@ dcb_maybe_add_persistent(DCB *dcb)
&& (dcb->server->status & SERVER_RUNNING) && (dcb->server->status & SERVER_RUNNING)
&& !dcb->dcb_errhandle_called && !dcb->dcb_errhandle_called
&& !(dcb->flags & DCBF_HUNG) && !(dcb->flags & DCBF_HUNG)
&& (poolcount = dcb_persistent_clean_count(dcb, false)) < dcb->server->persistpoolmax) && (poolcount = dcb_persistent_clean_count(dcb, dcb->thread.id, false)) < dcb->server->persistpoolmax)
{ {
DCB_CALLBACK *loopcallback; DCB_CALLBACK *loopcallback;
MXS_DEBUG("%lu [dcb_maybe_add_persistent] Adding DCB to persistent pool, user %s.\n", MXS_DEBUG("%lu [dcb_maybe_add_persistent] Adding DCB to persistent pool, user %s.\n",
@ -1724,10 +1724,8 @@ dcb_maybe_add_persistent(DCB *dcb)
MXS_FREE(loopcallback); MXS_FREE(loopcallback);
} }
spinlock_release(&dcb->cb_lock); spinlock_release(&dcb->cb_lock);
spinlock_acquire(&dcb->server->persistlock); dcb->nextpersistent = dcb->server->persistent[dcb->thread.id];
dcb->nextpersistent = dcb->server->persistent; dcb->server->persistent[dcb->thread.id] = dcb;
dcb->server->persistent = dcb;
spinlock_release(&dcb->server->persistlock);
atomic_add(&dcb->server->stats.n_persistent, 1); atomic_add(&dcb->server->stats.n_persistent, 1);
atomic_add(&dcb->server->stats.n_current, -1); atomic_add(&dcb->server->stats.n_current, -1);
return true; return true;
@ -2580,7 +2578,7 @@ dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf)
* @return A count of the DCBs remaining in the pool * @return A count of the DCBs remaining in the pool
*/ */
int int
dcb_persistent_clean_count(DCB *dcb, bool cleanall) dcb_persistent_clean_count(DCB *dcb, int id, bool cleanall)
{ {
int count = 0; int count = 0;
if (dcb && dcb->server) if (dcb && dcb->server)
@ -2591,8 +2589,7 @@ dcb_persistent_clean_count(DCB *dcb, bool cleanall)
DCB *disposals = NULL; DCB *disposals = NULL;
CHK_SERVER(server); CHK_SERVER(server);
spinlock_acquire(&server->persistlock); persistentdcb = server->persistent[id];
persistentdcb = server->persistent;
while (persistentdcb) while (persistentdcb)
{ {
CHK_DCB(persistentdcb); CHK_DCB(persistentdcb);
@ -2611,7 +2608,7 @@ dcb_persistent_clean_count(DCB *dcb, bool cleanall)
} }
else else
{ {
server->persistent = nextdcb; server->persistent[id] = nextdcb;
} }
/* Add removed DCBs to disposal list for processing outside spinlock */ /* Add removed DCBs to disposal list for processing outside spinlock */
persistentdcb->nextpersistent = disposals; persistentdcb->nextpersistent = disposals;
@ -2626,7 +2623,7 @@ dcb_persistent_clean_count(DCB *dcb, bool cleanall)
persistentdcb = nextdcb; persistentdcb = nextdcb;
} }
server->persistmax = MXS_MAX(server->persistmax, count); server->persistmax = MXS_MAX(server->persistmax, count);
spinlock_release(&server->persistlock);
/** Call possible callback for this DCB in case of close */ /** Call possible callback for this DCB in case of close */
while (disposals) while (disposals)
{ {

View File

@ -445,7 +445,7 @@ poll_add_dcb(DCB *dcb)
int int
poll_remove_dcb(DCB *dcb) poll_remove_dcb(DCB *dcb)
{ {
int dcbfd, rc = -1; int dcbfd, rc = 0;
struct epoll_event ev; struct epoll_event ev;
CHK_DCB(dcb); CHK_DCB(dcb);
@ -493,12 +493,13 @@ poll_remove_dcb(DCB *dcb)
for (int i = 0; i < nthr; i++) for (int i = 0; i < nthr; i++)
{ {
int tmp_rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, dcb->fd, &ev); int tmp_rc = epoll_ctl(epoll_fd[i], EPOLL_CTL_DEL, dcb->fd, &ev);
if (tmp_rc) if (tmp_rc && rc == 0)
{ {
/** Even if one of the instances failed to remove it, try /** Even if one of the instances failed to remove it, try
* to remove it from all the others */ * to remove it from all the others */
rc = tmp_rc; rc = tmp_rc;
error_num = errno; error_num = errno;
ss_dassert(error_num);
} }
} }
} }

View File

@ -87,15 +87,18 @@ SERVER* server_alloc(const char *name, const char *address, unsigned short port,
return NULL; return NULL;
} }
int nthr = config_threadcount();
SERVER *server = (SERVER *)MXS_CALLOC(1, sizeof(SERVER)); SERVER *server = (SERVER *)MXS_CALLOC(1, sizeof(SERVER));
char *my_name = MXS_STRDUP(name); char *my_name = MXS_STRDUP(name);
char *my_protocol = MXS_STRDUP(protocol); char *my_protocol = MXS_STRDUP(protocol);
char *my_authenticator = MXS_STRDUP(authenticator); char *my_authenticator = MXS_STRDUP(authenticator);
DCB **persistent = MXS_CALLOC(nthr, sizeof(*persistent));
if (!server || !my_name || !my_protocol || !my_authenticator) if (!server || !my_name || !my_protocol || !my_authenticator || !persistent)
{ {
MXS_FREE(server); MXS_FREE(server);
MXS_FREE(my_name); MXS_FREE(my_name);
MXS_FREE(persistent);
MXS_FREE(my_protocol); MXS_FREE(my_protocol);
MXS_FREE(my_authenticator); MXS_FREE(my_authenticator);
return NULL; return NULL;
@ -125,7 +128,7 @@ SERVER* server_alloc(const char *name, const char *address, unsigned short port,
server->parameters = NULL; server->parameters = NULL;
server->server_string = NULL; server->server_string = NULL;
spinlock_init(&server->lock); spinlock_init(&server->lock);
server->persistent = NULL; server->persistent = persistent;
server->persistmax = 0; server->persistmax = 0;
server->persistmaxtime = 0; server->persistmaxtime = 0;
server->persistpoolmax = 0; server->persistpoolmax = 0;
@ -133,7 +136,6 @@ SERVER* server_alloc(const char *name, const char *address, unsigned short port,
server->monpw[0] = '\0'; server->monpw[0] = '\0';
server->is_active = true; server->is_active = true;
server->charset = SERVER_DEFAULT_CHARSET; server->charset = SERVER_DEFAULT_CHARSET;
spinlock_init(&server->persistlock);
spinlock_acquire(&server_spin); spinlock_acquire(&server_spin);
server->next = allServers; server->next = allServers;
@ -183,7 +185,12 @@ server_free(SERVER *tofreeserver)
if (tofreeserver->persistent) if (tofreeserver->persistent)
{ {
dcb_persistent_clean_count(tofreeserver->persistent, true); int nthr = config_threadcount();
for (int i = 0; i < nthr; i++)
{
dcb_persistent_clean_count(tofreeserver->persistent[i], i, true);
}
} }
MXS_FREE(tofreeserver); MXS_FREE(tofreeserver);
return 1; return 1;
@ -197,17 +204,16 @@ server_free(SERVER *tofreeserver)
* @param protocol The name of the protocol needed for the connection * @param protocol The name of the protocol needed for the connection
*/ */
DCB * DCB *
server_get_persistent(SERVER *server, char *user, const char *protocol) server_get_persistent(SERVER *server, char *user, const char *protocol, int id)
{ {
DCB *dcb, *previous = NULL; DCB *dcb, *previous = NULL;
if (server->persistent if (server->persistent[id]
&& dcb_persistent_clean_count(server->persistent, false) && dcb_persistent_clean_count(server->persistent[id], id, false)
&& server->persistent && server->persistent[id] // Check after cleaning
&& (server->status & SERVER_RUNNING)) && (server->status & SERVER_RUNNING))
{ {
spinlock_acquire(&server->persistlock); dcb = server->persistent[id];
dcb = server->persistent;
while (dcb) while (dcb)
{ {
if (dcb->user if (dcb->user
@ -219,7 +225,7 @@ server_get_persistent(SERVER *server, char *user, const char *protocol)
{ {
if (NULL == previous) if (NULL == previous)
{ {
server->persistent = dcb->nextpersistent; server->persistent[id] = dcb->nextpersistent;
} }
else else
{ {
@ -227,7 +233,6 @@ server_get_persistent(SERVER *server, char *user, const char *protocol)
} }
MXS_FREE(dcb->user); MXS_FREE(dcb->user);
dcb->user = NULL; dcb->user = NULL;
spinlock_release(&server->persistlock);
atomic_add(&server->stats.n_persistent, -1); atomic_add(&server->stats.n_persistent, -1);
atomic_add(&server->stats.n_current, 1); atomic_add(&server->stats.n_current, 1);
return dcb; return dcb;
@ -249,7 +254,6 @@ server_get_persistent(SERVER *server, char *user, const char *protocol)
previous = dcb; previous = dcb;
dcb = dcb->nextpersistent; dcb = dcb->nextpersistent;
} }
spinlock_release(&server->persistlock);
} }
return NULL; return NULL;
} }
@ -549,8 +553,7 @@ dprintServer(DCB *dcb, SERVER *server)
if (server->persistpoolmax) if (server->persistpoolmax)
{ {
dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent); dcb_printf(dcb, "\tPersistent pool size: %d\n", server->stats.n_persistent);
dcb_printf(dcb, "\tPersistent measured pool size: %d\n", dcb_printf(dcb, "\tPersistent measured pool size: %d\n", server->stats.n_persistent);
dcb_persistent_clean_count(server->persistent, false));
dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax); dcb_printf(dcb, "\tPersistent actual size max: %d\n", server->persistmax);
dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax); dcb_printf(dcb, "\tPersistent pool size limit: %ld\n", server->persistpoolmax);
dcb_printf(dcb, "\tPersistent max time (secs): %ld\n", server->persistmaxtime); dcb_printf(dcb, "\tPersistent max time (secs): %ld\n", server->persistmaxtime);
@ -595,19 +598,21 @@ void
dprintPersistentDCBs(DCB *pdcb, SERVER *server) dprintPersistentDCBs(DCB *pdcb, SERVER *server)
{ {
DCB *dcb; DCB *dcb;
int nthr = config_threadcount();
spinlock_acquire(&server->persistlock); for (int i = 0; i < nthr; i++)
#if SPINLOCK_PROFILE
dcb_printf(pdcb, "DCB List Spinlock Statistics:\n");
spinlock_stats(&server->persistlock, spin_reporter, pdcb);
#endif
dcb = server->persistent;
while (dcb)
{ {
dprintOneDCB(pdcb, dcb); #if SPINLOCK_PROFILE
dcb = dcb->nextpersistent; dcb_printf(pdcb, "DCB List Spinlock Statistics:\n");
spinlock_stats(&server->persistlock, spin_reporter, pdcb);
#endif
dcb = server->persistent[i];
while (dcb)
{
dprintOneDCB(pdcb, dcb);
dcb = dcb->nextpersistent;
}
} }
spinlock_release(&server->persistlock);
} }
/** /**