poll.c: added maxscale thread id to log

session.c: Replaced free(session->router_sesision) with call to freeSession callback
users.c: Removed reference to uninitialized variable.
router.h: Added freeSession callback to function block.
mysql_backend.c: try to ensure that client dcb is still listening in epoll_wait when writing reply to it.
mysql_common.c: assert debug build is mysql_protocol_init is called with dcb == NULL
readconnroute.c, readwritesplit.c, debugcli.c and testroute.c : Added freeSession to function block and an inmplementation of it.
This commit is contained in:
vraatikka 2013-09-12 22:17:11 +03:00
parent c4d01cdaed
commit bbc9dcc9a3
10 changed files with 227 additions and 95 deletions

View File

@ -199,8 +199,10 @@ poll_waitevents(void *arg)
#else
if (!no_op) {
skygw_log_write(LOGFILE_TRACE,
"%lu [poll_waitevents] > epoll_wait <",
pthread_self());
"%lu [poll_waitevents] MaxScale thread %d > "
"epoll_wait <",
pthread_self(),
thread_id);
no_op = TRUE;
}
simple_mutex_lock(&epoll_wait_mutex, TRUE);
@ -251,27 +253,21 @@ poll_waitevents(void *arg)
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] event %d",
"%lu %d [poll_waitevents] event %d dcb %p",
pthread_self(),
ev);
thread_id,
ev,
dcb);
if (ev & EPOLLERR)
{
atomic_add(&pollStats.n_error, 1);
dcb->func.error(dcb);
if (DCB_ISZOMBIE(dcb)) {
continue;
}
}
}
if (ev & EPOLLHUP)
{
atomic_add(&pollStats.n_hup, 1);
dcb->func.hangup(dcb);
if (DCB_ISZOMBIE(dcb)) {
continue;
}
}
if (ev & EPOLLOUT)
{
@ -282,9 +278,10 @@ poll_waitevents(void *arg)
dcb->dcb_write_active = TRUE;
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Write in fd %d",
"%lu %d [poll_waitevents] "
"Write in fd %d",
pthread_self(),
thread_id,
dcb->fd);
atomic_add(&pollStats.n_write, 1);
dcb->func.write_ready(dcb);
@ -303,9 +300,10 @@ poll_waitevents(void *arg)
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"%lu %d [poll_waitevents] "
"Accept in fd %d",
pthread_self(),
thread_id,
dcb->fd);
atomic_add(&pollStats.n_accept, 1);
dcb->func.accept(dcb);
@ -314,9 +312,10 @@ poll_waitevents(void *arg)
{
skygw_log_write(
LOGFILE_TRACE,
"%lu [poll_waitevents] "
"Read in fd %d",
"%lu %d [poll_waitevents] "
"Read in fd %d",
pthread_self(),
thread_id,
dcb->fd);
atomic_add(&pollStats.n_read, 1);
dcb->func.read(dcb);

View File

@ -205,7 +205,9 @@ bool session_free(
/* Free router_session and session */
if (session->router_session) {
free(session->router_session);
session->service->router->freeSession(
session->service->router_instance,
session->router_session);
}
free(session);
succp = true;

View File

@ -114,7 +114,6 @@ int del;
atomic_add(&users->stats.n_deletes, 1);
if (users->stats.n_entries == 1) {
atomic_add(&users->stats.n_entries, del * -1);
return 0;
}
del = hashtable_delete(users->data, user);

View File

@ -32,7 +32,7 @@
* @endverbatim
*/
#define MAX_EVENTS 1000
#define EPOLL_TIMEOUT 1000 /**< The epoll timeout we use (milliseconds) */
#define EPOLL_TIMEOUT 1000 /**< The epoll timeout in milliseconds */
extern void poll_init();
extern int poll_add_dcb(DCB *);

View File

@ -66,6 +66,7 @@ typedef struct router_object {
ROUTER *(*createInstance)(SERVICE *service, char **options);
void *(*newSession)(ROUTER *instance, SESSION *session);
void (*closeSession)(ROUTER *instance, void *router_session);
void (*freeSession)(ROUTER *instance, void *router_session);
int (*routeQuery)(ROUTER *instance, void *router_session, GWBUF *queue);
void (*diagnostics)(ROUTER *instance, DCB *dcb);
void (*clientReply)(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);

View File

@ -152,9 +152,6 @@ static int gw_read_backend_event(DCB *dcb) {
ss_info_dassert(dcb->session != NULL,
"Backend dcb doesn't have session");
ss_info_dassert(dcb->session->client != NULL,
"Session's client dcb pointer is NULL");
client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol);
backend_protocol = (MySQLProtocol *) dcb->protocol;
/** return only with complete session */
@ -225,11 +222,6 @@ static int gw_read_backend_event(DCB *dcb) {
current_session->user);
backend_protocol->state = MYSQL_AUTH_FAILED;
#if 0
/** vraa : this traps easily. Why? */
ss_dassert(backend_protocol->state !=
MYSQL_AUTH_FAILED);
#endif
/* send an error to the client */
mysql_send_custom_error(
dcb->session->client,
@ -327,26 +319,30 @@ static int gw_read_backend_event(DCB *dcb) {
*/
/**
* If dcb->session->client is freed already it may be NULL, and
* protocol can't be read. However, then it wouldn't be possible
* that there was anything to write to client in that case.
* Should this be protected somehow, anyway?
* If dcb->session->client is freed already it may be NULL.
*/
client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol);
CHK_PROTOCOL(client_protocol);
if (client_protocol != NULL &&
(client_protocol->state == MYSQL_WAITING_RESULT ||
client_protocol->state == MYSQL_IDLE))
{
router->clientReply(router_instance, rsession, head, dcb);
rc = 1;
if (dcb->session->client != NULL) {
client_protocol = SESSION_PROTOCOL(dcb->session, MySQLProtocol);
}
goto return_rc;
}
rc = 0;
if (client_protocol != NULL) {
CHK_PROTOCOL(client_protocol);
if (client_protocol->state == MYSQL_WAITING_RESULT ||
client_protocol->state == MYSQL_IDLE)
{
router->clientReply(router_instance,
rsession,
head,
dcb);
rc = 1;
}
goto return_rc;
}
}
return_rc:
return rc;
return rc;
}
/*
@ -361,6 +357,19 @@ static int gw_write_backend_event(DCB *dcb) {
//fprintf(stderr, ">>> backend EPOLLOUT %i, protocol state [%s]\n", backend_protocol->fd, gw_mysql_protocol_state2string(backend_protocol->state));
// spinlock_acquire(&dcb->connectlock);
/**
* Don't write to backend if backend_dcb is not in poll set anymore.
*/
if (dcb->state != DCB_STATE_POLLING) {
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Writing to backend failed");
return 0;
}
/**
* vraa: what is the logic in this?
*/
@ -386,6 +395,19 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
{
MySQLProtocol *backend_protocol = dcb->protocol;
/**
* Don't write to backend if backend_dcb is not in poll set anymore.
*/
if (dcb->state != DCB_STATE_POLLING) {
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Writing to backend failed");
return 0;
}
spinlock_acquire(&dcb->authlock);
/**
@ -414,11 +436,30 @@ gw_MySQLWrite_backend(DCB *dcb, GWBUF *queue)
*
*/
static int gw_error_backend_event(DCB *dcb) {
/*
fprintf(stderr, ">>> Handle Backend error function for %i\n", dcb->fd);
*/
if (dcb->state != DCB_STATE_POLLING) {
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Writing to backend failed.");
return 0;
}
skygw_log_write_flush(
LOGFILE_ERROR,
"%lu [gw_error_backend_event] Some error occurred in backend.",
pthread_self());
mysql_send_custom_error(
dcb->session->client,
1,
0,
"Closed backend connection.");
dcb_close(dcb);
return 1;
}

View File

@ -49,11 +49,20 @@ static char *version_str = "V1.0.1";
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *router_session);
static void freeSession(ROUTER *instance, void *router_session);
static int execute(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
/** The module object definition */
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, execute, diagnostics, NULL };
static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
execute,
diagnostics,
NULL
};
extern int execute_cmd(CLI_SESSION *cli);
@ -199,6 +208,13 @@ CLI_SESSION *session = (CLI_SESSION *)router_session;
*/
}
static void freeSession(
ROUTER* router_instance,
void* router_client_session)
{
return;
}
/**
* We have data from the client, we must route it to the backend.
* This is simply a case of sending it to the connection that was

View File

@ -88,6 +88,7 @@ static char *version_str = "V1.0.2";
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *router_session);
static void freeSession(ROUTER *instance, void *router_session);
static int routeQuery(ROUTER *instance, void *router_session, GWBUF *queue);
static void diagnostics(ROUTER *instance, DCB *dcb);
static void clientReply(
@ -101,6 +102,7 @@ static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostics,
clientReply
@ -393,6 +395,69 @@ int i;
return (void *)client_ses;
}
/**
* @node Unlink from backend server, unlink from router's connection list,
* and free memory of a router client session.
*
* Parameters:
* @param router - <usage>
* <description>
*
* @param router_cli_ses - <usage>
* <description>
*
* @return void
*
*
* @details (write detailed description here)
*
*/
static void freeSession(
ROUTER* router_instance,
void* router_client_ses)
{
ROUTER_INSTANCE* router = (ROUTER_INSTANCE *)router_instance;
ROUTER_CLIENT_SES* router_cli_ses =
(ROUTER_CLIENT_SES *)router_client_ses;
int prev_val;
prev_val = atomic_add(&router_cli_ses->backend->current_connection_count, -1);
ss_dassert(prev_val > 0);
atomic_add(&router_cli_ses->backend->server->stats.n_current, -1);
spinlock_acquire(&router->lock);
if (router->connections == router_cli_ses) {
router->connections = router_cli_ses->next;
} else {
ROUTER_CLIENT_SES *ptr = router->connections;
while (ptr != NULL && ptr->next != router_cli_ses) {
ptr = ptr->next;
}
if (ptr != NULL) {
ptr->next = router_cli_ses->next;
}
}
spinlock_release(&router->lock);
skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [freeSession] Unlinked router_client_session %p from "
"router %p and form server on port %d. Connections : %d "
"session %p.",
pthread_self(),
router_cli_ses,
router,
router_cli_ses->backend->server->port,
prev_val-1,
router_cli_ses->backend_dcb->session);
free(router_cli_ses);
}
/**
* Close a session with the router, this is the mechanism
* by which a router may cleanup data structure etc.
@ -410,34 +475,7 @@ bool succp = false;
/*
* Close the connection to the backend
*/
skygw_log_write_flush(
LOGFILE_TRACE,
"%lu [closeSession] closing session with "
"router_session "
"%p, and inst %p.",
pthread_self(),
router_ses,
router_inst);
router_ses->backend_dcb->func.close(router_ses->backend_dcb);
atomic_add(&router_ses->backend->current_connection_count, -1);
atomic_add(&router_ses->backend->server->stats.n_current, -1);
spinlock_acquire(&router_inst->lock);
if (router_inst->connections == router_ses)
router_inst->connections = router_ses->next;
else
{
ROUTER_CLIENT_SES *ptr = router_inst->connections;
while (ptr && ptr->next != router_ses)
ptr = ptr->next;
if (ptr)
ptr->next = router_ses->next;
}
spinlock_release(&router_inst->lock);
/**
* Router session is freed in session.c:session_close, when session who
* owns it, is freed.
*/
}
/**
@ -457,23 +495,34 @@ ROUTER_INSTANCE *inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES *session = (ROUTER_CLIENT_SES *)router_session;
uint8_t *payload = GWBUF_DATA(queue);
int mysql_command = -1;
int rc;
inst->stats.n_queries++;
mysql_command = MYSQL_GET_COMMAND(payload);
switch(mysql_command) {
case MYSQL_COM_CHANGE_USER:
return session->backend_dcb->func.auth(
session->backend_dcb,
NULL,
session->backend_dcb->session,
queue);
default:
return session->backend_dcb->func.write(
session->backend_dcb,
queue);
case MYSQL_COM_CHANGE_USER:
rc = session->backend_dcb->func.auth(
session->backend_dcb,
NULL,
session->backend_dcb->session,
queue);
default:
rc = session->backend_dcb->func.write(
session->backend_dcb,
queue);
}
skygw_log_write(
LOGFILE_DEBUG,
"%lu [readconnroute:routeQuery] Routed command %d to dcb %p "
"with return value %d.",
pthread_self(),
mysql_command,
session->backend_dcb,
rc);
return rc;
}
/**
@ -530,4 +579,4 @@ clientReply(
client->func.write(client, queue);
}
///

View File

@ -55,18 +55,21 @@ static char *version_str = "V1.0.2";
static ROUTER* createInstance(SERVICE *service, char **options);
static void* newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb);
static void clientReply(ROUTER* instance, void* router_session, GWBUF* queue, DCB *backend_dcb);
static ROUTER_OBJECT MyObject =
{ createInstance,
newSession,
closeSession,
routeQuery,
diagnostic,
clientReply };
static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostic,
clientReply
};
static SPINLOCK instlock;
static INSTANCE* instances;
@ -365,6 +368,12 @@ static void closeSession(
free(session);
}
static void freeSession(
ROUTER* router_instance,
void* router_client_session)
{
return;
}
/**
* The main routing entry, this is called with every packet that is

View File

@ -23,10 +23,20 @@ static char *version_str = "V1.0.0";
static ROUTER *createInstance(SERVICE *service, char **options);
static void *newSession(ROUTER *instance, SESSION *session);
static void closeSession(ROUTER *instance, void *session);
static void freeSession(ROUTER *instance, void *session);
static int routeQuery(ROUTER *instance, void *session, GWBUF *queue);
static void diagnostic(ROUTER *instance, DCB *dcb);
static ROUTER_OBJECT MyObject = { createInstance, newSession, closeSession, routeQuery, diagnostic, NULL };
static ROUTER_OBJECT MyObject = {
createInstance,
newSession,
closeSession,
freeSession,
routeQuery,
diagnostic,
NULL
};
/**
* Implementation of the mandatory version entry point
@ -104,6 +114,12 @@ closeSession(ROUTER *instance, void *session)
{
}
static void freeSession(
ROUTER* router_instance,
void* router_client_session)
{
return;
}
static int
routeQuery(ROUTER *instance, void *session, GWBUF *queue)