MXS-1929: Store filters inside Session

The Session now holds a reference to the filters it uses. This makes the
use of filters safe even if they are destroyed mid-session.
This commit is contained in:
Markus Mäkelä
2018-08-03 12:55:17 +03:00
parent 7e64d6b3df
commit 2e60d5fd71
3 changed files with 133 additions and 136 deletions

View File

@ -90,17 +90,6 @@ typedef struct
time_t connect; /**< Time when the session was started */ time_t connect; /**< Time when the session was started */
} MXS_SESSION_STATS; } MXS_SESSION_STATS;
/**
* Structure used to track the filter instances and sessions of the filters
* that are in use within a session.
*/
typedef struct
{
struct mxs_filter_def *filter;
struct mxs_filter *instance;
struct mxs_filter_session *session;
} SESSION_FILTER;
/** /**
* The downstream element in the filter chain. This may refer to * The downstream element in the filter chain. This may refer to
* another filter or to a router. * another filter or to a router.
@ -184,8 +173,6 @@ typedef struct session
struct mxs_router_session *router_session; /*< The router instance data */ struct mxs_router_session *router_session; /*< The router instance data */
MXS_SESSION_STATS stats; /*< Session statistics */ MXS_SESSION_STATS stats; /*< Session statistics */
struct service *service; /*< The service this session is using */ struct service *service; /*< The service this session is using */
int n_filters; /*< Number of filter sessions */
SESSION_FILTER *filters; /*< The filters in use within this session */
MXS_DOWNSTREAM head; /*< Head of the filter chain */ MXS_DOWNSTREAM head; /*< Head of the filter chain */
MXS_UPSTREAM tail; /*< The tail of the filter chain */ MXS_UPSTREAM tail; /*< The tail of the filter chain */
int refcount; /*< Reference count on the session */ int refcount; /*< Reference count on the session */

View File

