Fixed idle session processing
The current implementation of idle connection timeouts is not safe. The sessions are handled in a way which is not thread-safe and the checking is done from a non-polling thread. With this change, the checks for the session timeouts are done in one of the polling threads in a thread-safe manner only if at least one service has enabled the timing out of idle client connections.
This commit is contained in:
@ -53,7 +53,7 @@ static HKTASK *tasks = NULL;
|
|||||||
static SPINLOCK tasklock = SPINLOCK_INIT;
|
static SPINLOCK tasklock = SPINLOCK_INIT;
|
||||||
|
|
||||||
static int do_shutdown = 0;
|
static int do_shutdown = 0;
|
||||||
unsigned long hkheartbeat = 0;
|
long hkheartbeat = 0; /*< One heartbeat is 100 milliseconds */
|
||||||
|
|
||||||
static void hkthread(void *);
|
static void hkthread(void *);
|
||||||
|
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
#include <maxconfig.h>
|
#include <maxconfig.h>
|
||||||
#include <mysql.h>
|
#include <mysql.h>
|
||||||
#include <resultset.h>
|
#include <resultset.h>
|
||||||
|
#include <session.h>
|
||||||
|
|
||||||
#define PROFILE_POLL 0
|
#define PROFILE_POLL 0
|
||||||
|
|
||||||
@ -705,6 +706,11 @@ poll_waitevents(void *arg)
|
|||||||
timeout_bias = 1;
|
timeout_bias = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (check_timeouts && hkheartbeat >= next_timeout_check)
|
||||||
|
{
|
||||||
|
process_idle_sessions();
|
||||||
|
}
|
||||||
|
|
||||||
if (thread_data)
|
if (thread_data)
|
||||||
{
|
{
|
||||||
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
thread_data[thread_id].state = THREAD_ZPROCESSING;
|
||||||
|
@ -98,7 +98,7 @@ static int find_type(typelib_t* tl, const char* needle, int maxlen);
|
|||||||
|
|
||||||
static void service_add_qualified_param(SERVICE* svc,
|
static void service_add_qualified_param(SERVICE* svc,
|
||||||
CONFIG_PARAMETER* param);
|
CONFIG_PARAMETER* param);
|
||||||
void service_interal_restart(void *data);
|
static void service_internal_restart(void *data);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a new service for the gateway to support
|
* Allocate a new service for the gateway to support
|
||||||
@ -142,6 +142,7 @@ service_alloc(const char *servname, const char *router)
|
|||||||
service->resources = NULL;
|
service->resources = NULL;
|
||||||
service->localhost_match_wildcard_host = SERVICE_PARAM_UNINIT;
|
service->localhost_match_wildcard_host = SERVICE_PARAM_UNINIT;
|
||||||
service->retry_start = true;
|
service->retry_start = true;
|
||||||
|
service->conn_idle_timeout = SERVICE_NO_SESSION_TIMEOUT;
|
||||||
service->weightby = NULL;
|
service->weightby = NULL;
|
||||||
service->credentials.authdata = NULL;
|
service->credentials.authdata = NULL;
|
||||||
service->credentials.name = NULL;
|
service->credentials.name = NULL;
|
||||||
@ -431,11 +432,6 @@ int serviceStartAllPorts(SERVICE* service)
|
|||||||
{
|
{
|
||||||
service->state = SERVICE_STATE_STARTED;
|
service->state = SERVICE_STATE_STARTED;
|
||||||
service->stats.started = time(0);
|
service->stats.started = time(0);
|
||||||
/** Add the task that monitors session timeouts */
|
|
||||||
if (service->conn_timeout > 0)
|
|
||||||
{
|
|
||||||
hktask_add("connection_timeout", session_close_timeouts, NULL, 5);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else if (service->retry_start)
|
else if (service->retry_start)
|
||||||
{
|
{
|
||||||
@ -445,7 +441,7 @@ int serviceStartAllPorts(SERVICE* service)
|
|||||||
int retry_after = MIN(service->stats.n_failed_starts * 10, SERVICE_MAX_RETRY_INTERVAL);
|
int retry_after = MIN(service->stats.n_failed_starts * 10, SERVICE_MAX_RETRY_INTERVAL);
|
||||||
snprintf(taskname, sizeof (taskname), "%s_start_retry_%d",
|
snprintf(taskname, sizeof (taskname), "%s_start_retry_%d",
|
||||||
service->name, service->stats.n_failed_starts);
|
service->name, service->stats.n_failed_starts);
|
||||||
hktask_oneshot(taskname, service_interal_restart,
|
hktask_oneshot(taskname, service_internal_restart,
|
||||||
(void*) service, retry_after);
|
(void*) service, retry_after);
|
||||||
MXS_NOTICE("Failed to start service %s, retrying in %d seconds.",
|
MXS_NOTICE("Failed to start service %s, retrying in %d seconds.",
|
||||||
service->name, retry_after);
|
service->name, retry_after);
|
||||||
@ -1117,7 +1113,13 @@ serviceSetTimeout(SERVICE *service, int val)
|
|||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
service->conn_timeout = val;
|
|
||||||
|
/** Enable the session timeout checks if and only if at least one service is
|
||||||
|
* configured with a idle timeout. */
|
||||||
|
if ((service->conn_idle_timeout = val))
|
||||||
|
{
|
||||||
|
enable_session_timeouts();
|
||||||
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
@ -2241,7 +2243,7 @@ int serviceInitSSL(SERVICE* service)
|
|||||||
* Function called by the housekeeper thread to retry starting of a service
|
* Function called by the housekeeper thread to retry starting of a service
|
||||||
* @param data Service to restart
|
* @param data Service to restart
|
||||||
*/
|
*/
|
||||||
void service_interal_restart(void *data)
|
static void service_internal_restart(void *data)
|
||||||
{
|
{
|
||||||
SERVICE* service = (SERVICE*)data;
|
SERVICE* service = (SERVICE*)data;
|
||||||
serviceStartAllPorts(service);
|
serviceStartAllPorts(service);
|
||||||
|
@ -54,6 +54,14 @@ static SESSION *allSessions = NULL;
|
|||||||
|
|
||||||
static struct session session_dummy_struct;
|
static struct session session_dummy_struct;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* These two are declared in session.h
|
||||||
|
*/
|
||||||
|
bool check_timeouts = false;
|
||||||
|
long next_timeout_check = 0;
|
||||||
|
|
||||||
|
static SPINLOCK timeout_lock = SPINLOCK_INIT;
|
||||||
|
|
||||||
static int session_setup_filters(SESSION *session);
|
static int session_setup_filters(SESSION *session);
|
||||||
static void session_simple_free(SESSION *session, DCB *dcb);
|
static void session_simple_free(SESSION *session, DCB *dcb);
|
||||||
|
|
||||||
@ -961,35 +969,49 @@ SESSION *get_all_sessions()
|
|||||||
return allSessions;
|
return allSessions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable the timing out of idle connections.
|
||||||
|
*
|
||||||
|
* This will prevent unnecessary acquisitions of the session spinlock if no
|
||||||
|
* service is configured with a session idle timeout.
|
||||||
|
*/
|
||||||
|
void enable_session_timeouts()
|
||||||
|
{
|
||||||
|
check_timeouts = true;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Close sessions that have been idle for too long.
|
* Close sessions that have been idle for too long.
|
||||||
*
|
*
|
||||||
* If the time since a session last sent data is grater than the set value in the
|
* If the time since a session last sent data is greater than the set value in the
|
||||||
* service, it is disconnected. The default value for the timeout for a service is 0.
|
* service, it is disconnected. The connection timeout is disabled by default.
|
||||||
* This means that connections are never timed out.
|
|
||||||
* @param data NULL, this is only here to satisfy the housekeeper function requirements.
|
|
||||||
*/
|
*/
|
||||||
void session_close_timeouts(void* data)
|
void process_idle_sessions()
|
||||||
{
|
{
|
||||||
SESSION* ses;
|
if (spinlock_acquire_nowait(&timeout_lock))
|
||||||
|
{
|
||||||
|
if (hkheartbeat >= next_timeout_check)
|
||||||
|
{
|
||||||
|
/** Because the resolution of the timeout is one second, we only need to
|
||||||
|
* check for it once per second. One heartbeat is 100 milliseconds. */
|
||||||
|
next_timeout_check = hkheartbeat + 10;
|
||||||
spinlock_acquire(&session_spin);
|
spinlock_acquire(&session_spin);
|
||||||
ses = get_all_sessions();
|
SESSION *ses = get_all_sessions();
|
||||||
spinlock_release(&session_spin);
|
|
||||||
|
|
||||||
while (ses)
|
while (ses)
|
||||||
{
|
{
|
||||||
if (ses->client && ses->client->state == DCB_STATE_POLLING &&
|
if (ses->service && ses->client && ses->client->state == DCB_STATE_POLLING &&
|
||||||
ses->service->conn_timeout > 0 &&
|
hkheartbeat - ses->client->last_read > ses->service->conn_idle_timeout * 10)
|
||||||
hkheartbeat - ses->client->last_read > ses->service->conn_timeout * 10)
|
|
||||||
{
|
{
|
||||||
dcb_close(ses->client);
|
dcb_close(ses->client);
|
||||||
}
|
}
|
||||||
|
|
||||||
spinlock_acquire(&session_spin);
|
|
||||||
ses = ses->next;
|
ses = ses->next;
|
||||||
|
}
|
||||||
spinlock_release(&session_spin);
|
spinlock_release(&session_spin);
|
||||||
}
|
}
|
||||||
|
spinlock_release(&timeout_lock);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -2,10 +2,10 @@
|
|||||||
#define _HK_HEARTBEAT_H
|
#define _HK_HEARTBEAT_H
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The global housekeeper heartbeat value. This value is increamente
|
* The global housekeeper heartbeat value. This value is incremented
|
||||||
* every 100ms and may be used for crude timing etc.
|
* every 100 milliseconds and may be used for crude timing etc.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
extern unsigned long hkheartbeat;
|
extern long hkheartbeat;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -135,6 +135,10 @@ enum
|
|||||||
|
|
||||||
#define DEFAULT_SSL_CERT_VERIFY_DEPTH 100 /*< The default certificate verification depth */
|
#define DEFAULT_SSL_CERT_VERIFY_DEPTH 100 /*< The default certificate verification depth */
|
||||||
#define SERVICE_MAX_RETRY_INTERVAL 3600 /*< The maximum interval between service start retries */
|
#define SERVICE_MAX_RETRY_INTERVAL 3600 /*< The maximum interval between service start retries */
|
||||||
|
|
||||||
|
/** Value of service timeout if timeout checks are disabled */
|
||||||
|
#define SERVICE_NO_SESSION_TIMEOUT LONG_MAX
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parameters that are automatically detected but can also be configured by the
|
* Parameters that are automatically detected but can also be configured by the
|
||||||
* user are initially set to this value.
|
* user are initially set to this value.
|
||||||
@ -180,7 +184,7 @@ typedef struct service
|
|||||||
SERVICE_REFRESH_RATE rate_limit; /**< The refresh rate limit for users table */
|
SERVICE_REFRESH_RATE rate_limit; /**< The refresh rate limit for users table */
|
||||||
FILTER_DEF **filters; /**< Ordered list of filters */
|
FILTER_DEF **filters; /**< Ordered list of filters */
|
||||||
int n_filters; /**< Number of filters */
|
int n_filters; /**< Number of filters */
|
||||||
int conn_timeout; /*< Session timeout in seconds */
|
long conn_idle_timeout; /**< Session timeout in seconds */
|
||||||
ssl_mode_t ssl_mode; /*< one of DISABLED, ENABLED or REQUIRED */
|
ssl_mode_t ssl_mode; /*< one of DISABLED, ENABLED or REQUIRED */
|
||||||
char *weightby;
|
char *weightby;
|
||||||
struct service *next; /**< The next service in the linked list */
|
struct service *next; /**< The next service in the linked list */
|
||||||
|
@ -146,6 +146,13 @@ typedef struct session
|
|||||||
#endif
|
#endif
|
||||||
} SESSION;
|
} SESSION;
|
||||||
|
|
||||||
|
/** Whether to do session timeout checks */
|
||||||
|
extern bool check_timeouts;
|
||||||
|
|
||||||
|
/** When the next timeout check is done. This is compared to hkheartbeat in
|
||||||
|
* hk_heartbeat.h */
|
||||||
|
extern long next_timeout_check;
|
||||||
|
|
||||||
#define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client, type)
|
#define SESSION_PROTOCOL(x, type) DCB_PROTOCOL((x)->client, type)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -184,7 +191,7 @@ int session_unlink_dcb(SESSION*, DCB*);
|
|||||||
SESSION* get_session_by_router_ses(void* rses);
|
SESSION* get_session_by_router_ses(void* rses);
|
||||||
void session_enable_log_priority(SESSION* ses, int priority);
|
void session_enable_log_priority(SESSION* ses, int priority);
|
||||||
void session_disable_log_priority(SESSION* ses, int priority);
|
void session_disable_log_priority(SESSION* ses, int priority);
|
||||||
void session_close_timeouts(void* data);
|
|
||||||
RESULTSET *sessionGetList(SESSIONLISTFILTER);
|
RESULTSET *sessionGetList(SESSIONLISTFILTER);
|
||||||
|
void process_idle_sessions();
|
||||||
|
void enable_session_timeouts();
|
||||||
#endif
|
#endif
|
||||||
|
Reference in New Issue
Block a user