Compile server, service and session as C++
This commit is contained in:
983
server/core/session.cc
Normal file
983
server/core/session.cc
Normal file
@ -0,0 +1,983 @@
|
||||
/*
|
||||
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||
*
|
||||
* Use of this software is governed by the Business Source License included
|
||||
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||
*
|
||||
* Change Date: 2019-07-01
|
||||
*
|
||||
* On the date above, in accordance with the Business Source License, use
|
||||
* of this software will be governed by version 2 or later of the General
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file session.c - A representation of the session within the gateway.
|
||||
*
|
||||
* @verbatim
|
||||
* Revision History
|
||||
*
|
||||
* Date Who Description
|
||||
* 17/06/13 Mark Riddoch Initial implementation
|
||||
* 02/09/13 Massimiliano Pinto Added session refcounter
|
||||
* 29/05/14 Mark Riddoch Addition of filter mechanism
|
||||
* 23/08/15 Martin Brampton Tidying; slight improvement in safety
|
||||
* 17/09/15 Martin Brampton Keep failed session in existence - leave DCBs to close
|
||||
* 27/06/16 Martin Brampton Amend to utilise list manager
|
||||
*
|
||||
* @endverbatim
|
||||
*/
|
||||
#include <maxscale/session.h>
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include <maxscale/alloc.h>
|
||||
#include <maxscale/atomic.h>
|
||||
#include <maxscale/dcb.h>
|
||||
#include <maxscale/housekeeper.h>
|
||||
#include <maxscale/log_manager.h>
|
||||
#include <maxscale/poll.h>
|
||||
#include <maxscale/router.h>
|
||||
#include <maxscale/service.h>
|
||||
#include <maxscale/spinlock.h>
|
||||
|
||||
#include "maxscale/session.h"
|
||||
#include "maxscale/filter.h"
|
||||
|
||||
/** Global session id; updated safely by use of atomic_add */
|
||||
static int session_id;
|
||||
|
||||
static struct session session_dummy_struct;
|
||||
|
||||
static void session_initialize(void *session);
|
||||
static int session_setup_filters(MXS_SESSION *session);
|
||||
static void session_simple_free(MXS_SESSION *session, DCB *dcb);
|
||||
static void session_add_to_all_list(MXS_SESSION *session);
|
||||
static MXS_SESSION *session_find_free();
|
||||
static void session_final_free(MXS_SESSION *session);
|
||||
|
||||
/**
|
||||
* The clientReply of the session.
|
||||
*
|
||||
* @param inst In reality an MXS_SESSION*.
|
||||
* @param session In reality an MXS_SESSION*.
|
||||
* @param data The data to send to the client.
|
||||
*/
|
||||
static int session_reply(MXS_FILTER *inst, MXS_FILTER_SESSION *session, GWBUF *data);
|
||||
|
||||
/**
|
||||
* @brief Initialize a session
|
||||
*
|
||||
* This routine puts initial values into the fields of the session pointed to
|
||||
* by the parameter.
|
||||
*
|
||||
* @param *session Pointer to the session to be initialized
|
||||
*/
|
||||
static void
|
||||
session_initialize(MXS_SESSION *session)
|
||||
{
|
||||
memset(session, 0, sizeof(MXS_SESSION));
|
||||
|
||||
session->ses_chk_top = CHK_NUM_SESSION;
|
||||
session->state = SESSION_STATE_ALLOC;
|
||||
session->ses_chk_tail = CHK_NUM_SESSION;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a new session for a new client of the specified service.
|
||||
*
|
||||
* Create the link to the router session by calling the newSession
|
||||
* entry point of the router using the router instance of the
|
||||
* service this session is part of.
|
||||
*
|
||||
* @param service The service this connection was established by
|
||||
* @param client_dcb The client side DCB
|
||||
* @return The newly created session or NULL if an error occured
|
||||
*/
|
||||
MXS_SESSION *
|
||||
session_alloc(SERVICE *service, DCB *client_dcb)
|
||||
{
|
||||
MXS_SESSION *session = (MXS_SESSION *)(MXS_MALLOC(sizeof(*session)));
|
||||
|
||||
if (NULL == session)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
session_initialize(session);
|
||||
|
||||
/** Assign a session id and increase */
|
||||
session->ses_id = (size_t)atomic_add(&session_id, 1) + 1;
|
||||
session->ses_is_child = (bool) DCB_IS_CLONE(client_dcb);
|
||||
session->service = service;
|
||||
session->client_dcb = client_dcb;
|
||||
session->stats.connect = time(0);
|
||||
session->stmt.buffer = NULL;
|
||||
session->stmt.target = NULL;
|
||||
/*<
|
||||
* Associate the session to the client DCB and set the reference count on
|
||||
* the session to indicate that there is a single reference to the
|
||||
* session. There is no need to protect this or use atomic add as the
|
||||
* session has not been made available to the other threads at this
|
||||
* point.
|
||||
*/
|
||||
session->refcount = 1;
|
||||
/*<
|
||||
* This indicates that session is ready to be shared with backend
|
||||
* DCBs. Note that this doesn't mean that router is initialized yet!
|
||||
*/
|
||||
session->state = SESSION_STATE_READY;
|
||||
|
||||
session->trx_state = SESSION_TRX_INACTIVE;
|
||||
session->autocommit = true;
|
||||
/*
|
||||
* Only create a router session if we are not the listening
|
||||
* DCB or an internal DCB. Creating a router session may create a connection to a
|
||||
* backend server, depending upon the router module implementation
|
||||
* and should be avoided for the listener session
|
||||
*
|
||||
* Router session creation may create other DCBs that link to the
|
||||
* session, therefore it is important that the session lock is
|
||||
* relinquished before the router call.
|
||||
*/
|
||||
if (client_dcb->state != DCB_STATE_LISTENING &&
|
||||
client_dcb->dcb_role != DCB_ROLE_INTERNAL)
|
||||
{
|
||||
session->router_session = service->router->newSession(service->router_instance, session);
|
||||
if (session->router_session == NULL)
|
||||
{
|
||||
session->state = SESSION_STATE_TO_BE_FREED;
|
||||
MXS_ERROR("Failed to create new router session for service '%s'. "
|
||||
"See previous errors for more details.", service->name);
|
||||
}
|
||||
/*
|
||||
* Pending filter chain being setup set the head of the chain to
|
||||
* be the router. As filters are inserted the current head will
|
||||
* be pushed to the filter and the head updated.
|
||||
*
|
||||
* NB This dictates that filters are created starting at the end
|
||||
* of the chain nearest the router working back to the client
|
||||
* protocol end of the chain.
|
||||
*/
|
||||
// NOTE: Here we cast the router instance into a MXS_FILTER and
|
||||
// NOTE: and the router session into a MXS_FILTER_SESSION and
|
||||
// NOTE: the router routeQuery into a filter routeQuery. That
|
||||
// NOTE: is in order to be able to treat the router as the first
|
||||
// NOTE: filter.
|
||||
session->head.instance = (MXS_FILTER*)service->router_instance;
|
||||
session->head.session = (MXS_FILTER_SESSION*)session->router_session;
|
||||
session->head.routeQuery =
|
||||
(int32_t (*)(MXS_FILTER*, MXS_FILTER_SESSION*, GWBUF*))service->router->routeQuery;
|
||||
|
||||
// NOTE: Here we cast the session into a MXS_FILTER and MXS_FILTER_SESSION
|
||||
// NOTE: and session_reply into a filter clientReply. That's dubious but ok
|
||||
// NOTE: as session_reply will know what to do. In practice, the session
|
||||
// NOTE: will be called as if it would be the last filter.
|
||||
session->tail.instance = (MXS_FILTER*)session;
|
||||
session->tail.session = (MXS_FILTER_SESSION*)session;
|
||||
session->tail.clientReply = session_reply;
|
||||
|
||||
if (SESSION_STATE_TO_BE_FREED != session->state
|
||||
&& service->n_filters > 0
|
||||
&& !session_setup_filters(session))
|
||||
{
|
||||
session->state = SESSION_STATE_TO_BE_FREED;
|
||||
MXS_ERROR("Setting up filters failed. "
|
||||
"Terminating session %s.",
|
||||
service->name);
|
||||
}
|
||||
}
|
||||
|
||||
if (SESSION_STATE_TO_BE_FREED != session->state)
|
||||
{
|
||||
session->state = SESSION_STATE_ROUTER_READY;
|
||||
|
||||
if (session->client_dcb->user == NULL)
|
||||
{
|
||||
MXS_INFO("Started session [%lu] for %s service ",
|
||||
session->ses_id,
|
||||
service->name);
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Started %s client session [%lu] for '%s' from %s",
|
||||
service->name,
|
||||
session->ses_id,
|
||||
session->client_dcb->user,
|
||||
session->client_dcb->remote);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
MXS_INFO("Start %s client session [%lu] for '%s' from %s failed, will be "
|
||||
"closed as soon as all related DCBs have been closed.",
|
||||
service->name,
|
||||
session->ses_id,
|
||||
session->client_dcb->user,
|
||||
session->client_dcb->remote);
|
||||
}
|
||||
atomic_add(&service->stats.n_sessions, 1);
|
||||
atomic_add(&service->stats.n_current, 1);
|
||||
CHK_SESSION(session);
|
||||
|
||||
client_dcb->session = session;
|
||||
return SESSION_STATE_TO_BE_FREED == session->state ? NULL : session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allocate a dummy session so that DCBs can always have sessions.
|
||||
*
|
||||
* Only one dummy session exists, it is statically declared
|
||||
*
|
||||
* @param client_dcb The client side DCB
|
||||
* @return The dummy created session
|
||||
*/
|
||||
MXS_SESSION *
|
||||
session_set_dummy(DCB *client_dcb)
|
||||
{
|
||||
MXS_SESSION *session;
|
||||
|
||||
session = &session_dummy_struct;
|
||||
session->ses_chk_top = CHK_NUM_SESSION;
|
||||
session->ses_chk_tail = CHK_NUM_SESSION;
|
||||
session->ses_is_child = false;
|
||||
session->service = NULL;
|
||||
session->client_dcb = NULL;
|
||||
session->n_filters = 0;
|
||||
memset(&session->stats, 0, sizeof(MXS_SESSION_STATS));
|
||||
session->stats.connect = 0;
|
||||
session->state = SESSION_STATE_DUMMY;
|
||||
session->refcount = 1;
|
||||
session->ses_id = 0;
|
||||
|
||||
client_dcb->session = session;
|
||||
return session;
|
||||
}
|
||||
|
||||
/**
|
||||
* Link a session to a DCB.
|
||||
*
|
||||
* @param session The session to link with the dcb
|
||||
* @param dcb The DCB to be linked
|
||||
* @return True if the session was sucessfully linked to the DCB
|
||||
*/
|
||||
bool
|
||||
session_link_dcb(MXS_SESSION *session, DCB *dcb)
|
||||
{
|
||||
ss_info_dassert(session->state != SESSION_STATE_FREE,
|
||||
"If session->state is SESSION_STATE_FREE then this attempt to "
|
||||
"access freed memory block.");
|
||||
if (session->state == SESSION_STATE_FREE)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
atomic_add(&session->refcount, 1);
|
||||
dcb->session = session;
|
||||
/** Move this DCB under the same thread */
|
||||
dcb->thread.id = session->client_dcb->thread.id;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deallocate the specified session, minimal actions during session_alloc
|
||||
* Since changes to keep new session in existence until all related DCBs
|
||||
* have been destroyed, this function is redundant. Just left until we are
|
||||
* sure of the direction taken.
|
||||
*
|
||||
* @param session The session to deallocate
|
||||
*/
|
||||
static void
|
||||
session_simple_free(MXS_SESSION *session, DCB *dcb)
|
||||
{
|
||||
/* Does this possibly need a lock? */
|
||||
if (dcb->data && !DCB_IS_CLONE(dcb))
|
||||
{
|
||||
void * clientdata = dcb->data;
|
||||
dcb->data = NULL;
|
||||
MXS_FREE(clientdata);
|
||||
}
|
||||
if (session)
|
||||
{
|
||||
if (SESSION_STATE_DUMMY == session->state)
|
||||
{
|
||||
return;
|
||||
}
|
||||
if (session && session->router_session)
|
||||
{
|
||||
session->service->router->freeSession(
|
||||
session->service->router_instance,
|
||||
session->router_session);
|
||||
}
|
||||
session->state = SESSION_STATE_STOPPING;
|
||||
}
|
||||
|
||||
session_final_free(session);
|
||||
}
|
||||
|
||||
void session_close(MXS_SESSION *session)
|
||||
{
|
||||
if (!session->ses_is_child && session->router_session)
|
||||
{
|
||||
if (session->state != SESSION_STATE_STOPPING)
|
||||
{
|
||||
session->state = SESSION_STATE_STOPPING;
|
||||
}
|
||||
|
||||
MXS_ROUTER_OBJECT* router = session->service->router;
|
||||
MXS_ROUTER* router_instance = session->service->router_instance;
|
||||
|
||||
/** Close router session and all its connections */
|
||||
router->closeSession(router_instance, session->router_session);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deallocate the specified session
|
||||
*
|
||||
* @param session The session to deallocate
|
||||
*/
|
||||
static void session_free(MXS_SESSION *session)
|
||||
{
|
||||
CHK_SESSION(session);
|
||||
ss_dassert(session->refcount == 0);
|
||||
|
||||
session->state = SESSION_STATE_TO_BE_FREED;
|
||||
atomic_add(&session->service->stats.n_current, -1);
|
||||
|
||||
if (session->client_dcb)
|
||||
{
|
||||
dcb_free_all_memory(session->client_dcb);
|
||||
}
|
||||
/**
|
||||
* If session is not child of some other session, free router_session.
|
||||
* Otherwise let the parent free it.
|
||||
*/
|
||||
if (!session->ses_is_child && session->router_session)
|
||||
{
|
||||
session->service->router->freeSession(session->service->router_instance,
|
||||
session->router_session);
|
||||
}
|
||||
if (session->n_filters)
|
||||
{
|
||||
int i;
|
||||
for (i = 0; i < session->n_filters; i++)
|
||||
{
|
||||
if (session->filters[i].filter)
|
||||
{
|
||||
session->filters[i].filter->obj->closeSession(session->filters[i].instance,
|
||||
session->filters[i].session);
|
||||
}
|
||||
}
|
||||
for (i = 0; i < session->n_filters; i++)
|
||||
{
|
||||
if (session->filters[i].filter)
|
||||
{
|
||||
session->filters[i].filter->obj->freeSession(session->filters[i].instance,
|
||||
session->filters[i].session);
|
||||
}
|
||||
}
|
||||
MXS_FREE(session->filters);
|
||||
}
|
||||
|
||||
MXS_INFO("Stopped %s client session [%lu]", session->service->name, session->ses_id);
|
||||
|
||||
/** If session doesn't have parent referencing to it, it can be freed */
|
||||
if (!session->ses_is_child)
|
||||
{
|
||||
session->state = SESSION_STATE_FREE;
|
||||
session_final_free(session);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
session_final_free(MXS_SESSION *session)
|
||||
{
|
||||
gwbuf_free(session->stmt.buffer);
|
||||
MXS_FREE(session);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check to see if a session is valid, i.e. in the list of all sessions
|
||||
*
|
||||
* @param session Session to check
|
||||
* @return 1 if the session is valid otherwise 0
|
||||
*/
|
||||
int
|
||||
session_isvalid(MXS_SESSION *session)
|
||||
{
|
||||
return session != NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Print details of an individual session
|
||||
*
|
||||
* @param session Session to print
|
||||
*/
|
||||
void
|
||||
printSession(MXS_SESSION *session)
|
||||
{
|
||||
struct tm result;
|
||||
char timebuf[40];
|
||||
|
||||
printf("Session %p\n", session);
|
||||
printf("\tState: %s\n", session_state(session->state));
|
||||
printf("\tService: %s (%p)\n", session->service->name, session->service);
|
||||
printf("\tClient DCB: %p\n", session->client_dcb);
|
||||
printf("\tConnected: %s\n",
|
||||
asctime_r(localtime_r(&session->stats.connect, &result), timebuf));
|
||||
printf("\tRouter Session: %p\n", session->router_session);
|
||||
}
|
||||
|
||||
bool printAllSessions_cb(DCB *dcb, void *data)
|
||||
{
|
||||
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
|
||||
{
|
||||
printSession(dcb->session);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Print all sessions
|
||||
*
|
||||
* Designed to be called within a debugger session in order
|
||||
* to display all active sessions within the gateway
|
||||
*/
|
||||
void
|
||||
printAllSessions()
|
||||
{
|
||||
dcb_foreach(printAllSessions_cb, NULL);
|
||||
}
|
||||
|
||||
/** Callback for dprintAllSessions */
|
||||
bool dprintAllSessions_cb(DCB *dcb, void *data)
|
||||
{
|
||||
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
|
||||
{
|
||||
DCB *out_dcb = (DCB*)data;
|
||||
dprintSession(out_dcb, dcb->session);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Print all sessions to a DCB
|
||||
*
|
||||
* Designed to be called within a debugger session in order
|
||||
* to display all active sessions within the gateway
|
||||
*
|
||||
* @param dcb The DCB to print to
|
||||
*/
|
||||
void
|
||||
dprintAllSessions(DCB *dcb)
|
||||
{
|
||||
dcb_foreach(dprintAllSessions_cb, dcb);
|
||||
}
|
||||
|
||||
/**
|
||||
* Print a particular session to a DCB
|
||||
*
|
||||
* Designed to be called within a debugger session in order
|
||||
* to display all active sessions within the gateway
|
||||
*
|
||||
* @param dcb The DCB to print to
|
||||
* @param print_session The session to print
|
||||
*/
|
||||
void
|
||||
dprintSession(DCB *dcb, MXS_SESSION *print_session)
|
||||
{
|
||||
struct tm result;
|
||||
char buf[30];
|
||||
int i;
|
||||
|
||||
dcb_printf(dcb, "Session %lu\n", print_session->ses_id);
|
||||
dcb_printf(dcb, "\tState: %s\n", session_state(print_session->state));
|
||||
dcb_printf(dcb, "\tService: %s\n", print_session->service->name);
|
||||
|
||||
if (print_session->client_dcb && print_session->client_dcb->remote)
|
||||
{
|
||||
double idle = (hkheartbeat - print_session->client_dcb->last_read);
|
||||
idle = idle > 0 ? idle / 10.f : 0;
|
||||
dcb_printf(dcb, "\tClient Address: %s%s%s\n",
|
||||
print_session->client_dcb->user ? print_session->client_dcb->user : "",
|
||||
print_session->client_dcb->user ? "@" : "",
|
||||
print_session->client_dcb->remote);
|
||||
dcb_printf(dcb, "\tConnected: %s\n",
|
||||
asctime_r(localtime_r(&print_session->stats.connect, &result), buf));
|
||||
if (print_session->client_dcb->state == DCB_STATE_POLLING)
|
||||
{
|
||||
dcb_printf(dcb, "\tIdle: %.0f seconds\n", idle);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if (print_session->n_filters)
|
||||
{
|
||||
for (i = 0; i < print_session->n_filters; i++)
|
||||
{
|
||||
dcb_printf(dcb, "\tFilter: %s\n",
|
||||
print_session->filters[i].filter->name);
|
||||
print_session->filters[i].filter->obj->diagnostics(print_session->filters[i].instance,
|
||||
print_session->filters[i].session,
|
||||
dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool dListSessions_cb(DCB *dcb, void *data)
|
||||
{
|
||||
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
|
||||
{
|
||||
DCB *out_dcb = (DCB*)data;
|
||||
MXS_SESSION *session = dcb->session;
|
||||
dcb_printf(out_dcb, "%-16lu | %-15s | %-14s | %s\n", session->ses_id,
|
||||
session->client_dcb && session->client_dcb->remote ?
|
||||
session->client_dcb->remote : "",
|
||||
session->service && session->service->name ?
|
||||
session->service->name : "",
|
||||
session_state(session->state));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* List all sessions in tabular form to a DCB
|
||||
*
|
||||
* Designed to be called within a debugger session in order
|
||||
* to display all active sessions within the gateway
|
||||
*
|
||||
* @param dcb The DCB to print to
|
||||
*/
|
||||
void
|
||||
dListSessions(DCB *dcb)
|
||||
{
|
||||
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
|
||||
dcb_printf(dcb, "Session | Client | Service | State\n");
|
||||
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
|
||||
|
||||
dcb_foreach(dListSessions_cb, dcb);
|
||||
|
||||
dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n\n");
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert a session state to a string representation
|
||||
*
|
||||
* @param state The session state
|
||||
* @return A string representation of the session state
|
||||
*/
|
||||
const char *
|
||||
session_state(mxs_session_state_t state)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case SESSION_STATE_ALLOC:
|
||||
return "Session Allocated";
|
||||
case SESSION_STATE_DUMMY:
|
||||
return "Dummy Session";
|
||||
case SESSION_STATE_READY:
|
||||
return "Session Ready";
|
||||
case SESSION_STATE_ROUTER_READY:
|
||||
return "Session ready for routing";
|
||||
case SESSION_STATE_LISTENER:
|
||||
return "Listener Session";
|
||||
case SESSION_STATE_LISTENER_STOPPED:
|
||||
return "Stopped Listener Session";
|
||||
case SESSION_STATE_STOPPING:
|
||||
return "Stopping session";
|
||||
case SESSION_STATE_TO_BE_FREED:
|
||||
return "Session to be freed";
|
||||
case SESSION_STATE_FREE:
|
||||
return "Freed session";
|
||||
default:
|
||||
return "Invalid State";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the filter chain for this session.
|
||||
*
|
||||
* Filters must be setup in reverse order, starting with the last
|
||||
* filter in the chain and working back towards the client connection
|
||||
* Each filter is passed the current session head of the filter chain
|
||||
* this head becomes the destination for the filter. The newly created
|
||||
* filter becomes the new head of the filter chain.
|
||||
*
|
||||
* @param session The session that requires the chain
|
||||
* @return 0 if filter creation fails
|
||||
*/
|
||||
static int
|
||||
session_setup_filters(MXS_SESSION *session)
|
||||
{
|
||||
SERVICE *service = session->service;
|
||||
MXS_DOWNSTREAM *head;
|
||||
MXS_UPSTREAM *tail;
|
||||
int i;
|
||||
|
||||
if ((session->filters = (SESSION_FILTER*)MXS_CALLOC(service->n_filters,
|
||||
sizeof(SESSION_FILTER))) == NULL)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
session->n_filters = service->n_filters;
|
||||
for (i = service->n_filters - 1; i >= 0; i--)
|
||||
{
|
||||
if (service->filters[i] == NULL)
|
||||
{
|
||||
MXS_ERROR("Service '%s' contians an unresolved filter.", service->name);
|
||||
return 0;
|
||||
}
|
||||
if ((head = filter_apply(service->filters[i], session,
|
||||
&session->head)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Failed to create filter '%s' for "
|
||||
"service '%s'.\n",
|
||||
service->filters[i]->name,
|
||||
service->name);
|
||||
return 0;
|
||||
}
|
||||
session->filters[i].filter = service->filters[i];
|
||||
session->filters[i].session = head->session;
|
||||
session->filters[i].instance = head->instance;
|
||||
session->head = *head;
|
||||
MXS_FREE(head);
|
||||
}
|
||||
|
||||
for (i = 0; i < service->n_filters; i++)
|
||||
{
|
||||
if ((tail = filter_upstream(service->filters[i],
|
||||
session->filters[i].session,
|
||||
&session->tail)) == NULL)
|
||||
{
|
||||
MXS_ERROR("Failed to create filter '%s' for service '%s'.",
|
||||
service->filters[i]->name,
|
||||
service->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* filter_upstream may simply return the 3 parameter if
|
||||
* the filter has no upstream entry point. So no need
|
||||
* to copy the contents or free tail in this case.
|
||||
*/
|
||||
if (tail != &session->tail)
|
||||
{
|
||||
session->tail = *tail;
|
||||
MXS_FREE(tail);
|
||||
}
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point for the final element in the upstream filter, i.e. the writing
|
||||
* of the data to the client.
|
||||
*
|
||||
* Looks like a filter `clientReply`, but in this case both the instance and
|
||||
* the session argument will be a MXS_SESSION*.
|
||||
*
|
||||
* @param instance The "instance" data
|
||||
* @param session The session
|
||||
* @param data The buffer chain to write
|
||||
*/
|
||||
int
|
||||
session_reply(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *data)
|
||||
{
|
||||
MXS_SESSION *the_session = (MXS_SESSION*)session;
|
||||
|
||||
return the_session->client_dcb->func.write(the_session->client_dcb, data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the client connection address or name
|
||||
*
|
||||
* @param session The session whose client address to return
|
||||
*/
|
||||
const char *
|
||||
session_get_remote(const MXS_SESSION *session)
|
||||
{
|
||||
if (session && session->client_dcb)
|
||||
{
|
||||
return session->client_dcb->remote;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
bool session_route_query(MXS_SESSION* ses, GWBUF* buf)
|
||||
{
|
||||
bool succp;
|
||||
|
||||
if (ses->head.routeQuery == NULL ||
|
||||
ses->head.instance == NULL ||
|
||||
ses->head.session == NULL)
|
||||
{
|
||||
succp = false;
|
||||
goto return_succp;
|
||||
}
|
||||
|
||||
if (ses->head.routeQuery(ses->head.instance, ses->head.session, buf) == 1)
|
||||
{
|
||||
succp = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
succp = false;
|
||||
}
|
||||
return_succp:
|
||||
return succp;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return the username of the user connected to the client side of the
|
||||
* session.
|
||||
*
|
||||
* @param session The session pointer.
|
||||
* @return The user name or NULL if it can not be determined.
|
||||
*/
|
||||
const char *
|
||||
session_get_user(const MXS_SESSION *session)
|
||||
{
|
||||
return (session && session->client_dcb) ? session->client_dcb->user : NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Callback structure for the session list extraction
|
||||
*/
|
||||
typedef struct
|
||||
{
|
||||
int index;
|
||||
int current;
|
||||
SESSIONLISTFILTER filter;
|
||||
RESULT_ROW *row;
|
||||
RESULTSET *set;
|
||||
} SESSIONFILTER;
|
||||
|
||||
bool dcb_iter_cb(DCB *dcb, void *data)
|
||||
{
|
||||
SESSIONFILTER *cbdata = (SESSIONFILTER*)data;
|
||||
|
||||
if (cbdata->current < cbdata->index)
|
||||
{
|
||||
if (cbdata->filter == SESSION_LIST_ALL ||
|
||||
(cbdata->filter == SESSION_LIST_CONNECTION &&
|
||||
(dcb->session->state != SESSION_STATE_LISTENER)))
|
||||
{
|
||||
cbdata->current++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
char buf[20];
|
||||
MXS_SESSION *list_session = dcb->session;
|
||||
|
||||
cbdata->index++;
|
||||
cbdata->row = resultset_make_row(cbdata->set);
|
||||
snprintf(buf, sizeof(buf), "%p", list_session);
|
||||
resultset_row_set(cbdata->row, 0, buf);
|
||||
resultset_row_set(cbdata->row, 1, ((list_session->client_dcb && list_session->client_dcb->remote)
|
||||
? list_session->client_dcb->remote : ""));
|
||||
resultset_row_set(cbdata->row, 2, (list_session->service && list_session->service->name
|
||||
? list_session->service->name : ""));
|
||||
resultset_row_set(cbdata->row, 3, session_state(list_session->state));
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide a row to the result set that defines the set of sessions
|
||||
*
|
||||
* @param set The result set
|
||||
* @param data The index of the row to send
|
||||
* @return The next row or NULL
|
||||
*/
|
||||
static RESULT_ROW *
|
||||
sessionRowCallback(RESULTSET *set, void *data)
|
||||
{
|
||||
SESSIONFILTER *cbdata = (SESSIONFILTER*)data;
|
||||
RESULT_ROW *row = NULL;
|
||||
|
||||
cbdata->current = 0;
|
||||
dcb_foreach(dcb_iter_cb, cbdata);
|
||||
|
||||
if (cbdata->row)
|
||||
{
|
||||
row = cbdata->row;
|
||||
cbdata->row = NULL;
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a resultset that has the current set of sessions in it
|
||||
*
|
||||
* @return A Result set
|
||||
*/
|
||||
/* Lint is not convinced that the new memory for data is always tracked
|
||||
* because it does not see what happens within the resultset_create function,
|
||||
* so we suppress the warning. In fact, the function call results in return
|
||||
* of the set structure which includes a pointer to data
|
||||
*/
|
||||
|
||||
/*lint -e429 */
|
||||
RESULTSET *
|
||||
sessionGetList(SESSIONLISTFILTER filter)
|
||||
{
|
||||
RESULTSET *set;
|
||||
SESSIONFILTER *data;
|
||||
|
||||
if ((data = (SESSIONFILTER *)MXS_MALLOC(sizeof(SESSIONFILTER))) == NULL)
|
||||
{
|
||||
return NULL;
|
||||
}
|
||||
data->index = 0;
|
||||
data->filter = filter;
|
||||
data->current = 0;
|
||||
data->row = NULL;
|
||||
|
||||
if ((set = resultset_create(sessionRowCallback, data)) == NULL)
|
||||
{
|
||||
MXS_FREE(data);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
data->set = set;
|
||||
resultset_add_column(set, "Session", 16, COL_TYPE_VARCHAR);
|
||||
resultset_add_column(set, "Client", 15, COL_TYPE_VARCHAR);
|
||||
resultset_add_column(set, "Service", 15, COL_TYPE_VARCHAR);
|
||||
resultset_add_column(set, "State", 15, COL_TYPE_VARCHAR);
|
||||
|
||||
return set;
|
||||
}
|
||||
/*lint +e429 */
|
||||
|
||||
mxs_session_trx_state_t session_get_trx_state(const MXS_SESSION* ses)
|
||||
{
|
||||
return ses->trx_state;
|
||||
}
|
||||
|
||||
mxs_session_trx_state_t session_set_trx_state(MXS_SESSION* ses, mxs_session_trx_state_t new_state)
|
||||
{
|
||||
mxs_session_trx_state_t prev_state = ses->trx_state;
|
||||
|
||||
ses->trx_state = new_state;
|
||||
|
||||
return prev_state;
|
||||
}
|
||||
|
||||
const char* session_trx_state_to_string(mxs_session_trx_state_t state)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case SESSION_TRX_INACTIVE:
|
||||
return "SESSION_TRX_INACTIVE";
|
||||
case SESSION_TRX_ACTIVE:
|
||||
return "SESSION_TRX_ACTIVE";
|
||||
case SESSION_TRX_READ_ONLY:
|
||||
return "SESSION_TRX_READ_ONLY";
|
||||
case SESSION_TRX_READ_WRITE:
|
||||
return "SESSION_TRX_READ_WRITE";
|
||||
case SESSION_TRX_READ_ONLY_ENDING:
|
||||
return "SESSION_TRX_READ_ONLY_ENDING";
|
||||
case SESSION_TRX_READ_WRITE_ENDING:
|
||||
return "SESSION_TRX_READ_WRITE_ENDING";
|
||||
}
|
||||
|
||||
MXS_ERROR("Unknown session_trx_state_t value: %d", (int)state);
|
||||
return "UNKNOWN";
|
||||
}
|
||||
|
||||
static bool ses_find_id(DCB *dcb, void *data)
|
||||
{
|
||||
void **params = (void**)data;
|
||||
MXS_SESSION **ses = (MXS_SESSION**)params[0];
|
||||
size_t *id = (size_t*)params[1];
|
||||
bool rval = true;
|
||||
|
||||
if (dcb->session->ses_id == *id)
|
||||
{
|
||||
*ses = session_get_ref(dcb->session);
|
||||
rval = false;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
MXS_SESSION* session_get_by_id(int id)
|
||||
{
|
||||
MXS_SESSION *session = NULL;
|
||||
void *params[] = {&session, &id};
|
||||
|
||||
dcb_foreach(ses_find_id, params);
|
||||
|
||||
return session;
|
||||
}
|
||||
|
||||
MXS_SESSION* session_get_ref(MXS_SESSION *session)
|
||||
{
|
||||
atomic_add(&session->refcount, 1);
|
||||
return session;
|
||||
}
|
||||
|
||||
void session_put_ref(MXS_SESSION *session)
|
||||
{
|
||||
if (session && session->state != SESSION_STATE_DUMMY)
|
||||
{
|
||||
/** Remove one reference. If there are no references left, free session */
|
||||
if (atomic_add(&session->refcount, -1) == 1)
|
||||
{
|
||||
session_free(session);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool session_store_stmt(MXS_SESSION *session, GWBUF *buf, const SERVER *server)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
if (session->stmt.buffer)
|
||||
{
|
||||
/** This should not happen with proper use */
|
||||
ss_dassert(false);
|
||||
gwbuf_free(session->stmt.buffer);
|
||||
}
|
||||
|
||||
if ((session->stmt.buffer = gwbuf_clone(buf)))
|
||||
{
|
||||
session->stmt.target = server;
|
||||
/** No old statements were stored and we successfully cloned the buffer */
|
||||
rval = true;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool session_take_stmt(MXS_SESSION *session, GWBUF **buffer, const SERVER **target)
|
||||
{
|
||||
bool rval = false;
|
||||
|
||||
if (session->stmt.buffer && session->stmt.target)
|
||||
{
|
||||
*buffer = session->stmt.buffer;
|
||||
*target = session->stmt.target;
|
||||
session->stmt.buffer = NULL;
|
||||
session->stmt.target = NULL;
|
||||
rval = true;
|
||||
}
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
void session_clear_stmt(MXS_SESSION *session)
|
||||
{
|
||||
gwbuf_free(session->stmt.buffer);
|
||||
session->stmt.buffer = NULL;
|
||||
session->stmt.target = NULL;
|
||||
}
|
||||
Reference in New Issue
Block a user