From 2e60d5fd714824abdd5ca35fc74e065312f3a319 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 3 Aug 2018 12:55:17 +0300 Subject: [PATCH] MXS-1929: Store filters inside Session The Session now holds a reference to the filters it uses. This makes the use of filters safe even if they are destroyed mid-session. --- include/maxscale/session.h | 13 -- server/core/internal/session.hh | 32 +++++ server/core/session.cc | 224 ++++++++++++++------------------ 3 files changed, 133 insertions(+), 136 deletions(-) diff --git a/include/maxscale/session.h b/include/maxscale/session.h index dfbff5515..f416ae5c2 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -90,17 +90,6 @@ typedef struct time_t connect; /**< Time when the session was started */ } MXS_SESSION_STATS; -/** - * Structure used to track the filter instances and sessions of the filters - * that are in use within a session. - */ -typedef struct -{ - struct mxs_filter_def *filter; - struct mxs_filter *instance; - struct mxs_filter_session *session; -} SESSION_FILTER; - /** * The downstream element in the filter chain. This may refer to * another filter or to a router. @@ -184,8 +173,6 @@ typedef struct session struct mxs_router_session *router_session; /*< The router instance data */ MXS_SESSION_STATS stats; /*< Session statistics */ struct service *service; /*< The service this session is using */ - int n_filters; /*< Number of filter sessions */ - SESSION_FILTER *filters; /*< The filters in use within this session */ MXS_DOWNSTREAM head; /*< Head of the filter chain */ MXS_UPSTREAM tail; /*< The tail of the filter chain */ int refcount; /*< Reference count on the session */ diff --git a/server/core/internal/session.hh b/server/core/internal/session.hh index a8055736a..36ada4075 100644 --- a/server/core/internal/session.hh +++ b/server/core/internal/session.hh @@ -24,6 +24,9 @@ #include #include +#include "filter.hh" +#include "service.hh" + namespace maxscale { /** @@ -57,9 +60,37 @@ typedef std::unordered_map SessionVarsByName; typedef std::deque> SessionStmtQueue; typedef std::unordered_set DCBSet; +// Class that holds the session specific filter data +class SessionFilter +{ +public: + + SessionFilter(const SFilterDef& f): + filter(f), + instance(nullptr), + session(nullptr) + { + } + + SFilterDef filter; + MXS_FILTER* instance; + MXS_FILTER_SESSION* session; +}; + class Session: public MXS_SESSION { public: + using FilterList = std::vector; + + ~Session(); + + bool setup_filters(Service* service); + + const FilterList& get_filters() const + { + return m_filters; + } + bool add_variable(const char* name, session_variable_handler_t handler, void* context); char* set_variable_value(const char* name_begin, const char* name_end, const char* value_begin, const char* value_end); @@ -85,6 +116,7 @@ public: } private: + FilterList m_filters; SessionVarsByName m_variables; SessionStmtQueue m_last_statements; /*< The N last statements by the client */ DCBSet m_dcb_set; /*< Set of associated backend DCBs */ diff --git a/server/core/session.cc b/server/core/session.cc index db143ad84..9ef4c1fe9 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -244,7 +244,6 @@ session_set_dummy(DCB *client_dcb) session->ses_chk_tail = CHK_NUM_SESSION; session->service = NULL; session->client_dcb = NULL; - session->n_filters = 0; memset(&session->stats, 0, sizeof(MXS_SESSION_STATS)); session->stats.connect = 0; session->state = SESSION_STATE_DUMMY; @@ -350,54 +349,9 @@ private: */ static void session_free(MXS_SESSION *session) { - CHK_SESSION(session); - ss_dassert(session->refcount == 0); - - session->state = SESSION_STATE_TO_BE_FREED; - atomic_add(&session->service->stats.n_current, -1); - - if (session->client_dcb) - { - dcb_free_all_memory(session->client_dcb); - session->client_dcb = NULL; - } - /** - * If session is not child of some other session, free router_session. - * Otherwise let the parent free it. - */ - if (session->router_session) - { - session->service->router->freeSession(session->service->router_instance, - session->router_session); - } - if (session->n_filters) - { - int i; - for (i = 0; i < session->n_filters; i++) - { - if (session->filters[i].filter) - { - FilterDef* filter = static_cast(session->filters[i].filter); - filter->obj->closeSession(session->filters[i].instance, - session->filters[i].session); - } - } - for (i = 0; i < session->n_filters; i++) - { - if (session->filters[i].filter) - { - FilterDef* filter = static_cast(session->filters[i].filter); - filter->obj->freeSession(session->filters[i].instance, - session->filters[i].session); - } - } - MXS_FREE(session->filters); - } - MXS_INFO("Stopped %s client session [%" PRIu64 "]", session->service->name, session->ses_id); Service* service = static_cast(session->service); - session->state = SESSION_STATE_FREE; session_final_free(session); bool should_destroy = !atomic_load_int(&service->active); @@ -409,14 +363,28 @@ static void session_free(MXS_SESSION *session) } } -static void -session_final_free(MXS_SESSION *session) +static void session_final_free(MXS_SESSION *session) { + CHK_SESSION(session); + ss_dassert(session->refcount == 0); + + session->state = SESSION_STATE_TO_BE_FREED; + + atomic_add(&session->service->stats.n_current, -1); + + if (session->client_dcb) + { + dcb_free_all_memory(session->client_dcb); + session->client_dcb = NULL; + } + if (dump_statements == SESSION_DUMP_STATEMENTS_ON_CLOSE) { session_dump_statements(session); } + session->state = SESSION_STATE_FREE; + delete session; } @@ -537,15 +505,12 @@ dprintSession(DCB *dcb, MXS_SESSION *print_session) } - if (print_session->n_filters) + Session* session = static_cast(print_session); + + for (const auto& f : session->get_filters()) { - for (i = 0; i < print_session->n_filters; i++) - { - FilterDef* filter = static_cast(print_session->filters[i].filter); - dcb_printf(dcb, "\tFilter: %s\n", filter->name.c_str()); - filter->obj->diagnostics(print_session->filters[i].instance, - print_session->filters[i].session, dcb); - } + dcb_printf(dcb, "\tFilter: %s\n", f.filter->name.c_str()); + f.filter->obj->diagnostics(f.instance, f.session, dcb); } } @@ -631,66 +596,11 @@ session_state(mxs_session_state_t state) * @param session The session that requires the chain * @return 0 if filter creation fails */ -static int -session_setup_filters(MXS_SESSION *session) +static int session_setup_filters(MXS_SESSION *ses) { - Service* service = static_cast(session->service); - - auto filters = service->get_filters(); - - if (filters.empty()) - { - return 1; - } - - if ((session->filters = (SESSION_FILTER*)MXS_CALLOC(filters.size(), sizeof(SESSION_FILTER))) == NULL) - { - return 0; - } - - session->n_filters = filters.size(); - - for (ssize_t i = (ssize_t)filters.size() - 1; i >= 0; i--) - { - MXS_DOWNSTREAM* head = filter_apply(filters[i], session, &session->head); - - if (head == NULL) - { - MXS_ERROR("Failed to create filter '%s' for service '%s'.\n", - filter_def_get_name(filters[i].get()), service->name); - return 0; - } - - session->filters[i].filter = filters[i].get(); - session->filters[i].session = head->session; - session->filters[i].instance = head->instance; - session->head = *head; - MXS_FREE(head); - } - - for (size_t i = 0; i < filters.size(); i++) - { - MXS_UPSTREAM* tail = filter_upstream(filters[i], session->filters[i].session, &session->tail); - - if (tail == NULL) - { - MXS_ERROR("Failed to create filter '%s' for service '%s'.", - filter_def_get_name(filters[i].get()), service->name); - return 0; - } - - /** - * filter_upstream may simply return the 3 parameter if the filter has no - * upstream entry point. So no need to copy the contents or free tail in this case. - */ - if (tail != &session->tail) - { - session->tail = *tail; - MXS_FREE(tail); - } - } - - return 1; + Service* service = static_cast(ses->service); + Session* session = static_cast(ses); + return session->setup_filters(service); } /** @@ -903,7 +813,7 @@ uint64_t session_get_next_id() return atomic_add_uint64(&next_session_id, 1); } -json_t* session_json_data(const MXS_SESSION *session, const char *host) +json_t* session_json_data(const Session* session, const char *host) { json_t* data = json_object(); @@ -924,13 +834,15 @@ json_t* session_json_data(const MXS_SESSION *session, const char *host) json_object_set_new(rel, CN_SERVICES, services); /** Filter relationships (one-to-many) */ - if (session->n_filters) + auto filter_list = session->get_filters(); + + if (!filter_list.empty()) { json_t* filters = mxs_json_relationship(host, MXS_JSON_API_FILTERS); - for (int i = 0; i < session->n_filters; i++) + for (const auto& f : filter_list) { - mxs_json_add_relation(filters, filter_def_get_name(session->filters[i].filter), CN_FILTERS); + mxs_json_add_relation(filters, f.filter->name.c_str(), CN_FILTERS); } json_object_set_new(rel, CN_FILTERS, filters); } @@ -967,10 +879,11 @@ json_t* session_json_data(const MXS_SESSION *session, const char *host) } json_t* dcb_arr = json_array(); + const Session* pSession = static_cast(session); - for (auto it = session->dcb_set->begin(); it != session->dcb_set->end(); it++) + for (auto d : pSession->dcb_set()) { - json_array_append_new(dcb_arr, dcb_to_json(*it)); + json_array_append_new(dcb_arr, dcb_to_json(d)); } json_object_set_new(attr, "connections", dcb_arr); @@ -985,7 +898,8 @@ json_t* session_to_json(const MXS_SESSION *session, const char *host) { stringstream ss; ss << MXS_JSON_API_SESSIONS << session->ses_id; - return mxs_json_resource(host, ss.str().c_str(), session_json_data(session, host)); + const Session* s = static_cast(session); + return mxs_json_resource(host, ss.str().c_str(), session_json_data(s, host)); } struct SessionListData @@ -999,7 +913,8 @@ bool seslist_cb(DCB* dcb, void* data) if (dcb->dcb_role == DCB_ROLE_CLIENT_HANDLER) { SessionListData* d = (SessionListData*)data; - json_array_append_new(d->json, session_json_data(dcb->session, d->host)); + Session* session = static_cast(dcb->session); + json_array_append_new(d->json, session_json_data(session, d->host)); } return true; @@ -1249,6 +1164,20 @@ const char* session_get_close_reason(const MXS_SESSION* session) } } +Session::~Session() +{ + if (router_session) + { + service->router->freeSession(service->router_instance, router_session); + } + + for (auto& f : m_filters) + { + f.filter->obj->closeSession(f.instance, f.session); + f.filter->obj->freeSession(f.instance, f.session); + } +} + void Session::dump_statements() const { if (retain_last_statements) @@ -1286,6 +1215,55 @@ void Session::dump_statements() const } } +bool Session::setup_filters(Service* service) +{ + for (const auto& a : service->get_filters()) + { + m_filters.emplace_back(a); + } + + for (auto it = m_filters.rbegin(); it != m_filters.rend(); it++) + { + MXS_DOWNSTREAM* my_head = filter_apply(it->filter, this, &head); + + if (my_head == NULL) + { + MXS_ERROR("Failed to create filter '%s' for service '%s'.\n", + filter_def_get_name(it->filter.get()), service->name); + return false; + } + + it->session = my_head->session; + it->instance = my_head->instance; + head = *my_head; + MXS_FREE(my_head); + } + + for (auto it = m_filters.begin(); it != m_filters.end(); it++) + { + MXS_UPSTREAM* my_tail = filter_upstream(it->filter, it->session, &tail); + + if (my_tail == NULL) + { + MXS_ERROR("Failed to create filter '%s' for service '%s'.", + filter_def_get_name(it->filter.get()), service->name); + return false; + } + + /** + * filter_upstream may simply return the 3 parameters if the filter has no + * upstream entry point. So no need to copy the contents or free tail in this case. + */ + if (my_tail != &tail) + { + tail = *my_tail; + MXS_FREE(my_tail); + } + } + + return true; +} + bool Session::add_variable(const char* name, session_variable_handler_t handler, void* context) { bool added = false;