@ -24,6 +24,9 @@
#include <maxscale/resultset.hh> #include <maxscale/resultset.hh>
#include <maxscale/utils.hh> #include <maxscale/utils.hh>
#include "filter.hh"
#include "service.hh"
namespace maxscale namespace maxscale
{ {
/** /**
@ -57,9 +60,37 @@ typedef std::unordered_map<std::string, SESSION_VARIABLE> SessionVarsByName;
typedef std::deque<std::vector<uint8_t>> SessionStmtQueue; typedef std::deque<std::vector<uint8_t>> SessionStmtQueue;
typedef std::unordered_set<DCB*> DCBSet; typedef std::unordered_set<DCB*> DCBSet;
// Class that holds the session specific filter data
class SessionFilter
{
public:
SessionFilter(const SFilterDef& f):
filter(f),
instance(nullptr),
session(nullptr)
{
}
SFilterDef filter;
MXS_FILTER* instance;
MXS_FILTER_SESSION* session;
};
class Session: public MXS_SESSION class Session: public MXS_SESSION
{ {
public: public:
using FilterList = std::vector<SessionFilter>;
~Session();
bool setup_filters(Service* service);
const FilterList& get_filters() const
{
return m_filters;
}
bool add_variable(const char* name, session_variable_handler_t handler, void* context); bool add_variable(const char* name, session_variable_handler_t handler, void* context);
char* set_variable_value(const char* name_begin, const char* name_end, char* set_variable_value(const char* name_begin, const char* name_end,
const char* value_begin, const char* value_end); const char* value_begin, const char* value_end);
@ -85,6 +116,7 @@ public:
} }
private: private:
FilterList m_filters;
SessionVarsByName m_variables; SessionVarsByName m_variables;
SessionStmtQueue m_last_statements; /*< The N last statements by the client */ SessionStmtQueue m_last_statements; /*< The N last statements by the client */
DCBSet m_dcb_set; /*< Set of associated backend DCBs */ DCBSet m_dcb_set; /*< Set of associated backend DCBs */

View File

@ -244,7 +244,6 @@ session_set_dummy(DCB *client_dcb)
session->ses_chk_tail = CHK_NUM_SESSION; session->ses_chk_tail = CHK_NUM_SESSION;
session->service = NULL; session->service = NULL;
session->client_dcb = NULL; session->client_dcb = NULL;
session->n_filters = 0;
memset(&session->stats, 0, sizeof(MXS_SESSION_STATS)); memset(&session->stats, 0, sizeof(MXS_SESSION_STATS));
session->stats.connect = 0; session->stats.connect = 0;
session->state = SESSION_STATE_DUMMY; session->state = SESSION_STATE_DUMMY;
@ -350,54 +349,9 @@ private:
*/ */
static void session_free(MXS_SESSION *session) static void session_free(MXS_SESSION *session)
{ {
CHK_SESSION(session);
ss_dassert(session->refcount == 0);
session->state = SESSION_STATE_TO_BE_FREED;
atomic_add(&session->service->stats.n_current, -1);
if (session->client_dcb)
{
dcb_free_all_memory(session->client_dcb);
session->client_dcb = NULL;
}
/**
* If session is not child of some other session, free router_session.
* Otherwise let the parent free it.
*/
if (session->router_session)
{
session->service->router->freeSession(session->service->router_instance,
session->router_session);
}
if (session->n_filters)
{
int i;
for (i = 0; i < session->n_filters; i++)
{
if (session->filters[i].filter)
{
FilterDef* filter = static_cast<FilterDef*>(session->filters[i].filter);
filter->obj->closeSession(session->filters[i].instance,
session->filters[i].session);
}
}
for (i = 0; i < session->n_filters; i++)
{
if (session->filters[i].filter)
{
FilterDef* filter = static_cast<FilterDef*>(session->filters[i].filter);
filter->obj->freeSession(session->filters[i].instance,
session->filters[i].session);
}
}
MXS_FREE(session->filters);
}
MXS_INFO("Stopped %s client session [%" PRIu64 "]", session->service->name, session->ses_id); MXS_INFO("Stopped %s client session [%" PRIu64 "]", session->service->name, session->ses_id);
Service* service = static_cast<Service*>(session->service); Service* service = static_cast<Service*>(session->service);
session->state = SESSION_STATE_FREE;
session_final_free(session); session_final_free(session);
bool should_destroy = !atomic_load_int(&service->active); bool should_destroy = !atomic_load_int(&service->active);
@ -409,14 +363,28 @@ static void session_free(MXS_SESSION *session)
} }
} }
static void static void session_final_free(MXS_SESSION *session)
session_final_free(MXS_SESSION *session)
{ {
CHK_SESSION(session);
ss_dassert(session->refcount == 0);
session->state = SESSION_STATE_TO_BE_FREED;
atomic_add(&session->service->stats.n_current, -1);
if (session->client_dcb)
{
dcb_free_all_memory(session->client_dcb);
session->client_dcb = NULL;
}
if (dump_statements == SESSION_DUMP_STATEMENTS_ON_CLOSE) if (dump_statements == SESSION_DUMP_STATEMENTS_ON_CLOSE)
{ {
session_dump_statements(session); session_dump_statements(session);
} }
session->state = SESSION_STATE_FREE;
delete session; delete session;
} }
@ -537,15 +505,12 @@ dprintSession(DCB *dcb, MXS_SESSION *print_session)
} }
if (print_session->n_filters) Session* session = static_cast<Session*>(print_session);
for (const auto& f : session->get_filters())
{ {
for (i = 0; i < print_session->n_filters; i++) dcb_printf(dcb, "\tFilter: %s\n", f.filter->name.c_str());
{ f.filter->obj->diagnostics(f.instance, f.session, dcb);
FilterDef* filter = static_cast<FilterDef*>(print_session->filters[i].filter);
dcb_printf(dcb, "\tFilter: %s\n", filter->name.c_str());
filter->obj->diagnostics(print_session->filters[i].instance,
print_session->filters[i].session, dcb);
}
} }
} }
@ -631,66 +596,11 @@ session_state(mxs_session_state_t state)
* @param session The session that requires the chain * @param session The session that requires the chain
* @return 0 if filter creation fails * @return 0 if filter creation fails
*/ */
static int static int session_setup_filters(MXS_SESSION *ses)
session_setup_filters(MXS_SESSION *session)
{ {
Service* service = static_cast<Service*>(session->service); Service* service = static_cast<Service*>(ses->service);
Session* session = static_cast<Session*>(ses);
auto filters = service->get_filters(); return session->setup_filters(service);
if (filters.empty())
{
return 1;
}
if ((session->filters = (SESSION_FILTER*)MXS_CALLOC(filters.size(), sizeof(SESSION_FILTER))) == NULL)
{
return 0;
}
session->n_filters = filters.size();
for (ssize_t i = (ssize_t)filters.size() - 1; i >= 0; i--)
{
MXS_DOWNSTREAM* head = filter_apply(filters[i], session, &session->head);
if (head == NULL)
{
MXS_ERROR("Failed to create filter '%s' for service '%s'.\n",
filter_def_get_name(filters[i].get()), service->name);
return 0;
}
session->filters[i].filter = filters[i].get();
session->filters[i].session = head->session;
session->filters[i].instance = head->instance;
session->head = *head;
MXS_FREE(head);
}
for (size_t i = 0; i < filters.size(); i++)
{
MXS_UPSTREAM* tail = filter_upstream(filters[i], session->filters[i].session, &session->tail);
if (tail == NULL)
{
MXS_ERROR("Failed to create filter '%s' for service '%s'.",
filter_def_get_name(filters[i].get()), service->name);
return 0;
}
/**
* filter_upstream may simply return the 3 parameter if the filter has no
* upstream entry point. So no need to copy the contents or free tail in this case.
*/
if (tail != &session->tail)
{
session->tail = *tail;
MXS_FREE(tail);
}
}
return 1;
} }
/** /**
@ -903,7 +813,7 @@ uint64_t session_get_next_id()
return atomic_add_uint64(&next_session_id, 1); return atomic_add_uint64(&next_session_id, 1);
} }
json_t* session_json_data(const MXS_SESSION *session, const char *host) json_t* session_json_data(const Session* session, const char *host)
{ {
json_t* data = json_object(); json_t* data = json_object();
@ -924,13 +834,15 @@ json_t* session_json_data(const MXS_SESSION *session, const char *host)
json_object_set_new(rel, CN_SERVICES, services); json_object_set_new(rel, CN_SERVICES, services);
/** Filter relationships (one-to-many) */ /** Filter relationships (one-to-many) */
if (session->n_filters) auto filter_list = session->get_filters();
if (!filter_list.empty())
{ {
json_t* filters = mxs_json_relationship(host, MXS_JSON_API_FILTERS); json_t* filters = mxs_json_relationship(host, MXS_JSON_API_FILTERS);
for (int i = 0; i < session->n_filters; i++) for (const auto& f : filter_list)
{ {
mxs_json_add_relation(filters, filter_def_get_name(session->filters[i].filter), CN_FILTERS); mxs_json_add_relation(filters, f.filter->name.c_str(), CN_FILTERS);
} }
json_object_set_new(rel, CN_FILTERS, filters); json_object_set_new(rel, CN_FILTERS, filters);
} }
@ -967,10 +879,11 @@ json_t* session_json_data(const MXS_SESSION *session, const char *host)
} }
json_t* dcb_arr = json_array(); json_t* dcb_arr = json_array();
const Session* pSession = static_cast<const Session*>(session);
for (auto it = session->dcb_set->begin(); it != session->dcb_set->end(); it++) for (auto d : pSession->dcb_set())
{ {
json_array_append_new(dcb_arr, dcb_to_json(*it)); json_array_append_new(dcb_arr, dcb_to_json(d));
} }
json_object_set_new(attr, "connections", dcb_arr); json_object_set_new(attr, "connections", dcb_arr);
@ -985,7 +898,8 @@ json_t* session_to_json(const MXS_SESSION *session, const char *host)
{ {
stringstream ss; stringstream ss;
ss << MXS_JSON_API_SESSIONS << session->ses_id; ss << MXS_JSON_API_SESSIONS << session->ses_id;
return mxs_json_resource(host, ss.str().c_str(), session_json_data(session, host)); const Session* s = static_cast<const Session*>(session);
return mxs_json_resource(host, ss.str().c_str(), session_json_data(s, host));
} }
struct SessionListData struct SessionListData
@ -999,7 +913,8 @@ bool seslist_cb(DCB* dcb, void* data)
if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER) if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER)
{ {
SessionListData* d = (SessionListData*)data; SessionListData* d = (SessionListData*)data;
json_array_append_new(d->json, session_json_data(dcb->session, d->host)); Session* session = static_cast<Session*>(dcb->session);
json_array_append_new(d->json, session_json_data(session, d->host));
} }
return true; return true;
@ -1249,6 +1164,20 @@ const char* session_get_close_reason(const MXS_SESSION* session)
} }
} }
Session::~Session()
{
if (router_session)
{
service->router->freeSession(service->router_instance, router_session);
}
for (auto& f : m_filters)
{
f.filter->obj->closeSession(f.instance, f.session);
f.filter->obj->freeSession(f.instance, f.session);
}
}
void Session::dump_statements() const void Session::dump_statements() const
{ {
if (retain_last_statements) if (retain_last_statements)
@ -1286,6 +1215,55 @@ void Session::dump_statements() const
} }
} }
bool Session::setup_filters(Service* service)
{
for (const auto& a : service->get_filters())
{
m_filters.emplace_back(a);
}
for (auto it = m_filters.rbegin(); it != m_filters.rend(); it++)
{
MXS_DOWNSTREAM* my_head = filter_apply(it->filter, this, &head);
if (my_head == NULL)
{
MXS_ERROR("Failed to create filter '%s' for service '%s'.\n",
filter_def_get_name(it->filter.get()), service->name);
return false;
}
it->session = my_head->session;
it->instance = my_head->instance;
head = *my_head;
MXS_FREE(my_head);
}
for (auto it = m_filters.begin(); it != m_filters.end(); it++)
{
MXS_UPSTREAM* my_tail = filter_upstream(it->filter, it->session, &tail);
if (my_tail == NULL)
{
MXS_ERROR("Failed to create filter '%s' for service '%s'.",
filter_def_get_name(it->filter.get()), service->name);
return false;
}
/**
* filter_upstream may simply return the 3 parameters if the filter has no
* upstream entry point. So no need to copy the contents or free tail in this case.
*/
if (my_tail != &tail)
{
tail = *my_tail;
MXS_FREE(my_tail);
}
}
return true;
}
bool Session::add_variable(const char* name, session_variable_handler_t handler, void* context) bool Session::add_variable(const char* name, session_variable_handler_t handler, void* context)
{ {
bool added = false; bool added = false;