Add configuration capabilities for persistent connections.
This commit is contained in:
@ -619,6 +619,7 @@ hashtable_memory_fns(monitorhash,strdup,NULL,free,NULL);
|
|||||||
}
|
}
|
||||||
else if (!strcmp(type, "server"))
|
else if (!strcmp(type, "server"))
|
||||||
{
|
{
|
||||||
|
SERVER *server = obj->element;
|
||||||
char *address;
|
char *address;
|
||||||
char *port;
|
char *port;
|
||||||
char *protocol;
|
char *protocol;
|
||||||
@ -631,6 +632,8 @@ hashtable_memory_fns(monitorhash,strdup,NULL,free,NULL);
|
|||||||
monuser = config_get_value(obj->parameters,
|
monuser = config_get_value(obj->parameters,
|
||||||
"monitoruser");
|
"monitoruser");
|
||||||
monpw = config_get_value(obj->parameters, "monitorpw");
|
monpw = config_get_value(obj->parameters, "monitorpw");
|
||||||
|
server->persistpoolmax = config_get_value(obj->parameters, "persistpoolmax");
|
||||||
|
server->persistmaxtime = config_get_value(obj->parameters, "persistmaxtime");
|
||||||
|
|
||||||
if (address && port && protocol)
|
if (address && port && protocol)
|
||||||
{
|
{
|
||||||
@ -677,6 +680,10 @@ hashtable_memory_fns(monitorhash,strdup,NULL,free,NULL);
|
|||||||
"monitorpw")
|
"monitorpw")
|
||||||
&& strcmp(params->name,
|
&& strcmp(params->name,
|
||||||
"type")
|
"type")
|
||||||
|
&& strcmp(params->name,
|
||||||
|
"persistpoolmax")
|
||||||
|
&& strcmp(params->name,
|
||||||
|
"persistmaxtime")
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
serverAddParameter(obj->element,
|
serverAddParameter(obj->element,
|
||||||
|
@ -56,6 +56,7 @@
|
|||||||
#include <stdarg.h>
|
#include <stdarg.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <time.h>
|
||||||
#include <dcb.h>
|
#include <dcb.h>
|
||||||
#include <spinlock.h>
|
#include <spinlock.h>
|
||||||
#include <server.h>
|
#include <server.h>
|
||||||
@ -1297,7 +1298,9 @@ dcb_close(DCB *dcb)
|
|||||||
{
|
{
|
||||||
char *user;
|
char *user;
|
||||||
user = session_getUser(dcb->session);
|
user = session_getUser(dcb->session);
|
||||||
if (user && dcb->server)
|
if (user && dcb->server
|
||||||
|
&& dcb->server->persistpoolmax
|
||||||
|
&& dcb_persistent_clean_count(dcb) < dcb->server->persistpoolmax)
|
||||||
{
|
{
|
||||||
dcb->user = strdup(user);
|
dcb->user = strdup(user);
|
||||||
spinlock_acquire(&dcb->server->persistlock);
|
spinlock_acquire(&dcb->server->persistlock);
|
||||||
@ -2311,6 +2314,46 @@ dcb_null_auth(DCB *dcb, SERVER *server, SESSION *session, GWBUF *buf)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check persistent pool for expiry or excess size and count
|
||||||
|
*
|
||||||
|
* @param dcb The DCB being closed.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
dcb_persistent_clean_count(DCB *dcb)
|
||||||
|
{
|
||||||
|
int count = 0;
|
||||||
|
if (dcb)
|
||||||
|
{
|
||||||
|
SERVER *server = dcb->server;
|
||||||
|
DCB *previousdcb = NULL;
|
||||||
|
DCB *persistentdcb = server->persistent;
|
||||||
|
|
||||||
|
while (persistentdcb) {
|
||||||
|
if (count >= server->persistpoolmax || (persistentdcb->last_read + server->persistmaxtime) < time(NULL))
|
||||||
|
{
|
||||||
|
if (previousdcb) {
|
||||||
|
previousdcb->nextpersistent = persistentdcb->nextpersistent;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dcb->nextpersistent = persistentdcb->nextpersistent;
|
||||||
|
}
|
||||||
|
/** Call possible callback for this DCB in case of close */
|
||||||
|
dcb_call_callback(persistentdcb, DCB_REASON_CLOSE);
|
||||||
|
dcb_add_to_zombieslist(persistentdcb);
|
||||||
|
atomic_add(&server->stats.n_persistent, -1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
persistentdcb = persistentdcb->nextpersistent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return DCB counts optionally filtered by usage
|
* Return DCB counts optionally filtered by usage
|
||||||
*
|
*
|
||||||
|
@ -140,14 +140,13 @@ SERVER *ptr;
|
|||||||
DCB *
|
DCB *
|
||||||
server_get_persistent(SERVER *server, char *user, const char *protocol)
|
server_get_persistent(SERVER *server, char *user, const char *protocol)
|
||||||
{
|
{
|
||||||
DCB *dcb, *previous;
|
DCB *dcb, *previous = NULL;
|
||||||
int rc;
|
|
||||||
|
|
||||||
|
if (server->persistent && dcb_persistent_clean_count(server->persistent) && server->persistent)
|
||||||
|
{
|
||||||
spinlock_acquire(&server->persistlock);
|
spinlock_acquire(&server->persistlock);
|
||||||
dcb = server->persistent;
|
dcb = server->persistent;
|
||||||
previous = NULL;
|
|
||||||
while (dcb) {
|
while (dcb) {
|
||||||
/* Test for expired, free and remove from list if it is */
|
|
||||||
if (dcb->user && dcb->protoname && 0 == strcmp(dcb->user, user) && 0 == strcmp(dcb->protoname, protocol))
|
if (dcb->user && dcb->protoname && 0 == strcmp(dcb->user, user) && 0 == strcmp(dcb->protoname, protocol))
|
||||||
{
|
{
|
||||||
if (NULL == previous)
|
if (NULL == previous)
|
||||||
@ -168,11 +167,8 @@ server_get_persistent(SERVER *server, char *user, const char *protocol)
|
|||||||
previous = dcb;
|
previous = dcb;
|
||||||
dcb = dcb->nextpersistent;
|
dcb = dcb->nextpersistent;
|
||||||
}
|
}
|
||||||
if (NULL != server->persistent)
|
|
||||||
{
|
|
||||||
/* Change user, remove DCB from list, release spinlock, return dcb */
|
|
||||||
}
|
|
||||||
spinlock_release(&server->persistlock);
|
spinlock_release(&server->persistlock);
|
||||||
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -716,7 +712,7 @@ SERVER_PARAM *param;
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retreive a parameter value from a server
|
* Retrieve a parameter value from a server
|
||||||
*
|
*
|
||||||
* @param server The server we are looking for a parameter of
|
* @param server The server we are looking for a parameter of
|
||||||
* @param name The name of the parameter we require
|
* @param name The name of the parameter we require
|
||||||
|
@ -335,6 +335,7 @@ int dcb_remove_callback(DCB *, DCB_REASON, int (*)(struct dcb *, DCB_REASON, vo
|
|||||||
void *);
|
void *);
|
||||||
int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */
|
int dcb_isvalid(DCB *); /* Check the DCB is in the linked list */
|
||||||
int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */
|
int dcb_count_by_usage(DCB_USAGE); /* Return counts of DCBs */
|
||||||
|
int dcb_persistent_clean_count(DCB *); /* Clean persistent and return count */
|
||||||
|
|
||||||
bool dcb_set_state(DCB* dcb, dcb_state_t new_state, dcb_state_t* old_state);
|
bool dcb_set_state(DCB* dcb, dcb_state_t new_state, dcb_state_t* old_state);
|
||||||
void dcb_call_foreach (struct server* server, DCB_REASON reason);
|
void dcb_call_foreach (struct server* server, DCB_REASON reason);
|
||||||
|
@ -97,6 +97,8 @@ typedef struct server {
|
|||||||
bool master_err_is_logged; /*< If node failed, this indicates whether it is logged */
|
bool master_err_is_logged; /*< If node failed, this indicates whether it is logged */
|
||||||
DCB *persistent; /**< List of unused persistent connections to the server */
|
DCB *persistent; /**< List of unused persistent connections to the server */
|
||||||
SPINLOCK persistlock; /**< Lock for adjusting the persistent connections list */
|
SPINLOCK persistlock; /**< Lock for adjusting the persistent connections list */
|
||||||
|
long persistpoolmax; /**< Maximum size of persistent connections pool */
|
||||||
|
long persistmaxtime; /**< Maximum number of seconds connection can live */
|
||||||
} SERVER;
|
} SERVER;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -847,7 +847,7 @@ int gw_read_client_event(
|
|||||||
* created. A BREF_CLOSED flag is set so dcb_close won't
|
* created. A BREF_CLOSED flag is set so dcb_close won't
|
||||||
* send redundant COM_QUIT.
|
* send redundant COM_QUIT.
|
||||||
*/
|
*/
|
||||||
SESSION_ROUTE_QUERY(session, read_buffer);
|
/* Temporarily suppressed: SESSION_ROUTE_QUERY(session, read_buffer); */
|
||||||
/**
|
/**
|
||||||
* Close router session which causes closing of backends.
|
* Close router session which causes closing of backends.
|
||||||
*/
|
*/
|
||||||
|
Reference in New Issue
Block a user