From 7fab3f5d1d26caeb8101cbe38e6d4d4d4cf4f25a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Tue, 23 May 2017 13:51:29 +0300 Subject: [PATCH] MXS-1267: Use local client in tee The tee filter now uses the local client class to clone the queries. This imposes some restrictions on how the filter can be used but also makes MaxScale as a whole more robust. --- Documentation/Filters/Tee-Filter.md | 7 +- server/modules/filter/tee/CMakeLists.txt | 4 +- server/modules/filter/tee/local_client.cc | 14 +- server/modules/filter/tee/local_client.hh | 4 +- server/modules/filter/tee/tee.cc | 1198 ++++++--------------- 5 files changed, 365 insertions(+), 862 deletions(-) diff --git a/Documentation/Filters/Tee-Filter.md b/Documentation/Filters/Tee-Filter.md index 8e1155351..72e7f4424 100644 --- a/Documentation/Filters/Tee-Filter.md +++ b/Documentation/Filters/Tee-Filter.md @@ -6,12 +6,15 @@ The tee filter is a "plumbing" fitting in the MariaDB MaxScale filter toolkit. It can be used in a filter pipeline of a service to make copies of requests from the client and send the copies to another service within MariaDB MaxScale. +**Please Note:** Starting with MaxScale 2.2.0, any client that connects to a + service which uses a tee filter will require a grant for the loopback address, + i.e. `127.0.0.1`. + ## Configuration The configuration block for the TEE filter requires the minimal filter parameters in its section within the MaxScale configuration file. The service to -send the duplicates to must be defined. Currently the tee filter does not -support multi-statements. +send the duplicates to must be defined. ``` [DataMartFilter] diff --git a/server/modules/filter/tee/CMakeLists.txt b/server/modules/filter/tee/CMakeLists.txt index 22bb43c08..d0ee80e21 100644 --- a/server/modules/filter/tee/CMakeLists.txt +++ b/server/modules/filter/tee/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(tee SHARED tee.cc) -target_link_libraries(tee maxscale-common) +add_library(tee SHARED tee.cc local_client.cc) +target_link_libraries(tee maxscale-common MySQLCommon) set_target_properties(tee PROPERTIES VERSION "1.0.0") install_module(tee core) diff --git a/server/modules/filter/tee/local_client.cc b/server/modules/filter/tee/local_client.cc index fb1926db6..517505a4b 100644 --- a/server/modules/filter/tee/local_client.cc +++ b/server/modules/filter/tee/local_client.cc @@ -34,10 +34,10 @@ LocalClient::LocalClient(MXS_SESSION* session, int fd): { MXS_POLL_DATA::handler = LocalClient::poll_handler; MySQLProtocol* client = (MySQLProtocol*)m_session->client_dcb->protocol; - m_proto = {}; - m_proto.charset = client->charset; - m_proto.client_capabilities = client->client_capabilities; - m_proto.extra_capabilities = client->extra_capabilities; + m_protocol = {}; + m_protocol.charset = client->charset; + m_protocol.client_capabilities = client->client_capabilities; + m_protocol.extra_capabilities = client->extra_capabilities; } LocalClient::~LocalClient() @@ -48,7 +48,7 @@ LocalClient::~LocalClient() } } -bool LocalClient::query(GWBUF* buffer) +bool LocalClient::queue_query(GWBUF* buffer) { GWBUF* my_buf = gwbuf_clone(buffer); @@ -82,9 +82,9 @@ void LocalClient::process(uint32_t events) { if (m_state == VC_WAITING_HANDSHAKE) { - if (gw_decode_mysql_server_handshake(&m_proto, GWBUF_DATA(buf) + MYSQL_HEADER_LEN) == 0) + if (gw_decode_mysql_server_handshake(&m_protocol, GWBUF_DATA(buf) + MYSQL_HEADER_LEN) == 0) { - GWBUF* response = gw_generate_auth_response(m_session, &m_proto, false, false); + GWBUF* response = gw_generate_auth_response(m_session, &m_protocol, false, false); m_queue.push_front(response); m_state = VC_RESPONSE_SENT; } diff --git a/server/modules/filter/tee/local_client.hh b/server/modules/filter/tee/local_client.hh index 6a3b21ea4..a9eb060f8 100644 --- a/server/modules/filter/tee/local_client.hh +++ b/server/modules/filter/tee/local_client.hh @@ -46,7 +46,7 @@ public: * * @return True if query was successfully queued */ - bool query(GWBUF* buffer); + bool queue_query(GWBUF* buffer); private: LocalClient(MXS_SESSION* session, int fd); @@ -71,5 +71,5 @@ private: size_t m_expected_bytes; std::deque m_queue; MXS_SESSION* m_session; - MySQLProtocol m_proto; + MySQLProtocol m_protocol; }; diff --git a/server/modules/filter/tee/tee.cc b/server/modules/filter/tee/tee.cc index 0c05a1659..391f5dc96 100644 --- a/server/modules/filter/tee/tee.cc +++ b/server/modules/filter/tee/tee.cc @@ -17,72 +17,19 @@ #define MXS_MODULE_NAME "tee" -#include -#include +#include + +#include +#include +#include + #include #include -#include #include -#include -#include -#include #include -#include -#include -#include -#include -#include -#include #include -#define MYSQL_COM_QUIT 0x01 -#define MYSQL_COM_INITDB 0x02 -#define MYSQL_COM_FIELD_LIST 0x04 -#define MYSQL_COM_CHANGE_USER 0x11 -#define MYSQL_COM_STMT_PREPARE 0x16 -#define MYSQL_COM_STMT_EXECUTE 0x17 -#define MYSQL_COM_STMT_SEND_LONG_DATA 0x18 -#define MYSQL_COM_STMT_CLOSE 0x19 -#define MYSQL_COM_STMT_RESET 0x1a -#define MYSQL_COM_CONNECT 0x1b - -#define REPLY_TIMEOUT_SECOND 5 -#define REPLY_TIMEOUT_MILLISECOND 1 -#define PARENT 0 -#define CHILD 1 - -#ifdef SS_DEBUG -static int debug_seq = 0; -#endif - -static unsigned char required_packets[] = -{ - MYSQL_COM_QUIT, - MYSQL_COM_INITDB, - MYSQL_COM_CHANGE_USER, - MYSQL_COM_STMT_PREPARE, - MYSQL_COM_STMT_EXECUTE, - MYSQL_COM_STMT_SEND_LONG_DATA, - MYSQL_COM_STMT_CLOSE, - MYSQL_COM_STMT_RESET, - MYSQL_COM_CONNECT, - 0 -}; - -/* - * The filter entry points - */ -static MXS_FILTER *createInstance(const char* name, char **options, MXS_CONFIG_PARAMETER *); -static MXS_FILTER_SESSION *newSession(MXS_FILTER *instance, MXS_SESSION *session); -static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session); -static void freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session); -static void setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, MXS_DOWNSTREAM *downstream); -static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, MXS_UPSTREAM *upstream); -static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, GWBUF *queue); -static int clientReply(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, GWBUF *queue); -static void diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb); -static json_t* diagnostic_json(const MXS_FILTER *instance, const MXS_FILTER_SESSION *fsession); -static uint64_t getCapabilities(MXS_FILTER* instance); +#include "local_client.hh" /** * The instance structure for the TEE filter - this holds the configuration @@ -92,7 +39,7 @@ typedef struct { SERVICE *service; /* The service to duplicate requests to */ char *source; /* The source of the client connection */ - char *userName; /* The user name to filter on */ + char *user; /* The user name to filter on */ char *match; /* Optional text to match against */ regex_t re; /* Compiled regex text */ char *nomatch; /* Optional text to match against for exclusion */ @@ -109,153 +56,12 @@ typedef struct */ typedef struct { - MXS_DOWNSTREAM down; /* The downstream filter */ - MXS_UPSTREAM up; /* The upstream filter */ - int active; /* filter is active? */ - bool use_ok; - int client_multistatement; - bool multipacket[2]; - unsigned char command; - bool waiting[2]; /* if the client is waiting for a reply */ - int eof[2]; - int replies[2]; /* Number of queries received */ - int reply_packets[2]; /* Number of OK, ERR, LOCAL_INFILE_REQUEST or RESULT_SET packets received */ - DCB *branch_dcb; /* Client DCB for "branch" service */ - MXS_SESSION *branch_session; /* The branch service session */ - TEE_INSTANCE *instance; - int n_duped; /* Number of duplicated queries */ - int n_rejected; /* Number of rejected queries */ - int residual; /* Any outstanding SQL text */ - GWBUF* tee_replybuf; /* Buffer for reply */ - GWBUF* tee_partials[2]; - GWBUF* queue; - SPINLOCK tee_lock; - DCB* client_dcb; - -#ifdef SS_DEBUG - long d_id; -#endif + MXS_DOWNSTREAM down; /**< The downstream filter */ + MXS_UPSTREAM up; /**< The upstream filter */ + bool passive; /**< Whether to clone queries */ + LocalClient* client; /**< The client connection to the local service */ } TEE_SESSION; -typedef struct orphan_session_tt -{ - MXS_SESSION* session; /*< The child branch session whose parent was freed before - * the child session was in a suitable state. */ - struct orphan_session_tt* next; -} orphan_session_t; - -#ifdef SS_DEBUG -static SPINLOCK debug_lock; -static long debug_id = 0; -#endif - -static orphan_session_t* allOrphans = NULL; - -static SPINLOCK orphanLock; -static int packet_is_required(GWBUF *queue); -static int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* session); -int internal_route(DCB* dcb); -GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer); -int route_single_query(TEE_INSTANCE* my_instance, - TEE_SESSION* my_session, - GWBUF* buffer, - GWBUF* clone); -int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer); -void create_orphan(MXS_SESSION* ses); - -static void -orphan_free(void* data) -{ - spinlock_acquire(&orphanLock); - orphan_session_t *ptr = allOrphans, *finished = NULL, *tmp = NULL; -#ifdef SS_DEBUG - int o_stopping = 0, o_ready = 0, o_freed = 0; -#endif - while (ptr) - { - if (ptr->session->state == SESSION_STATE_TO_BE_FREED) - { - if (ptr == allOrphans) - { - tmp = ptr; - allOrphans = ptr->next; - } - else - { - tmp = allOrphans; - while (tmp && tmp->next != ptr) - { - tmp = tmp->next; - } - if (tmp) - { - tmp->next = ptr->next; - tmp = ptr; - } - } - } - - /* - * The session has been unlinked from all the DCBs and it is ready to be freed. - */ - - if (ptr->session->state == SESSION_STATE_STOPPING && - ptr->session->refcount == 0 && ptr->session->client_dcb == NULL) - { - ptr->session->state = SESSION_STATE_TO_BE_FREED; - } -#ifdef SS_DEBUG - else if (ptr->session->state == SESSION_STATE_STOPPING) - { - o_stopping++; - } - else if (ptr->session->state == SESSION_STATE_ROUTER_READY) - { - o_ready++; - } -#endif - ptr = ptr->next; - if (tmp) - { - tmp->next = finished; - finished = tmp; - tmp = NULL; - } - } - - spinlock_release(&orphanLock); - -#ifdef SS_DEBUG - if (o_stopping + o_ready > 0) - { - MXS_DEBUG("%d orphans in " - "SESSION_STATE_STOPPING, %d orphans in " - "SESSION_STATE_ROUTER_READY. ", o_stopping, o_ready); - } -#endif - - while (finished) - { -#ifdef SS_DEBUG - o_freed++; -#endif - tmp = finished; - finished = finished->next; - - tmp->session->service->router->freeSession( - tmp->session->service->router_instance, - tmp->session->router_session); - - tmp->session->state = SESSION_STATE_FREE; - MXS_FREE(tmp->session); - MXS_FREE(tmp); - } - -#ifdef SS_DEBUG - MXS_DEBUG("%d orphans freed.", o_freed); -#endif -} - static const MXS_ENUM_VALUE option_values[] = { {"ignorecase", REG_ICASE}, @@ -264,6 +70,342 @@ static const MXS_ENUM_VALUE option_values[] = {NULL} }; +bool recursive_tee_usage(std::set& services, SERVICE* service); + +/** + * Create an instance of the filter for a particular service + * within MaxScale. + * + * @param name The name of the instance (as defined in the config file). + * @param options The options for this filter + * @param params The array of name/value pair parameters for the filter + * + * @return The instance data for this new instance + */ +static MXS_FILTER * +createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params) +{ + TEE_INSTANCE *my_instance = (TEE_INSTANCE*)MXS_CALLOC(1, sizeof(TEE_INSTANCE)); + + if (my_instance) + { + my_instance->service = config_get_service(params, "service"); + my_instance->source = config_copy_string(params, "source"); + my_instance->user = config_copy_string(params, "user"); + my_instance->match = config_copy_string(params, "match"); + my_instance->nomatch = config_copy_string(params, "exclude"); + + int cflags = config_get_enum(params, "options", option_values); + + if (my_instance->match && regcomp(&my_instance->re, my_instance->match, cflags)) + { + MXS_ERROR("Invalid regular expression '%s' for the match parameter.", + my_instance->match); + MXS_FREE(my_instance->match); + MXS_FREE(my_instance->nomatch); + MXS_FREE(my_instance->source); + MXS_FREE(my_instance->user); + MXS_FREE(my_instance); + return NULL; + } + + if (my_instance->nomatch && regcomp(&my_instance->nore, my_instance->nomatch, cflags)) + { + MXS_ERROR("Invalid regular expression '%s' for the nomatch paramter.", + my_instance->nomatch); + if (my_instance->match) + { + regfree(&my_instance->re); + MXS_FREE(my_instance->match); + } + MXS_FREE(my_instance->nomatch); + MXS_FREE(my_instance->source); + MXS_FREE(my_instance->user); + MXS_FREE(my_instance); + return NULL; + } + } + + return (MXS_FILTER*) my_instance; +} + +/** + * Create a filter new session + * + * @param instance The filter instance data + * @param session The session itself + * + * @return Session specific data for this session + */ +static MXS_FILTER_SESSION* newSession(MXS_FILTER *instance, MXS_SESSION *session) +{ + std::set services; + + if (recursive_tee_usage(services, session->service)) + { + MXS_ERROR("%s: Recursive use of tee filter in service.", + session->service->name); + return NULL; + } + + TEE_SESSION* my_session = new (std::nothrow) TEE_SESSION; + + if (my_session) + { + TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance; + const char* remote = session_get_remote(session); + const char* user = session_get_user(session); + + if ((my_instance->source && remote && strcmp(remote, my_instance->source) != 0) || + (my_instance->user && user && strcmp(user, my_instance->user) != 0)) + { + my_session->passive = true; + my_session->client = NULL; + } + else + { + my_session->client = LocalClient::create(session, my_instance->service); + my_session->passive = false; + + if (my_session->client == NULL) + { + delete my_session; + my_session = NULL; + } + } + } + + return reinterpret_cast(my_session); +} + +/** + * Close the filter session + * + * @param instance The filter instance data + * @param session The session being closed + */ +static void closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session) +{ +} + +/** + * Free the memory associated with the session + * + * @param instance The filter instance + * @param session The filter session + */ +static void freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session) +{ + TEE_SESSION *my_session = reinterpret_cast(session); + delete my_session->client; + delete my_session; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_DOWNSTREAM *downstream) +{ + TEE_SESSION *my_session = (TEE_SESSION *) session; + my_session->down = *downstream; +} + +/** + * Set the downstream filter or router to which queries will be + * passed from this filter. + * + * @param instance The filter instance data + * @param session The filter session + * @param downstream The downstream filter or router. + */ +static void setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *upstream) +{ + TEE_SESSION *my_session = (TEE_SESSION *) session; + my_session->up = *upstream; +} + +/** + * Route a query + * + * @param instance Filter instance + * @param session Filter session + * @param queue The query itself + * + * @retrn 1 on success, 0 on failure + */ +static int routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue) +{ + TEE_SESSION *my_session = (TEE_SESSION *)session; + int rval = 0; + + if (my_session->passive || my_session->client->queue_query(queue)) + { + rval = my_session->down.routeQuery(my_session->down.instance, + my_session->down.session, + queue); + } + + return rval; +} + +/** + * The clientReply entry point. This is passed the response buffer + * to which the filter should be applied. Once processed the + * query is passed to the upstream component + * (filter or router) in the filter chain. + * + * @param instance The filter instance data + * @param session The filter session + * @param reply The response data + */ +static int +clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply) +{ + TEE_SESSION *my_session = (TEE_SESSION *) session; + + return my_session->up.clientReply(my_session->up.instance, + my_session->up.session, + reply); +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + * @param dcb The DCB for diagnostic output + */ +static void +diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb) +{ + TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance; + + if (my_instance->source) + { + dcb_printf(dcb, "\t\tLimit to connections from %s\n", + my_instance->source); + } + dcb_printf(dcb, "\t\tDuplicate statements to service %s\n", + my_instance->service->name); + if (my_instance->user) + { + dcb_printf(dcb, "\t\tLimit to user %s\n", + my_instance->user); + } + if (my_instance->match) + { + dcb_printf(dcb, "\t\tInclude queries that match %s\n", + my_instance->match); + } + if (my_instance->nomatch) + { + dcb_printf(dcb, "\t\tExclude queries that match %s\n", + my_instance->nomatch); + } +} + +/** + * Diagnostics routine + * + * If fsession is NULL then print diagnostics on the filter + * instance as a whole, otherwise print diagnostics for the + * particular session. + * + * @param instance The filter instance + * @param fsession Filter session, may be NULL + */ +static json_t* diagnostic_json(const MXS_FILTER *instance, const MXS_FILTER_SESSION *fsession) +{ + TEE_INSTANCE *my_instance = (TEE_INSTANCE*)instance; + + json_t* rval = json_object(); + + if (my_instance->source) + { + json_object_set_new(rval, "source", json_string(my_instance->source)); + } + + json_object_set_new(rval, "service", json_string(my_instance->service->name)); + + if (my_instance->user) + { + json_object_set_new(rval, "user", json_string(my_instance->user)); + } + + if (my_instance->match) + { + json_object_set_new(rval, "match", json_string(my_instance->match)); + } + + if (my_instance->nomatch) + { + json_object_set_new(rval, "exclude", json_string(my_instance->nomatch)); + } + + return rval; +} + +/** + * Capability routine. + * + * @return The capabilities of the filter. + */ +static uint64_t getCapabilities(MXS_FILTER* instance) +{ + return RCAP_TYPE_CONTIGUOUS_INPUT; +} + +/** + * Detect loops in the filter chain. + */ +bool recursive_tee_usage(std::set& services, SERVICE* service) +{ + if (!services.insert(service->name).second) + { + /** The service name was already in the set */ + return true; + } + + for (int i = 0; i < service->n_filters; i++) + { + const char* module = filter_def_get_module_name(service->filters[i]); + + if (strcmp(module, "tee") == 0) + { + /* + * Found a Tee filter, recurse down its path + * if the service name isn't already in the hashtable. + */ + TEE_INSTANCE* inst = (TEE_INSTANCE*)filter_def_get_instance(service->filters[i]); + + if (inst == NULL) + { + /** + * This tee instance hasn't been initialized yet and full + * resolution of recursion cannot be done now. + */ + } + else if (recursive_tee_usage(services, inst->service)) + { + return true; + } + } + } + + return false; +} + +MXS_BEGIN_DECLS + /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the @@ -274,11 +416,6 @@ static const MXS_ENUM_VALUE option_values[] = */ MXS_MODULE* MXS_CREATE_MODULE() { - spinlock_init(&orphanLock); -#ifdef SS_DEBUG - spinlock_init(&debug_lock); -#endif - static MXS_FILTER_OBJECT MyObject = { createInstance, @@ -328,641 +465,4 @@ MXS_MODULE* MXS_CREATE_MODULE() return &info; } -/** - * Create an instance of the filter for a particular service - * within MaxScale. - * - * @param name The name of the instance (as defined in the config file). - * @param options The options for this filter - * @param params The array of name/value pair parameters for the filter - * - * @return The instance data for this new instance - */ -static MXS_FILTER * -createInstance(const char *name, char **options, MXS_CONFIG_PARAMETER *params) -{ - TEE_INSTANCE *my_instance = (TEE_INSTANCE*)MXS_CALLOC(1, sizeof(TEE_INSTANCE)); - - if (my_instance) - { - my_instance->service = config_get_service(params, "service"); - my_instance->source = config_copy_string(params, "source"); - my_instance->userName = config_copy_string(params, "user"); - my_instance->match = config_copy_string(params, "match"); - my_instance->nomatch = config_copy_string(params, "exclude"); - - int cflags = config_get_enum(params, "options", option_values); - - if (my_instance->match && regcomp(&my_instance->re, my_instance->match, cflags)) - { - MXS_ERROR("Invalid regular expression '%s' for the match parameter.", - my_instance->match); - MXS_FREE(my_instance->match); - MXS_FREE(my_instance->nomatch); - MXS_FREE(my_instance->source); - MXS_FREE(my_instance->userName); - MXS_FREE(my_instance); - return NULL; - } - - if (my_instance->nomatch && regcomp(&my_instance->nore, my_instance->nomatch, cflags)) - { - MXS_ERROR("Invalid regular expression '%s' for the nomatch paramter.", - my_instance->nomatch); - if (my_instance->match) - { - regfree(&my_instance->re); - MXS_FREE(my_instance->match); - } - MXS_FREE(my_instance->nomatch); - MXS_FREE(my_instance->source); - MXS_FREE(my_instance->userName); - MXS_FREE(my_instance); - return NULL; - } - } - - return (MXS_FILTER *) my_instance; -} - -/** - * Associate a new session with this instance of the filter. - * - * Create the file to log to and open it. - * - * @param instance The filter instance data - * @param session The session itself - * @return Session specific data for this session - */ -static MXS_FILTER_SESSION * -newSession(MXS_FILTER *instance, MXS_SESSION *session) -{ - TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance; - TEE_SESSION *my_session; - const char *remote, *userName; - - if (strcmp(my_instance->service->name, session->service->name) == 0) - { - MXS_ERROR("%s: Recursive use of tee filter in service.", - session->service->name); - return NULL; - } - - HASHTABLE* ht = (HASHTABLE*)hashtable_alloc(100, hashtable_item_strhash, hashtable_item_strcmp); - bool is_loop = detect_loops(my_instance, ht, session->service); - hashtable_free(ht); - - if (is_loop) - { - MXS_ERROR("%s: Recursive use of tee filter in service.", - session->service->name); - return NULL; - } - - if ((my_session = (TEE_SESSION*)MXS_CALLOC(1, sizeof(TEE_SESSION))) != NULL) - { - my_session->active = 1; - my_session->residual = 0; - my_session->tee_replybuf = NULL; - my_session->client_dcb = session->client_dcb; - my_session->instance = my_instance; - my_session->client_multistatement = false; - my_session->queue = NULL; - spinlock_init(&my_session->tee_lock); - if (my_instance->source && - (remote = session_get_remote(session)) != NULL) - { - if (strcmp(remote, my_instance->source)) - { - my_session->active = 0; - - MXS_WARNING("Tee filter is not active."); - } - } - userName = session_get_user(session); - - if (my_instance->userName && - userName && - strcmp(userName, my_instance->userName)) - { - my_session->active = 0; - - MXS_WARNING("Tee filter is not active."); - } - - if (my_session->active) - { - DCB* dcb; - MXS_SESSION* ses; - if ((dcb = dcb_clone(session->client_dcb)) == NULL) - { - freeSession(instance, (MXS_FILTER_SESSION *) my_session); - MXS_ERROR("Creating client DCB for Tee " - "filter failed. Terminating session."); - return NULL; - } - - dcb->service = my_instance->service; - - if ((ses = session_alloc(my_instance->service, dcb)) == NULL) - { - dcb_close(dcb); - freeSession(instance, (MXS_FILTER_SESSION *) my_session); - MXS_ERROR("Creating client session for Tee " - "filter failed. Terminating session."); - return NULL; - } - - ss_dassert(ses->ses_is_child); - - my_session->branch_session = ses; - my_session->branch_dcb = dcb; - } - } - - return (MXS_FILTER_SESSION*)my_session; -} - -/** - * Close a session with the filter, this is the mechanism - * by which a filter may cleanup data structure etc. - * In the case of the tee filter we need to close down the - * "branch" session. - * - * @param instance The filter instance data - * @param session The session being closed - */ -static void -closeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session) -{ - TEE_SESSION *my_session = (TEE_SESSION *) session; - MXS_ROUTER_OBJECT *router; - void *router_instance, *rsession; - MXS_SESSION *bsession; -#ifdef SS_DEBUG - MXS_INFO("Tee close: %d", atomic_add(&debug_seq, 1)); -#endif - if (my_session->active) - { - - if ((bsession = my_session->branch_session) != NULL) - { - CHK_SESSION(bsession); - bsession->ses_is_child = false; - session_close(bsession); - } - - if (my_session->waiting[PARENT]) - { - if (my_session->command != 0x01 && - my_session->client_dcb && - my_session->client_dcb->state == DCB_STATE_POLLING) - { - MXS_INFO("Tee session closed mid-query."); - GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1, "00000", "Session closed."); - my_session->client_dcb->func.write(my_session->client_dcb, errbuf); - } - } - - - my_session->active = 0; - } -} - -/** - * Free the memory associated with the session - * - * @param instance The filter instance - * @param session The filter session - */ -static void -freeSession(MXS_FILTER *instance, MXS_FILTER_SESSION *session) -{ - TEE_SESSION *my_session = (TEE_SESSION *) session; - MXS_SESSION* ses = my_session->branch_session; - mxs_session_state_t state; -#ifdef SS_DEBUG - MXS_INFO("Tee free: %d", atomic_add(&debug_seq, 1)); -#endif - if (ses != NULL) - { - state = ses->state; - - if (state == SESSION_STATE_ROUTER_READY) - { - session_put_ref(ses); - } - else if (state == SESSION_STATE_TO_BE_FREED) - { - /** Free branch router session */ - ses->service->router->freeSession( - ses->service->router_instance, - ses->router_session); - /** Free memory of branch client session */ - ses->state = SESSION_STATE_FREE; - MXS_FREE(ses); - /** This indicates that branch session is not available anymore */ - my_session->branch_session = NULL; - } - else if (state == SESSION_STATE_STOPPING) - { - create_orphan(ses); - } - } - if (my_session->tee_replybuf) - { - gwbuf_free(my_session->tee_replybuf); - } - MXS_FREE(session); - - orphan_free(NULL); - - return; -} - -/** - * Set the downstream filter or router to which queries will be - * passed from this filter. - * - * @param instance The filter instance data - * @param session The filter session - * @param downstream The downstream filter or router. - */ -static void -setDownstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_DOWNSTREAM *downstream) -{ - TEE_SESSION *my_session = (TEE_SESSION *) session; - my_session->down = *downstream; -} - -/** - * Set the downstream filter or router to which queries will be - * passed from this filter. - * - * @param instance The filter instance data - * @param session The filter session - * @param downstream The downstream filter or router. - */ -static void -setUpstream(MXS_FILTER *instance, MXS_FILTER_SESSION *session, MXS_UPSTREAM *upstream) -{ - TEE_SESSION *my_session = (TEE_SESSION *) session; - my_session->up = *upstream; -} - -/** - * The routeQuery entry point. This is passed the query buffer - * to which the filter should be applied. Once applied the - * query should normally be passed to the downstream component - * (filter or router) in the filter chain. - * - * If my_session->residual is set then duplicate that many bytes - * and send them to the branch. - * - * If my_session->residual is zero then this must be a new request - * Extract the SQL text if possible, match against that text and forward - * the request. If the requets is not contained witin the packet we have - * then set my_session->residual to the number of outstanding bytes - * - * @param instance The filter instance data - * @param session The filter session - * @param queue The query data - */ -static int -routeQuery(MXS_FILTER *instance, MXS_FILTER_SESSION *session, GWBUF *queue) -{ - TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance; - TEE_SESSION *my_session = (TEE_SESSION *) session; - GWBUF *clone = clone_query(my_instance, my_session, queue); - - return route_single_query(my_instance, my_session, queue, clone); -} - -/** - * The clientReply entry point. This is passed the response buffer - * to which the filter should be applied. Once processed the - * query is passed to the upstream component - * (filter or router) in the filter chain. - * - * @param instance The filter instance data - * @param session The filter session - * @param reply The response data - */ -static int -clientReply(MXS_FILTER* instance, MXS_FILTER_SESSION *session, GWBUF *reply) -{ - int rc = 1, branch, eof; - TEE_SESSION *my_session = (TEE_SESSION *) session; - - return my_session->up.clientReply(my_session->up.instance, - my_session->up.session, - reply); -} - -/** - * Diagnostics routine - * - * If fsession is NULL then print diagnostics on the filter - * instance as a whole, otherwise print diagnostics for the - * particular session. - * - * @param instance The filter instance - * @param fsession Filter session, may be NULL - * @param dcb The DCB for diagnostic output - */ -static void -diagnostic(MXS_FILTER *instance, MXS_FILTER_SESSION *fsession, DCB *dcb) -{ - TEE_INSTANCE *my_instance = (TEE_INSTANCE *) instance; - TEE_SESSION *my_session = (TEE_SESSION *) fsession; - - if (my_instance->source) - { - dcb_printf(dcb, "\t\tLimit to connections from %s\n", - my_instance->source); - } - dcb_printf(dcb, "\t\tDuplicate statements to service %s\n", - my_instance->service->name); - if (my_instance->userName) - { - dcb_printf(dcb, "\t\tLimit to user %s\n", - my_instance->userName); - } - if (my_instance->match) - { - dcb_printf(dcb, "\t\tInclude queries that match %s\n", - my_instance->match); - } - if (my_instance->nomatch) - { - dcb_printf(dcb, "\t\tExclude queries that match %s\n", - my_instance->nomatch); - } - if (my_session) - { - dcb_printf(dcb, "\t\tNo. of statements duplicated: %d.\n", - my_session->n_duped); - dcb_printf(dcb, "\t\tNo. of statements rejected: %d.\n", - my_session->n_rejected); - } -} - -/** - * Diagnostics routine - * - * If fsession is NULL then print diagnostics on the filter - * instance as a whole, otherwise print diagnostics for the - * particular session. - * - * @param instance The filter instance - * @param fsession Filter session, may be NULL - */ -static json_t* diagnostic_json(const MXS_FILTER *instance, const MXS_FILTER_SESSION *fsession) -{ - TEE_INSTANCE *my_instance = (TEE_INSTANCE*)instance; - TEE_SESSION *my_session = (TEE_SESSION*)fsession; - - json_t* rval = json_object(); - - if (my_instance->source) - { - json_object_set_new(rval, "source", json_string(my_instance->source)); - } - - json_object_set_new(rval, "service", json_string(my_instance->service->name)); - - if (my_instance->userName) - { - json_object_set_new(rval, "user", json_string(my_instance->userName)); - } - - if (my_instance->match) - { - json_object_set_new(rval, "match", json_string(my_instance->match)); - } - - if (my_instance->nomatch) - { - json_object_set_new(rval, "exclude", json_string(my_instance->nomatch)); - } - - if (my_session) - { - json_object_set_new(rval, "duplicated", json_integer(my_session->n_duped)); - json_object_set_new(rval, "rejected", json_integer(my_session->n_duped)); - } - - return rval; -} - -/** - * Capability routine. - * - * @return The capabilities of the filter. - */ -static uint64_t getCapabilities(MXS_FILTER* instance) -{ - return RCAP_TYPE_NONE; -} - -/** - * Determine if the packet is a command that must be sent to the branch - * to maintain the session consistancy. These are COM_INIT_DB, - * COM_CHANGE_USER and COM_QUIT packets. - * - * @param queue The buffer to check - * @return non-zero if the packet should be sent to the branch - */ -static int -packet_is_required(GWBUF *queue) -{ - uint8_t *ptr; - int i; - - ptr = GWBUF_DATA(queue); - if (GWBUF_LENGTH(queue) > 4) - { - for (i = 0; required_packets[i]; i++) - { - if (ptr[4] == required_packets[i]) - { - return 1; - } - } - } - return 0; -} - -/** - * Detects possible loops in the query cloning chain. - */ -int detect_loops(TEE_INSTANCE *instance, HASHTABLE* ht, SERVICE* service) -{ - SERVICE* svc = service; - int i; - - if (ht == NULL) - { - return -1; - } - - if (hashtable_add(ht, (void*) service->name, (void*) true) == 0) - { - return true; - } - - for (i = 0; i < svc->n_filters; i++) - { - const char* module = filter_def_get_module_name(svc->filters[i]); - if (strcmp(module, "tee") == 0) - { - /* - * Found a Tee filter, recurse down its path - * if the service name isn't already in the hashtable. - */ - - TEE_INSTANCE* ninst = (TEE_INSTANCE*)filter_def_get_instance(svc->filters[i]); - if (ninst == NULL) - { - /** - * This tee instance hasn't been initialized yet and full - * resolution of recursion cannot be done now. - */ - continue; - } - SERVICE* tgt = ninst->service; - - if (detect_loops(ninst, ht, tgt)) - { - return true; - } - - } - } - - return false; -} - -GWBUF* clone_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer) -{ - GWBUF* clone = NULL; - - if ((!my_instance->match && !my_instance->nomatch) || packet_is_required(buffer)) - { - clone = gwbuf_clone(buffer); - } - else - { - char *ptr = modutil_get_SQL(buffer); - - if (ptr) - { - if ((my_instance->match && regexec(&my_instance->re, ptr, 0, NULL, 0) == 0) || - (my_instance->nomatch && regexec(&my_instance->nore, ptr, 0, NULL, 0) != 0)) - { - clone = gwbuf_clone(buffer); - } - MXS_FREE(ptr); - } - } - - return clone; -} - -/** - * Route the main query downstream along the main filter chain and possibly route - * a clone of the buffer to the branch session. If the clone buffer is NULL, nothing - * is routed to the branch session. - * @param my_instance Tee instance - * @param my_session Tee session - * @param buffer Main buffer - * @param clone Cloned buffer - * @return 1 on success, 0 on failure. - */ -int route_single_query(TEE_INSTANCE* my_instance, TEE_SESSION* my_session, GWBUF* buffer, GWBUF* clone) -{ - int rval = 0; - - if (my_session->active && my_session->branch_session && - my_session->branch_session->state == SESSION_STATE_ROUTER_READY) - { - - rval = my_session->down.routeQuery(my_session->down.instance, - my_session->down.session, - buffer); - if (clone) - { - my_session->n_duped++; - - if (my_session->branch_session->state == SESSION_STATE_ROUTER_READY) - { - MXS_SESSION_ROUTE_QUERY(my_session->branch_session, clone); - } - else - { - /** Close tee session */ - my_session->active = 0; - rval = 0; - MXS_INFO("Closed tee filter session: Child session in invalid state."); - gwbuf_free(clone); - } - } - } - - return rval; -} - -/** - * Reset the session's internal counters. - * @param my_session Tee session - * @param buffer Buffer with the query of the main branch in it - * @return 1 on success, 0 on error - */ -int reset_session_state(TEE_SESSION* my_session, GWBUF* buffer) -{ - if (gwbuf_length(buffer) < 5) - { - return 0; - } - - unsigned char command = *((unsigned char*) buffer->start + 4); - - switch (command) - { - case 0x1b: - my_session->client_multistatement = *((unsigned char*) buffer->start + 5); - MXS_INFO("client %s multistatements", - my_session->client_multistatement ? "enabled" : "disabled"); - case 0x03: - case 0x16: - case 0x17: - case 0x04: - case 0x0a: - memset(my_session->multipacket, (char) true, 2 * sizeof(bool)); - break; - default: - memset(my_session->multipacket, (char) false, 2 * sizeof(bool)); - break; - } - - memset(my_session->replies, 0, 2 * sizeof(int)); - memset(my_session->reply_packets, 0, 2 * sizeof(int)); - memset(my_session->eof, 0, 2 * sizeof(int)); - memset(my_session->waiting, 1, 2 * sizeof(bool)); - my_session->command = command; - - return 1; -} - -void create_orphan(MXS_SESSION* ses) -{ - orphan_session_t* orphan = (orphan_session_t*)MXS_MALLOC(sizeof(orphan_session_t)); - if (orphan) - { - orphan->session = ses; - spinlock_acquire(&orphanLock); - orphan->next = allOrphans; - allOrphans = orphan; - spinlock_release(&orphanLock); - } -} +MXS_END_DECLS