1690 lines
		
	
	
		
			45 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			1690 lines
		
	
	
		
			45 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /*
 | |
|  * Copyright (c) 2016 MariaDB Corporation Ab
 | |
|  *
 | |
|  * Use of this software is governed by the Business Source License included
 | |
|  * in the LICENSE.TXT file and at www.mariadb.com/bsl11.
 | |
|  *
 | |
|  * Change Date: 2024-10-14
 | |
|  *
 | |
|  * On the date above, in accordance with the Business Source License, use
 | |
|  * of this software will be governed by version 2 or later of the General
 | |
|  * Public License.
 | |
|  */
 | |
| 
 | |
| /**
 | |
|  * @file session.c  - A representation of the session within the gateway.
 | |
|  */
 | |
| #include <maxscale/session.hh>
 | |
| 
 | |
| #include <inttypes.h>
 | |
| #include <stdio.h>
 | |
| #include <stdlib.h>
 | |
| #include <unistd.h>
 | |
| #include <string.h>
 | |
| #include <errno.h>
 | |
| #include <algorithm>
 | |
| #include <string>
 | |
| #include <sstream>
 | |
| 
 | |
| #include <maxbase/atomic.hh>
 | |
| #include <maxbase/host.hh>
 | |
| #include <maxbase/alloc.h>
 | |
| #include <maxscale/clock.h>
 | |
| #include <maxscale/dcb.hh>
 | |
| #include <maxscale/housekeeper.h>
 | |
| #include <maxscale/json_api.hh>
 | |
| #include <maxscale/modutil.hh>
 | |
| #include <maxscale/poll.hh>
 | |
| #include <maxscale/router.hh>
 | |
| #include <maxscale/routingworker.hh>
 | |
| #include <maxscale/service.hh>
 | |
| #include <maxscale/utils.h>
 | |
| #include <maxscale/protocol/mysql.hh>
 | |
| 
 | |
| #include "internal/dcb.hh"
 | |
| #include "internal/filter.hh"
 | |
| #include "internal/session.hh"
 | |
| #include "internal/service.hh"
 | |
| 
 | |
| using std::string;
 | |
| using std::stringstream;
 | |
| using maxbase::Worker;
 | |
| using namespace maxscale;
 | |
| 
 | |
| namespace
 | |
| {
 | |
| 
 | |
| struct
 | |
| {
 | |
|     /* Global session id counter. Must be updated atomically. Value 0 is reserved for
 | |
|      *  dummy/unused sessions.
 | |
|      */
 | |
|     uint64_t                  next_session_id;
 | |
|     uint32_t                  retain_last_statements;
 | |
|     session_dump_statements_t dump_statements;
 | |
|     uint32_t                  session_trace;
 | |
| } this_unit =
 | |
| {
 | |
|     1,
 | |
|     0,
 | |
|     SESSION_DUMP_STATEMENTS_NEVER,
 | |
|     0
 | |
| };
 | |
| }
 | |
| 
 | |
| static void         session_initialize(void* session);
 | |
| static int          session_setup_filters(MXS_SESSION* session);
 | |
| static void         session_simple_free(MXS_SESSION* session, DCB* dcb);
 | |
| static void         session_add_to_all_list(MXS_SESSION* session);
 | |
| static MXS_SESSION* session_find_free();
 | |
| static void         session_final_free(MXS_SESSION* session);
 | |
| static void         session_deliver_response(MXS_SESSION* session);
 | |
| 
 | |
| /**
 | |
|  * The clientReply of the session.
 | |
|  *
 | |
|  * @param inst     In reality an MXS_SESSION*.
 | |
|  * @param session  In reality an MXS_SESSION*.
 | |
|  * @param data     The data to send to the client.
 | |
|  */
 | |
| static int session_reply(MXS_FILTER* inst, MXS_FILTER_SESSION* session, GWBUF* data);
 | |
| 
 | |
| MXS_SESSION::MXS_SESSION(const SListener& listener)
 | |
|     : state(SESSION_STATE_CREATED)
 | |
|     , ses_id(session_get_next_id())
 | |
|     , client_dcb(nullptr)
 | |
|     , listener(listener)
 | |
|     , router_session(nullptr)
 | |
|     , stats{time(0)}
 | |
|     , service(listener ? listener->service() : nullptr)
 | |
|     , head{}
 | |
|     , tail{}
 | |
|     , refcount(1)
 | |
|     , trx_state(SESSION_TRX_INACTIVE)
 | |
|     , autocommit(config_get_global_options()->qc_sql_mode == QC_SQL_MODE_ORACLE ? false : true)
 | |
|     , client_protocol_data(0)
 | |
|     , qualifies_for_pooling(false)
 | |
|     , response{}
 | |
|     , close_reason(SESSION_CLOSE_NONE)
 | |
|     , load_active(false)
 | |
| {
 | |
| }
 | |
| 
 | |
| MXS_SESSION::~MXS_SESSION()
 | |
| {
 | |
| }
 | |
| 
 | |
| bool session_start(MXS_SESSION* session)
 | |
| {
 | |
|     session->router_session = session->service->router->newSession(session->service->router_instance,
 | |
|                                                                    session);
 | |
| 
 | |
|     if (session->router_session == NULL)
 | |
|     {
 | |
|         session->state = SESSION_STATE_TO_BE_FREED;
 | |
|         MXS_ERROR("Failed to create new router session for service '%s'. "
 | |
|                   "See previous errors for more details.", session->service->name());
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     /*
 | |
|      * Pending filter chain being setup set the head of the chain to
 | |
|      * be the router. As filters are inserted the current head will
 | |
|      * be pushed to the filter and the head updated.
 | |
|      *
 | |
|      * NB This dictates that filters are created starting at the end
 | |
|      * of the chain nearest the router working back to the client
 | |
|      * protocol end of the chain.
 | |
|      */
 | |
|     // NOTE: Here we cast the router instance into a MXS_FILTER and
 | |
|     // NOTE: the router session into a MXS_FILTER_SESSION and
 | |
|     // NOTE: the router routeQuery into a filter routeQuery. That
 | |
|     // NOTE: is in order to be able to treat the router as the first
 | |
|     // NOTE: filter.
 | |
|     session->head = router_as_downstream(session);
 | |
| 
 | |
|     // NOTE: Here we cast the session into a MXS_FILTER and MXS_FILTER_SESSION
 | |
|     // NOTE: and session_reply into a filter clientReply. That's dubious but ok
 | |
|     // NOTE: as session_reply will know what to do. In practice, the session
 | |
|     // NOTE: will be called as if it would be the last filter.
 | |
|     session->tail.instance = (MXS_FILTER*)session;
 | |
|     session->tail.session = (MXS_FILTER_SESSION*)session;
 | |
|     session->tail.clientReply = session_reply;
 | |
| 
 | |
|     if (!session_setup_filters(session))
 | |
|     {
 | |
|         session->state = SESSION_STATE_TO_BE_FREED;
 | |
|         MXS_ERROR("Setting up filters failed. Terminating session %s.", session->service->name());
 | |
|         return false;
 | |
|     }
 | |
| 
 | |
|     session->state = SESSION_STATE_STARTED;
 | |
|     mxb::atomic::add(&session->service->stats.n_sessions, 1, mxb::atomic::RELAXED);
 | |
| 
 | |
|     MXS_INFO("Started %s client session [%" PRIu64 "] for '%s' from %s",
 | |
|              session->service->name(), session->ses_id,
 | |
|              session->client_dcb->user ? session->client_dcb->user : "<no user>",
 | |
|              session->client_dcb->remote);
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| void session_link_backend_dcb(MXS_SESSION* session, DCB* dcb)
 | |
| {
 | |
|     mxb_assert(dcb->owner == session->client_dcb->owner);
 | |
|     mxb_assert(dcb->role == DCB::Role::BACKEND);
 | |
| 
 | |
|     mxb::atomic::add(&session->refcount, 1);
 | |
|     dcb->session = session;
 | |
| 
 | |
|     Session* ses = static_cast<Session*>(session);
 | |
|     ses->link_backend_dcb(dcb);
 | |
| }
 | |
| 
 | |
| void session_unlink_backend_dcb(MXS_SESSION* session, DCB* dcb)
 | |
| {
 | |
|     Session* ses = static_cast<Session*>(session);
 | |
|     ses->unlink_backend_dcb(dcb);
 | |
|     session_put_ref(session);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Deallocate the specified session, minimal actions during session_alloc
 | |
|  * Since changes to keep new session in existence until all related DCBs
 | |
|  * have been destroyed, this function is redundant.  Just left until we are
 | |
|  * sure of the direction taken.
 | |
|  *
 | |
|  * @param session       The session to deallocate
 | |
|  */
 | |
| static void session_simple_free(MXS_SESSION* session, DCB* dcb)
 | |
| {
 | |
|     /* Does this possibly need a lock? */
 | |
|     if (dcb->data)
 | |
|     {
 | |
|         void* clientdata = dcb->data;
 | |
|         dcb->data = NULL;
 | |
|         MXS_FREE(clientdata);
 | |
|     }
 | |
|     if (session)
 | |
|     {
 | |
|         if (session->router_session)
 | |
|         {
 | |
|             session->service->router->freeSession(session->service->router_instance,
 | |
|                                                   session->router_session);
 | |
|         }
 | |
|         session->state = SESSION_STATE_STOPPING;
 | |
|     }
 | |
| 
 | |
|     session_final_free(session);
 | |
| }
 | |
| 
 | |
| void session_close(MXS_SESSION* session)
 | |
| {
 | |
|     if (session->router_session)
 | |
|     {
 | |
|         session->state = SESSION_STATE_STOPPING;
 | |
| 
 | |
|         MXS_ROUTER_OBJECT* router = session->service->router;
 | |
|         MXS_ROUTER* router_instance = session->service->router_instance;
 | |
| 
 | |
|         /** Close router session and all its connections */
 | |
|         router->closeSession(router_instance, session->router_session);
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Deallocate the specified session
 | |
|  *
 | |
|  * @param session       The session to deallocate
 | |
|  */
 | |
| static void session_free(MXS_SESSION* session)
 | |
| {
 | |
|     MXS_INFO("Stopped %s client session [%" PRIu64 "]", session->service->name(), session->ses_id);
 | |
|     session_final_free(session);
 | |
| }
 | |
| 
 | |
| static void session_final_free(MXS_SESSION* ses)
 | |
| {
 | |
|     Session* session = static_cast<Session*>(ses);
 | |
|     mxb_assert(session->refcount == 0);
 | |
| 
 | |
|     session->state = SESSION_STATE_TO_BE_FREED;
 | |
| 
 | |
|     if (session->client_dcb)
 | |
|     {
 | |
|         dcb_free_all_memory(session->client_dcb);
 | |
|         session->client_dcb = NULL;
 | |
|     }
 | |
| 
 | |
|     if (this_unit.dump_statements == SESSION_DUMP_STATEMENTS_ON_CLOSE)
 | |
|     {
 | |
|         session_dump_statements(session);
 | |
|     }
 | |
| 
 | |
|     session->state = SESSION_STATE_FREE;
 | |
| 
 | |
|     delete session;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check to see if a session is valid, i.e. in the list of all sessions
 | |
|  *
 | |
|  * @param session       Session to check
 | |
|  * @return              1 if the session is valid otherwise 0
 | |
|  */
 | |
| int session_isvalid(MXS_SESSION* session)
 | |
| {
 | |
|     return session != NULL;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Print details of an individual session
 | |
|  *
 | |
|  * @param session       Session to print
 | |
|  */
 | |
| void printSession(MXS_SESSION* session)
 | |
| {
 | |
|     struct tm result;
 | |
|     char timebuf[40];
 | |
| 
 | |
|     printf("Session %p\n", session);
 | |
|     printf("\tState:        %s\n", session_state_to_string(session->state));
 | |
|     printf("\tService:      %s (%p)\n", session->service->name(), session->service);
 | |
|     printf("\tClient DCB:   %p\n", session->client_dcb);
 | |
|     printf("\tConnected:    %s\n",
 | |
|            asctime_r(localtime_r(&session->stats.connect, &result), timebuf));
 | |
|     printf("\tRouter Session: %p\n", session->router_session);
 | |
| }
 | |
| 
 | |
| bool printAllSessions_cb(DCB* dcb, void* data)
 | |
| {
 | |
|     if (dcb->role == DCB::Role::CLIENT)
 | |
|     {
 | |
|         printSession(dcb->session);
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Print all sessions
 | |
|  *
 | |
|  * Designed to be called within a debugger session in order
 | |
|  * to display all active sessions within the gateway
 | |
|  */
 | |
| void printAllSessions()
 | |
| {
 | |
|     dcb_foreach(printAllSessions_cb, NULL);
 | |
| }
 | |
| 
 | |
| /** Callback for dprintAllSessions */
 | |
| bool dprintAllSessions_cb(DCB* dcb, void* data)
 | |
| {
 | |
|     if (dcb->role == DCB::Role::CLIENT)
 | |
|     {
 | |
|         DCB* out_dcb = (DCB*)data;
 | |
|         dprintSession(out_dcb, dcb->session);
 | |
|     }
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Print all sessions to a DCB
 | |
|  *
 | |
|  * Designed to be called within a debugger session in order
 | |
|  * to display all active sessions within the gateway
 | |
|  *
 | |
|  * @param dcb   The DCB to print to
 | |
|  */
 | |
| void dprintAllSessions(DCB* dcb)
 | |
| {
 | |
|     dcb_foreach(dprintAllSessions_cb, dcb);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Print a particular session to a DCB
 | |
|  *
 | |
|  * Designed to be called within a debugger session in order
 | |
|  * to display all active sessions within the gateway
 | |
|  *
 | |
|  * @param dcb   The DCB to print to
 | |
|  * @param print_session   The session to print
 | |
|  */
 | |
| void dprintSession(DCB* dcb, MXS_SESSION* print_session)
 | |
| {
 | |
|     struct tm result;
 | |
|     char buf[30];
 | |
|     int i;
 | |
| 
 | |
|     dcb_printf(dcb, "Session %" PRIu64 "\n", print_session->ses_id);
 | |
|     dcb_printf(dcb, "\tState:               %s\n", session_state_to_string(print_session->state));
 | |
|     dcb_printf(dcb, "\tService:             %s\n", print_session->service->name());
 | |
| 
 | |
|     if (print_session->client_dcb && print_session->client_dcb->remote)
 | |
|     {
 | |
|         double idle = (mxs_clock() - print_session->client_dcb->last_read);
 | |
|         idle = idle > 0 ? idle / 10.f : 0;
 | |
|         dcb_printf(dcb,
 | |
|                    "\tClient Address:          %s%s%s\n",
 | |
|                    print_session->client_dcb->user ? print_session->client_dcb->user : "",
 | |
|                    print_session->client_dcb->user ? "@" : "",
 | |
|                    print_session->client_dcb->remote);
 | |
|         dcb_printf(dcb,
 | |
|                    "\tConnected:               %s\n",
 | |
|                    asctime_r(localtime_r(&print_session->stats.connect, &result), buf));
 | |
|         if (print_session->client_dcb->state == DCB_STATE_POLLING)
 | |
|         {
 | |
|             dcb_printf(dcb, "\tIdle:                %.0f seconds\n", idle);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     Session* session = static_cast<Session*>(print_session);
 | |
| 
 | |
|     for (const auto& f : session->get_filters())
 | |
|     {
 | |
|         dcb_printf(dcb, "\tFilter: %s\n", f.filter->name.c_str());
 | |
|         f.filter->obj->diagnostics(f.instance, f.session, dcb);
 | |
|     }
 | |
| }
 | |
| 
 | |
| bool dListSessions_cb(DCB* dcb, void* data)
 | |
| {
 | |
|     if (dcb->role == DCB::Role::CLIENT)
 | |
|     {
 | |
|         DCB* out_dcb = (DCB*)data;
 | |
|         MXS_SESSION* session = dcb->session;
 | |
|         dcb_printf(out_dcb,
 | |
|                    "%-16" PRIu64 " | %-15s | %-14s | %s\n",
 | |
|                    session->ses_id,
 | |
|                    session->client_dcb && session->client_dcb->remote ?
 | |
|                    session->client_dcb->remote : "",
 | |
|                    session->service && session->service->name() ?
 | |
|                    session->service->name() : "",
 | |
|                    session_state_to_string(session->state));
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| /**
 | |
|  * List all sessions in tabular form to a DCB
 | |
|  *
 | |
|  * Designed to be called within a debugger session in order
 | |
|  * to display all active sessions within the gateway
 | |
|  *
 | |
|  * @param dcb   The DCB to print to
 | |
|  */
 | |
| void dListSessions(DCB* dcb)
 | |
| {
 | |
|     dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
 | |
|     dcb_printf(dcb, "Session          | Client          | Service        | State\n");
 | |
|     dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n");
 | |
| 
 | |
|     dcb_foreach(dListSessions_cb, dcb);
 | |
| 
 | |
|     dcb_printf(dcb, "-----------------+-----------------+----------------+--------------------------\n\n");
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Convert a session state to a string representation
 | |
|  *
 | |
|  * @param state         The session state
 | |
|  * @return A string representation of the session state
 | |
|  */
 | |
| const char* session_state_to_string(mxs_session_state_t state)
 | |
| {
 | |
|     switch (state)
 | |
|     {
 | |
|     case SESSION_STATE_CREATED:
 | |
|         return "Session created";
 | |
| 
 | |
|     case SESSION_STATE_STARTED:
 | |
|         return "Session started";
 | |
| 
 | |
|     case SESSION_STATE_STOPPING:
 | |
|         return "Stopping session";
 | |
| 
 | |
|     case SESSION_STATE_TO_BE_FREED:
 | |
|         return "Session to be freed";
 | |
| 
 | |
|     case SESSION_STATE_FREE:
 | |
|         return "Freed session";
 | |
| 
 | |
|     default:
 | |
|         return "Invalid State";
 | |
|     }
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Create the filter chain for this session.
 | |
|  *
 | |
|  * Filters must be setup in reverse order, starting with the last
 | |
|  * filter in the chain and working back towards the client connection
 | |
|  * Each filter is passed the current session head of the filter chain
 | |
|  * this head becomes the destination for the filter. The newly created
 | |
|  * filter becomes the new head of the filter chain.
 | |
|  *
 | |
|  * @param       session         The session that requires the chain
 | |
|  * @return      0 if filter creation fails
 | |
|  */
 | |
| static int session_setup_filters(MXS_SESSION* ses)
 | |
| {
 | |
|     Service* service = static_cast<Service*>(ses->service);
 | |
|     Session* session = static_cast<Session*>(ses);
 | |
|     return session->setup_filters(service);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Entry point for the final element in the upstream filter, i.e. the writing
 | |
|  * of the data to the client.
 | |
|  *
 | |
|  * Looks like a filter `clientReply`, but in this case both the instance and
 | |
|  * the session argument will be a MXS_SESSION*.
 | |
|  *
 | |
|  * @param       instance        The "instance" data
 | |
|  * @param       session         The session
 | |
|  * @param       data            The buffer chain to write
 | |
|  */
 | |
| int session_reply(MXS_FILTER* instance, MXS_FILTER_SESSION* session, GWBUF* data)
 | |
| {
 | |
|     MXS_SESSION* the_session = (MXS_SESSION*)session;
 | |
| 
 | |
|     return the_session->client_dcb->func.write(the_session->client_dcb, data);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Return the client connection address or name
 | |
|  *
 | |
|  * @param session       The session whose client address to return
 | |
|  */
 | |
| const char* session_get_remote(const MXS_SESSION* session)
 | |
| {
 | |
|     if (session && session->client_dcb)
 | |
|     {
 | |
|         return session->client_dcb->remote;
 | |
|     }
 | |
|     return NULL;
 | |
| }
 | |
| 
 | |
| bool session_route_query(MXS_SESSION* session, GWBUF* buffer)
 | |
| {
 | |
|     mxb_assert(session);
 | |
|     mxb_assert(session->head.routeQuery);
 | |
|     mxb_assert(session->head.instance);
 | |
|     mxb_assert(session->head.session);
 | |
| 
 | |
|     bool rv;
 | |
| 
 | |
|     if (session->head.routeQuery(session->head.instance, session->head.session, buffer) == 1)
 | |
|     {
 | |
|         rv = true;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         rv = false;
 | |
|     }
 | |
| 
 | |
|     // In case some filter has short-circuited the request processing we need
 | |
|     // to deliver that now to the client.
 | |
|     session_deliver_response(session);
 | |
| 
 | |
|     return rv;
 | |
| }
 | |
| 
 | |
| bool session_route_reply(MXS_SESSION* session, GWBUF* buffer)
 | |
| {
 | |
|     mxb_assert(session);
 | |
|     mxb_assert(session->tail.clientReply);
 | |
|     mxb_assert(session->tail.instance);
 | |
|     mxb_assert(session->tail.session);
 | |
| 
 | |
|     bool rv;
 | |
| 
 | |
|     if (session->tail.clientReply(session->tail.instance, session->tail.session, buffer) == 1)
 | |
|     {
 | |
|         rv = true;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         rv = false;
 | |
|     }
 | |
| 
 | |
|     return rv;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Return the username of the user connected to the client side of the
 | |
|  * session.
 | |
|  *
 | |
|  * @param session               The session pointer.
 | |
|  * @return      The user name or NULL if it can not be determined.
 | |
|  */
 | |
| const char* session_get_user(const MXS_SESSION* session)
 | |
| {
 | |
|     return (session && session->client_dcb) ? session->client_dcb->user : NULL;
 | |
| }
 | |
| 
 | |
| bool dcb_iter_cb(DCB* dcb, void* data)
 | |
| {
 | |
|     if (dcb->role == DCB::Role::CLIENT)
 | |
|     {
 | |
|         ResultSet* set = static_cast<ResultSet*>(data);
 | |
|         MXS_SESSION* ses = dcb->session;
 | |
|         char buf[20];
 | |
|         snprintf(buf, sizeof(buf), "%p", ses);
 | |
| 
 | |
|         set->add_row({buf, ses->client_dcb->remote, ses->service->name(),
 | |
|                       session_state_to_string(ses->state)});
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Return a resultset that has the current set of sessions in it
 | |
|  *
 | |
|  * @return A Result set
 | |
|  */
 | |
| /* Lint is not convinced that the new memory for data is always tracked
 | |
|  * because it does not see what happens within the resultset_create function,
 | |
|  * so we suppress the warning. In fact, the function call results in return
 | |
|  * of the set structure which includes a pointer to data
 | |
|  */
 | |
| std::unique_ptr<ResultSet> sessionGetList()
 | |
| {
 | |
|     std::unique_ptr<ResultSet> set = ResultSet::create({"Session", "Client", "Service", "State"});
 | |
|     dcb_foreach(dcb_iter_cb, set.get());
 | |
|     return set;
 | |
| }
 | |
| 
 | |
| mxs_session_trx_state_t session_get_trx_state(const MXS_SESSION* ses)
 | |
| {
 | |
|     return ses->trx_state;
 | |
| }
 | |
| 
 | |
| mxs_session_trx_state_t session_set_trx_state(MXS_SESSION* ses, mxs_session_trx_state_t new_state)
 | |
| {
 | |
|     mxs_session_trx_state_t prev_state = ses->trx_state;
 | |
| 
 | |
|     ses->trx_state = new_state;
 | |
| 
 | |
|     return prev_state;
 | |
| }
 | |
| 
 | |
| const char* session_trx_state_to_string(mxs_session_trx_state_t state)
 | |
| {
 | |
|     switch (state)
 | |
|     {
 | |
|     case SESSION_TRX_INACTIVE:
 | |
|         return "SESSION_TRX_INACTIVE";
 | |
| 
 | |
|     case SESSION_TRX_ACTIVE:
 | |
|         return "SESSION_TRX_ACTIVE";
 | |
| 
 | |
|     case SESSION_TRX_READ_ONLY:
 | |
|         return "SESSION_TRX_READ_ONLY";
 | |
| 
 | |
|     case SESSION_TRX_READ_WRITE:
 | |
|         return "SESSION_TRX_READ_WRITE";
 | |
| 
 | |
|     case SESSION_TRX_READ_ONLY_ENDING:
 | |
|         return "SESSION_TRX_READ_ONLY_ENDING";
 | |
| 
 | |
|     case SESSION_TRX_READ_WRITE_ENDING:
 | |
|         return "SESSION_TRX_READ_WRITE_ENDING";
 | |
|     }
 | |
| 
 | |
|     MXS_ERROR("Unknown session_trx_state_t value: %d", (int)state);
 | |
|     return "UNKNOWN";
 | |
| }
 | |
| 
 | |
| static bool ses_find_id(DCB* dcb, void* data)
 | |
| {
 | |
|     void** params = (void**)data;
 | |
|     MXS_SESSION** ses = (MXS_SESSION**)params[0];
 | |
|     uint64_t* id = (uint64_t*)params[1];
 | |
|     bool rval = true;
 | |
| 
 | |
|     if (dcb->session->ses_id == *id)
 | |
|     {
 | |
|         *ses = session_get_ref(dcb->session);
 | |
|         rval = false;
 | |
|     }
 | |
| 
 | |
|     return rval;
 | |
| }
 | |
| 
 | |
| MXS_SESSION* session_get_by_id(uint64_t id)
 | |
| {
 | |
|     MXS_SESSION* session = NULL;
 | |
|     void* params[] = {&session, &id};
 | |
| 
 | |
|     dcb_foreach(ses_find_id, params);
 | |
| 
 | |
|     return session;
 | |
| }
 | |
| 
 | |
| MXS_SESSION* session_get_ref(MXS_SESSION* session)
 | |
| {
 | |
|     mxb::atomic::add(&session->refcount, 1);
 | |
|     return session;
 | |
| }
 | |
| 
 | |
| void session_put_ref(MXS_SESSION* session)
 | |
| {
 | |
|     if (session)
 | |
|     {
 | |
|         /** Remove one reference. If there are no references left, free session */
 | |
|         if (mxb::atomic::add(&session->refcount, -1) == 1)
 | |
|         {
 | |
|             session_free(session);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| uint64_t session_get_next_id()
 | |
| {
 | |
|     return mxb::atomic::add(&this_unit.next_session_id, 1, mxb::atomic::RELAXED);
 | |
| }
 | |
| 
 | |
| json_t* session_json_data(const Session* session, const char* host, bool rdns)
 | |
| {
 | |
|     json_t* data = json_object();
 | |
| 
 | |
|     /** ID must be a string */
 | |
|     stringstream ss;
 | |
|     ss << session->ses_id;
 | |
| 
 | |
|     /** ID and type */
 | |
|     json_object_set_new(data, CN_ID, json_string(ss.str().c_str()));
 | |
|     json_object_set_new(data, CN_TYPE, json_string(CN_SESSIONS));
 | |
| 
 | |
|     /** Relationships */
 | |
|     json_t* rel = json_object();
 | |
| 
 | |
|     /** Service relationship (one-to-one) */
 | |
|     json_t* services = mxs_json_relationship(host, MXS_JSON_API_SERVICES);
 | |
|     mxs_json_add_relation(services, session->service->name(), CN_SERVICES);
 | |
|     json_object_set_new(rel, CN_SERVICES, services);
 | |
| 
 | |
|     /** Filter relationships (one-to-many) */
 | |
|     auto filter_list = session->get_filters();
 | |
| 
 | |
|     if (!filter_list.empty())
 | |
|     {
 | |
|         json_t* filters = mxs_json_relationship(host, MXS_JSON_API_FILTERS);
 | |
| 
 | |
|         for (const auto& f : filter_list)
 | |
|         {
 | |
|             mxs_json_add_relation(filters, f.filter->name.c_str(), CN_FILTERS);
 | |
|         }
 | |
|         json_object_set_new(rel, CN_FILTERS, filters);
 | |
|     }
 | |
| 
 | |
|     json_object_set_new(data, CN_RELATIONSHIPS, rel);
 | |
| 
 | |
|     /** Session attributes */
 | |
|     json_t* attr = json_object();
 | |
|     json_object_set_new(attr, "state", json_string(session_state_to_string(session->state)));
 | |
| 
 | |
|     if (session->client_dcb->user)
 | |
|     {
 | |
|         json_object_set_new(attr, CN_USER, json_string(session->client_dcb->user));
 | |
|     }
 | |
| 
 | |
|     if (session->client_dcb->remote)
 | |
|     {
 | |
|         string result_address;
 | |
|         auto remote = session->client_dcb->remote;
 | |
|         if (rdns)
 | |
|         {
 | |
|             maxbase::reverse_name_lookup(remote, &result_address);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             result_address = remote;
 | |
|         }
 | |
|         json_object_set_new(attr, "remote", json_string(result_address.c_str()));
 | |
|     }
 | |
| 
 | |
|     struct tm result;
 | |
|     char buf[60];
 | |
| 
 | |
|     asctime_r(localtime_r(&session->stats.connect, &result), buf);
 | |
|     mxb::trim(buf);
 | |
| 
 | |
|     json_object_set_new(attr, "connected", json_string(buf));
 | |
| 
 | |
|     if (session->client_dcb->state == DCB_STATE_POLLING)
 | |
|     {
 | |
|         double idle = (mxs_clock() - session->client_dcb->last_read);
 | |
|         idle = idle > 0 ? idle / 10.f : 0;
 | |
|         json_object_set_new(attr, "idle", json_real(idle));
 | |
|     }
 | |
| 
 | |
|     json_t* dcb_arr = json_array();
 | |
|     const Session* pSession = static_cast<const Session*>(session);
 | |
| 
 | |
|     for (auto d : pSession->dcb_set())
 | |
|     {
 | |
|         json_array_append_new(dcb_arr, dcb_to_json(d));
 | |
|     }
 | |
| 
 | |
|     json_object_set_new(attr, "connections", dcb_arr);
 | |
|     json_object_set_new(attr, "client", dcb_to_json(session->client_dcb));
 | |
| 
 | |
|     json_t* queries = session->queries_as_json();
 | |
|     json_object_set_new(attr, "queries", queries);
 | |
| 
 | |
|     json_t* log = session->log_as_json();
 | |
|     json_object_set_new(attr, "log", log);
 | |
| 
 | |
|     json_object_set_new(data, CN_ATTRIBUTES, attr);
 | |
|     json_object_set_new(data, CN_LINKS, mxs_json_self_link(host, CN_SESSIONS, ss.str().c_str()));
 | |
| 
 | |
|     return data;
 | |
| }
 | |
| 
 | |
| json_t* session_to_json(const MXS_SESSION* session, const char* host, bool rdns)
 | |
| {
 | |
|     stringstream ss;
 | |
|     ss << MXS_JSON_API_SESSIONS << session->ses_id;
 | |
|     const Session* s = static_cast<const Session*>(session);
 | |
|     return mxs_json_resource(host, ss.str().c_str(), session_json_data(s, host, rdns));
 | |
| }
 | |
| 
 | |
| struct SessionListData
 | |
| {
 | |
|     SessionListData(const char* host, bool rdns)
 | |
|         : json(json_array())
 | |
|         , host(host)
 | |
|         , rdns(rdns)
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     json_t*     json {nullptr};
 | |
|     const char* host {nullptr};
 | |
|     bool        rdns {false};
 | |
| };
 | |
| 
 | |
| bool seslist_cb(DCB* dcb, void* data)
 | |
| {
 | |
|     if (dcb->role == DCB::Role::CLIENT)
 | |
|     {
 | |
|         SessionListData* d = (SessionListData*)data;
 | |
|         Session* session = static_cast<Session*>(dcb->session);
 | |
|         json_array_append_new(d->json, session_json_data(session, d->host, d->rdns));
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| json_t* session_list_to_json(const char* host, bool rdns)
 | |
| {
 | |
|     SessionListData data(host, rdns);
 | |
|     dcb_foreach(seslist_cb, &data);
 | |
|     return mxs_json_resource(host, MXS_JSON_API_SESSIONS, data.json);
 | |
| }
 | |
| 
 | |
| void session_qualify_for_pool(MXS_SESSION* session)
 | |
| {
 | |
|     session->qualifies_for_pooling = true;
 | |
| }
 | |
| 
 | |
| bool session_valid_for_pool(const MXS_SESSION* session)
 | |
| {
 | |
|     return session->qualifies_for_pooling;
 | |
| }
 | |
| 
 | |
| MXS_SESSION* session_get_current()
 | |
| {
 | |
|     DCB* dcb = dcb_get_current();
 | |
| 
 | |
|     return dcb ? dcb->session : NULL;
 | |
| }
 | |
| 
 | |
| uint64_t session_get_current_id()
 | |
| {
 | |
|     MXS_SESSION* session = session_get_current();
 | |
| 
 | |
|     return session ? session->ses_id : 0;
 | |
| }
 | |
| 
 | |
| bool session_add_variable(MXS_SESSION* session,
 | |
|                           const char* name,
 | |
|                           session_variable_handler_t handler,
 | |
|                           void* context)
 | |
| {
 | |
|     Session* pSession = static_cast<Session*>(session);
 | |
|     return pSession->add_variable(name, handler, context);
 | |
| }
 | |
| 
 | |
| char* session_set_variable_value(MXS_SESSION* session,
 | |
|                                  const char* name_begin,
 | |
|                                  const char* name_end,
 | |
|                                  const char* value_begin,
 | |
|                                  const char* value_end)
 | |
| {
 | |
|     Session* pSession = static_cast<Session*>(session);
 | |
|     return pSession->set_variable_value(name_begin, name_end, value_begin, value_end);
 | |
| }
 | |
| 
 | |
| bool session_remove_variable(MXS_SESSION* session,
 | |
|                              const char* name,
 | |
|                              void** context)
 | |
| {
 | |
|     Session* pSession = static_cast<Session*>(session);
 | |
|     return pSession->remove_variable(name, context);
 | |
| }
 | |
| 
 | |
| void session_set_response(MXS_SESSION* session, const MXS_UPSTREAM* up, GWBUF* buffer)
 | |
| {
 | |
|     // Valid arguments.
 | |
|     mxb_assert(session && up && buffer);
 | |
| 
 | |
|     // Valid state. Only one filter may terminate the execution and exactly once.
 | |
|     mxb_assert(!session->response.up.instance
 | |
|                && !session->response.up.session
 | |
|                && !session->response.buffer);
 | |
| 
 | |
|     session->response.up = *up;
 | |
|     session->response.buffer = buffer;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Delivers a provided response to the upstream filter that should
 | |
|  * receive it.
 | |
|  *
 | |
|  * @param session  The session.
 | |
|  */
 | |
| static void session_deliver_response(MXS_SESSION* session)
 | |
| {
 | |
|     MXS_FILTER* filter_instance = session->response.up.instance;
 | |
| 
 | |
|     if (filter_instance)
 | |
|     {
 | |
|         MXS_FILTER_SESSION* filter_session = session->response.up.session;
 | |
|         GWBUF* buffer = session->response.buffer;
 | |
| 
 | |
|         mxb_assert(filter_session);
 | |
|         mxb_assert(buffer);
 | |
| 
 | |
|         session->response.up.clientReply(filter_instance, filter_session, buffer);
 | |
| 
 | |
|         session->response.up.instance = NULL;
 | |
|         session->response.up.session = NULL;
 | |
|         session->response.up.clientReply = NULL;
 | |
|         session->response.buffer = NULL;
 | |
| 
 | |
|         // If some filter short-circuits the routing, then there will
 | |
|         // be no response from a server and we need to ensure that
 | |
|         // subsequent book-keeping targets the right statement.
 | |
|         static_cast<Session*>(session)->book_last_as_complete();
 | |
|     }
 | |
| 
 | |
|     mxb_assert(!session->response.up.instance);
 | |
|     mxb_assert(!session->response.up.session);
 | |
|     mxb_assert(!session->response.up.clientReply);
 | |
|     mxb_assert(!session->response.buffer);
 | |
| }
 | |
| 
 | |
| void session_set_retain_last_statements(uint32_t n)
 | |
| {
 | |
|     this_unit.retain_last_statements = n;
 | |
| }
 | |
| 
 | |
| uint32_t session_get_retain_last_statements()
 | |
| {
 | |
|     return this_unit.retain_last_statements;
 | |
| }
 | |
| 
 | |
| void session_set_dump_statements(session_dump_statements_t value)
 | |
| {
 | |
|     this_unit.dump_statements = value;
 | |
| }
 | |
| 
 | |
| session_dump_statements_t session_get_dump_statements()
 | |
| {
 | |
|     return this_unit.dump_statements;
 | |
| }
 | |
| 
 | |
| const char* session_get_dump_statements_str()
 | |
| {
 | |
|     switch (this_unit.dump_statements)
 | |
|     {
 | |
|     case SESSION_DUMP_STATEMENTS_NEVER:
 | |
|         return "never";
 | |
| 
 | |
|     case SESSION_DUMP_STATEMENTS_ON_CLOSE:
 | |
|         return "on_close";
 | |
| 
 | |
|     case SESSION_DUMP_STATEMENTS_ON_ERROR:
 | |
|         return "on_error";
 | |
| 
 | |
|     default:
 | |
|         mxb_assert(!true);
 | |
|         return "unknown";
 | |
|     }
 | |
| }
 | |
| 
 | |
| void session_retain_statement(MXS_SESSION* pSession, GWBUF* pBuffer)
 | |
| {
 | |
|     static_cast<Session*>(pSession)->retain_statement(pBuffer);
 | |
| }
 | |
| 
 | |
| void session_book_server_response(MXS_SESSION* pSession, SERVER* pServer, bool final_response)
 | |
| {
 | |
|     static_cast<Session*>(pSession)->book_server_response(pServer, final_response);
 | |
| }
 | |
| 
 | |
| void session_reset_server_bookkeeping(MXS_SESSION* pSession)
 | |
| {
 | |
|     static_cast<Session*>(pSession)->reset_server_bookkeeping();
 | |
| }
 | |
| 
 | |
| void session_dump_statements(MXS_SESSION* session)
 | |
| {
 | |
|     Session* pSession = static_cast<Session*>(session);
 | |
|     pSession->dump_statements();
 | |
| }
 | |
| 
 | |
| void session_set_session_trace(uint32_t value)
 | |
| {
 | |
|     this_unit.session_trace = value;
 | |
| }
 | |
| 
 | |
| uint32_t session_get_session_trace()
 | |
| {
 | |
|     return this_unit.session_trace;
 | |
| }
 | |
| 
 | |
| void session_append_log(MXS_SESSION* pSession, std::string log)
 | |
| {
 | |
|     {
 | |
|         static_cast<Session*>(pSession)->append_session_log(log);
 | |
|     }
 | |
| }
 | |
| 
 | |
| void session_dump_log(MXS_SESSION* pSession)
 | |
| {
 | |
|     static_cast<Session*>(pSession)->dump_session_log();
 | |
| }
 | |
| 
 | |
| class DelayedRoutingTask
 | |
| {
 | |
|     DelayedRoutingTask(const DelayedRoutingTask&) = delete;
 | |
|     DelayedRoutingTask& operator=(const DelayedRoutingTask&) = delete;
 | |
| 
 | |
| public:
 | |
|     DelayedRoutingTask(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer)
 | |
|         : m_session(session_get_ref(session))
 | |
|         , m_down(down)
 | |
|         , m_buffer(buffer)
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     ~DelayedRoutingTask()
 | |
|     {
 | |
|         session_put_ref(m_session);
 | |
|         gwbuf_free(m_buffer);
 | |
|     }
 | |
| 
 | |
|     void execute()
 | |
|     {
 | |
|         if (m_session->state == SESSION_STATE_STARTED)
 | |
|         {
 | |
|             GWBUF* buffer = m_buffer;
 | |
|             m_buffer = NULL;
 | |
| 
 | |
|             if (m_down.routeQuery(m_down.instance, m_down.session, buffer) == 0)
 | |
|             {
 | |
|                 // Routing failed, send a hangup to the client.
 | |
|                 poll_fake_hangup_event(m_session->client_dcb);
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
| private:
 | |
|     MXS_SESSION*   m_session;
 | |
|     MXS_DOWNSTREAM m_down;
 | |
|     GWBUF*         m_buffer;
 | |
| };
 | |
| 
 | |
| static bool delayed_routing_cb(Worker::Call::action_t action, DelayedRoutingTask* task)
 | |
| {
 | |
|     if (action == Worker::Call::EXECUTE)
 | |
|     {
 | |
|         task->execute();
 | |
|     }
 | |
| 
 | |
|     delete task;
 | |
|     return false;
 | |
| }
 | |
| 
 | |
| bool session_delay_routing(MXS_SESSION* session, MXS_DOWNSTREAM down, GWBUF* buffer, int seconds)
 | |
| {
 | |
|     bool success = false;
 | |
| 
 | |
|     try
 | |
|     {
 | |
|         Worker* worker = Worker::get_current();
 | |
|         mxb_assert(worker == session->client_dcb->owner);
 | |
|         std::unique_ptr<DelayedRoutingTask> task(new DelayedRoutingTask(session, down, buffer));
 | |
| 
 | |
|         // Delay the routing for at least a millisecond
 | |
|         int32_t delay = 1 + seconds * 1000;
 | |
|         worker->delayed_call(delay, delayed_routing_cb, task.release());
 | |
| 
 | |
|         success = true;
 | |
|     }
 | |
|     catch (std::bad_alloc&)
 | |
|     {
 | |
|         MXS_OOM();
 | |
|     }
 | |
| 
 | |
|     return success;
 | |
| }
 | |
| 
 | |
| MXS_DOWNSTREAM router_as_downstream(MXS_SESSION* session)
 | |
| {
 | |
|     MXS_DOWNSTREAM head;
 | |
|     head.instance = (MXS_FILTER*)session->service->router_instance;
 | |
|     head.session = (MXS_FILTER_SESSION*)session->router_session;
 | |
|     head.routeQuery = (DOWNSTREAMFUNC)session->service->router->routeQuery;
 | |
|     return head;
 | |
| }
 | |
| 
 | |
| const char* session_get_close_reason(const MXS_SESSION* session)
 | |
| {
 | |
|     switch (session->close_reason)
 | |
|     {
 | |
|     case SESSION_CLOSE_NONE:
 | |
|         return "";
 | |
| 
 | |
|     case SESSION_CLOSE_TIMEOUT:
 | |
|         return "Timed out by MaxScale";
 | |
| 
 | |
|     case SESSION_CLOSE_HANDLEERROR_FAILED:
 | |
|         return "Router could not recover from connection errors";
 | |
| 
 | |
|     case SESSION_CLOSE_ROUTING_FAILED:
 | |
|         return "Router could not route query";
 | |
| 
 | |
|     case SESSION_CLOSE_KILLED:
 | |
|         return "Killed by another connection";
 | |
| 
 | |
|     case SESSION_CLOSE_TOO_MANY_CONNECTIONS:
 | |
|         return "Too many connections";
 | |
| 
 | |
|     default:
 | |
|         mxb_assert(!true);
 | |
|         return "Internal error";
 | |
|     }
 | |
| }
 | |
| 
 | |
| Session::Session(const SListener& listener)
 | |
|     : MXS_SESSION(listener)
 | |
| {
 | |
|     if (service->retain_last_statements != -1)      // Explicitly set for the service
 | |
|     {
 | |
|         m_retain_last_statements = service->retain_last_statements;
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         m_retain_last_statements = this_unit.retain_last_statements;
 | |
|     }
 | |
| 
 | |
|     mxb::atomic::add(&service->stats.n_current, 1, mxb::atomic::RELAXED);
 | |
|     mxb_assert(service->stats.n_current >= 0);
 | |
|     mxb::atomic::add(&service->client_count, 1, mxb::atomic::RELAXED);
 | |
| }
 | |
| 
 | |
| Session::~Session()
 | |
| {
 | |
|     if (router_session)
 | |
|     {
 | |
|         service->router->freeSession(service->router_instance, router_session);
 | |
|     }
 | |
| 
 | |
|     for (auto& f : m_filters)
 | |
|     {
 | |
|         f.filter->obj->closeSession(f.instance, f.session);
 | |
|         f.filter->obj->freeSession(f.instance, f.session);
 | |
|     }
 | |
| 
 | |
|     mxb::atomic::add(&service->stats.n_current, -1, mxb::atomic::RELAXED);
 | |
|     mxb_assert(service->stats.n_current >= 0);
 | |
|     bool should_destroy = !mxb::atomic::load(&service->active);
 | |
| 
 | |
|     if (mxb::atomic::add(&service->client_count, -1) == 1 && should_destroy)
 | |
|     {
 | |
|         // Destroy the service in the main routing worker thread
 | |
|         auto svc = static_cast<Service*>(service);
 | |
|         mxs::RoutingWorker* main_worker = mxs::RoutingWorker::get(mxs::RoutingWorker::MAIN);
 | |
|         main_worker->execute([svc]() {
 | |
|                                  service_free(svc);
 | |
|                              }, Worker::EXECUTE_AUTO);
 | |
|     }
 | |
| }
 | |
| 
 | |
| void Session::set_client_dcb(DCB* dcb)
 | |
| {
 | |
|     mxb_assert(client_dcb == nullptr);
 | |
|     mxb_assert(dcb->role == DCB::Role::CLIENT);
 | |
|     client_dcb = dcb;
 | |
| }
 | |
| 
 | |
| namespace
 | |
| {
 | |
| 
 | |
| bool get_cmd_and_stmt(GWBUF* pBuffer, const char** ppCmd, char** ppStmt, int* pLen)
 | |
| {
 | |
|     *ppCmd = nullptr;
 | |
|     *ppStmt = nullptr;
 | |
|     *pLen = 0;
 | |
| 
 | |
|     bool deallocate = false;
 | |
|     int len = gwbuf_length(pBuffer);
 | |
| 
 | |
|     if (len > MYSQL_HEADER_LEN)
 | |
|     {
 | |
|         uint8_t header[MYSQL_HEADER_LEN + 1];
 | |
|         uint8_t* pHeader = NULL;
 | |
| 
 | |
|         if (GWBUF_LENGTH(pBuffer) > MYSQL_HEADER_LEN)
 | |
|         {
 | |
|             pHeader = GWBUF_DATA(pBuffer);
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             gwbuf_copy_data(pBuffer, 0, MYSQL_HEADER_LEN + 1, header);
 | |
|             pHeader = header;
 | |
|         }
 | |
| 
 | |
|         int cmd = MYSQL_GET_COMMAND(pHeader);
 | |
| 
 | |
|         *ppCmd = STRPACKETTYPE(cmd);
 | |
| 
 | |
|         if (cmd == MXS_COM_QUERY)
 | |
|         {
 | |
|             if (GWBUF_IS_CONTIGUOUS(pBuffer))
 | |
|             {
 | |
|                 modutil_extract_SQL(pBuffer, ppStmt, pLen);
 | |
|             }
 | |
|             else
 | |
|             {
 | |
|                 *ppStmt = modutil_get_SQL(pBuffer);
 | |
|                 *pLen = strlen(*ppStmt);
 | |
|                 deallocate = true;
 | |
|             }
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return deallocate;
 | |
| }
 | |
| }
 | |
| 
 | |
| void Session::dump_statements() const
 | |
| {
 | |
|     if (m_retain_last_statements)
 | |
|     {
 | |
|         int n = m_last_queries.size();
 | |
| 
 | |
|         uint64_t id = session_get_current_id();
 | |
| 
 | |
|         if ((id != 0) && (id != ses_id))
 | |
|         {
 | |
|             MXS_WARNING("Current session is %" PRIu64 ", yet statements are dumped for %" PRIu64 ". "
 | |
|                                                                                                  "The session id in the subsequent dumped statements is the wrong one.",
 | |
|                         id,
 | |
|                         ses_id);
 | |
|         }
 | |
| 
 | |
|         for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i)
 | |
|         {
 | |
|             const QueryInfo& info = *i;
 | |
|             GWBUF* pBuffer = info.query().get();
 | |
|             timespec ts = info.time_completed();
 | |
|             struct tm* tm = localtime(&ts.tv_sec);
 | |
|             char timestamp[20];
 | |
|             strftime(timestamp, 20, "%Y-%m-%d %H:%M:%S", tm);
 | |
| 
 | |
|             const char* pCmd;
 | |
|             char* pStmt;
 | |
|             int len;
 | |
|             bool deallocate = get_cmd_and_stmt(pBuffer, &pCmd, &pStmt, &len);
 | |
| 
 | |
|             if (pStmt)
 | |
|             {
 | |
|                 if (id != 0)
 | |
|                 {
 | |
|                     MXS_NOTICE("Stmt %d(%s): %.*s", n, timestamp, len, pStmt);
 | |
|                 }
 | |
|                 else
 | |
|                 {
 | |
|                     // We are in a context where we do not have a current session, so we need to
 | |
|                     // log the session id ourselves.
 | |
| 
 | |
|                     MXS_NOTICE("(%" PRIu64 ") Stmt %d(%s): %.*s", ses_id, n, timestamp, len, pStmt);
 | |
|                 }
 | |
| 
 | |
|                 if (deallocate)
 | |
|                 {
 | |
|                     MXS_FREE(pStmt);
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             --n;
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| json_t* Session::queries_as_json() const
 | |
| {
 | |
|     json_t* pQueries = json_array();
 | |
| 
 | |
|     for (auto i = m_last_queries.rbegin(); i != m_last_queries.rend(); ++i)
 | |
|     {
 | |
|         const QueryInfo& info = *i;
 | |
| 
 | |
|         json_array_append_new(pQueries, info.as_json());
 | |
|     }
 | |
| 
 | |
|     return pQueries;
 | |
| }
 | |
| 
 | |
| json_t* Session::log_as_json() const
 | |
| {
 | |
|     json_t* pLog = json_array();
 | |
| 
 | |
|     for (const auto& i : m_log)
 | |
|     {
 | |
|         json_array_append_new(pLog, json_string(i.c_str()));
 | |
|     }
 | |
| 
 | |
|     return pLog;
 | |
| }
 | |
| 
 | |
| bool Session::setup_filters(Service* service)
 | |
| {
 | |
|     for (const auto& a : service->get_filters())
 | |
|     {
 | |
|         m_filters.emplace_back(a);
 | |
|     }
 | |
| 
 | |
|     for (auto it = m_filters.rbegin(); it != m_filters.rend(); it++)
 | |
|     {
 | |
|         MXS_DOWNSTREAM* my_head = filter_apply(it->filter, this, &head);
 | |
| 
 | |
|         if (my_head == NULL)
 | |
|         {
 | |
|             MXS_ERROR("Failed to create filter '%s' for service '%s'.\n",
 | |
|                       filter_def_get_name(it->filter.get()),
 | |
|                       service->name());
 | |
|             return false;
 | |
|         }
 | |
| 
 | |
|         it->session = my_head->session;
 | |
|         it->instance = my_head->instance;
 | |
|         head = *my_head;
 | |
|         MXS_FREE(my_head);
 | |
|     }
 | |
| 
 | |
|     for (auto it = m_filters.begin(); it != m_filters.end(); it++)
 | |
|     {
 | |
|         MXS_UPSTREAM* my_tail = filter_upstream(it->filter, it->session, &tail);
 | |
| 
 | |
|         if (my_tail == NULL)
 | |
|         {
 | |
|             MXS_ERROR("Failed to create filter '%s' for service '%s'.",
 | |
|                       filter_def_get_name(it->filter.get()),
 | |
|                       service->name());
 | |
|             return false;
 | |
|         }
 | |
| 
 | |
|         /**
 | |
|          * filter_upstream may simply return the 3 parameters if the filter has no
 | |
|          * upstream entry point. So no need to copy the contents or free tail in this case.
 | |
|          */
 | |
|         if (my_tail != &tail)
 | |
|         {
 | |
|             tail = *my_tail;
 | |
|             MXS_FREE(my_tail);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     return true;
 | |
| }
 | |
| 
 | |
| bool Session::add_variable(const char* name, session_variable_handler_t handler, void* context)
 | |
| {
 | |
|     bool added = false;
 | |
| 
 | |
|     static const char PREFIX[] = "@MAXSCALE.";
 | |
| 
 | |
|     if (strncasecmp(name, PREFIX, sizeof(PREFIX) - 1) == 0)
 | |
|     {
 | |
|         string key(name);
 | |
| 
 | |
|         std::transform(key.begin(), key.end(), key.begin(), tolower);
 | |
| 
 | |
|         if (m_variables.find(key) == m_variables.end())
 | |
|         {
 | |
|             SESSION_VARIABLE variable;
 | |
|             variable.handler = handler;
 | |
|             variable.context = context;
 | |
| 
 | |
|             m_variables.insert(std::make_pair(key, variable));
 | |
|             added = true;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             MXS_ERROR("Session variable '%s' has been added already.", name);
 | |
|         }
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         MXS_ERROR("Session variable '%s' is not of the correct format.", name);
 | |
|     }
 | |
| 
 | |
|     return added;
 | |
| }
 | |
| 
 | |
| char* Session::set_variable_value(const char* name_begin,
 | |
|                                   const char* name_end,
 | |
|                                   const char* value_begin,
 | |
|                                   const char* value_end)
 | |
| {
 | |
|     char* rv = NULL;
 | |
| 
 | |
|     string key(name_begin, name_end - name_begin);
 | |
| 
 | |
|     transform(key.begin(), key.end(), key.begin(), tolower);
 | |
| 
 | |
|     auto it = m_variables.find(key);
 | |
| 
 | |
|     if (it != m_variables.end())
 | |
|     {
 | |
|         rv = it->second.handler(it->second.context, key.c_str(), value_begin, value_end);
 | |
|     }
 | |
|     else
 | |
|     {
 | |
|         const char FORMAT[] = "Attempt to set unknown MaxScale user variable %.*s";
 | |
| 
 | |
|         int name_length = name_end - name_begin;
 | |
|         int len = snprintf(NULL, 0, FORMAT, name_length, name_begin);
 | |
| 
 | |
|         rv = static_cast<char*>(MXS_MALLOC(len + 1));
 | |
| 
 | |
|         if (rv)
 | |
|         {
 | |
|             sprintf(rv, FORMAT, name_length, name_begin);
 | |
|         }
 | |
| 
 | |
|         MXS_WARNING(FORMAT, name_length, name_begin);
 | |
|     }
 | |
| 
 | |
|     return rv;
 | |
| }
 | |
| 
 | |
| bool Session::remove_variable(const char* name, void** context)
 | |
| {
 | |
|     bool removed = false;
 | |
|     string key(name);
 | |
| 
 | |
|     transform(key.begin(), key.end(), key.begin(), toupper);
 | |
|     auto it = m_variables.find(key);
 | |
| 
 | |
|     if (it != m_variables.end())
 | |
|     {
 | |
|         if (context)
 | |
|         {
 | |
|             *context = it->second.context;
 | |
|         }
 | |
| 
 | |
|         m_variables.erase(it);
 | |
|         removed = true;
 | |
|     }
 | |
| 
 | |
|     return removed;
 | |
| }
 | |
| 
 | |
| void Session::retain_statement(GWBUF* pBuffer)
 | |
| {
 | |
|     if (m_retain_last_statements)
 | |
|     {
 | |
|         mxb_assert(m_last_queries.size() <= m_retain_last_statements);
 | |
| 
 | |
|         std::shared_ptr<GWBUF> sBuffer(gwbuf_clone(pBuffer), std::default_delete<GWBUF>());
 | |
| 
 | |
|         m_last_queries.push_front(QueryInfo(sBuffer));
 | |
| 
 | |
|         if (m_last_queries.size() > m_retain_last_statements)
 | |
|         {
 | |
|             m_last_queries.pop_back();
 | |
|         }
 | |
| 
 | |
|         if (m_last_queries.size() == 1)
 | |
|         {
 | |
|             mxb_assert(m_current_query == -1);
 | |
|             m_current_query = 0;
 | |
|         }
 | |
|         else
 | |
|         {
 | |
|             // If requests are streamed, without the response being waited for,
 | |
|             // then this may cause the index to grow past the length of the array.
 | |
|             // That's ok and is dealt with in book_server_response() and friends.
 | |
|             ++m_current_query;
 | |
|             mxb_assert(m_current_query >= 0);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| void Session::book_server_response(SERVER* pServer, bool final_response)
 | |
| {
 | |
|     if (m_retain_last_statements && !m_last_queries.empty())
 | |
|     {
 | |
|         mxb_assert(m_current_query >= 0);
 | |
|         // If enough queries have been sent by the client, without it waiting
 | |
|         // for the responses, then at this point it may be so that the query
 | |
|         // object has been popped from the size limited queue. That's apparent
 | |
|         // by the index pointing past the end of the queue. In that case
 | |
|         // we simply ignore the result.
 | |
|         if (m_current_query < static_cast<int>(m_last_queries.size()))
 | |
|         {
 | |
|             auto i = m_last_queries.begin() + m_current_query;
 | |
|             QueryInfo& info = *i;
 | |
| 
 | |
|             mxb_assert(!info.complete());
 | |
| 
 | |
|             info.book_server_response(pServer, final_response);
 | |
|         }
 | |
| 
 | |
|         if (final_response)
 | |
|         {
 | |
|             // In case what is described in the comment above has occurred,
 | |
|             // this will eventually take the index back into the queue.
 | |
|             --m_current_query;
 | |
|             mxb_assert(m_current_query >= -1);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| void Session::book_last_as_complete()
 | |
| {
 | |
|     if (m_retain_last_statements && !m_last_queries.empty())
 | |
|     {
 | |
|         mxb_assert(m_current_query >= 0);
 | |
|         // See comment in book_server_response().
 | |
|         if (m_current_query < static_cast<int>(m_last_queries.size()))
 | |
|         {
 | |
|             auto i = m_last_queries.begin() + m_current_query;
 | |
|             QueryInfo& info = *i;
 | |
| 
 | |
|             info.book_as_complete();
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| void Session::reset_server_bookkeeping()
 | |
| {
 | |
|     if (m_retain_last_statements && !m_last_queries.empty())
 | |
|     {
 | |
|         mxb_assert(m_current_query >= 0);
 | |
|         // See comment in book_server_response().
 | |
|         if (m_current_query < static_cast<int>(m_last_queries.size()))
 | |
|         {
 | |
|             auto i = m_last_queries.begin() + m_current_query;
 | |
|             QueryInfo& info = *i;
 | |
|             info.reset_server_bookkeeping();
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| Session::QueryInfo::QueryInfo(const std::shared_ptr<GWBUF>& sQuery)
 | |
|     : m_sQuery(sQuery)
 | |
| {
 | |
|     clock_gettime(CLOCK_REALTIME_COARSE, &m_received);
 | |
|     m_completed.tv_sec = 0;
 | |
|     m_completed.tv_nsec = 0;
 | |
| }
 | |
| 
 | |
| namespace
 | |
| {
 | |
| 
 | |
| static const char ISO_TEMPLATE[] = "2018-11-05T16:47:49.123";
 | |
| static const int ISO_TIME_LEN = sizeof(ISO_TEMPLATE) - 1;
 | |
| 
 | |
| void timespec_to_iso(char* zIso, const timespec& ts)
 | |
| {
 | |
|     tm tm;
 | |
|     localtime_r(&ts.tv_sec, &tm);
 | |
| 
 | |
|     size_t i = strftime(zIso, ISO_TIME_LEN + 1, "%G-%m-%dT%H:%M:%S", &tm);
 | |
|     mxb_assert(i == 19);
 | |
|     long int ms = ts.tv_nsec / 1000000;
 | |
|     i = sprintf(zIso + i, ".%03ld", ts.tv_nsec / 1000000);
 | |
|     mxb_assert(i == 4);
 | |
| }
 | |
| }
 | |
| 
 | |
| json_t* Session::QueryInfo::as_json() const
 | |
| {
 | |
|     json_t* pQuery = json_object();
 | |
| 
 | |
|     const char* pCmd;
 | |
|     char* pStmt;
 | |
|     int len;
 | |
|     bool deallocate = get_cmd_and_stmt(m_sQuery.get(), &pCmd, &pStmt, &len);
 | |
| 
 | |
|     if (pCmd)
 | |
|     {
 | |
|         json_object_set_new(pQuery, "command", json_string(pCmd));
 | |
|     }
 | |
| 
 | |
|     if (pStmt)
 | |
|     {
 | |
|         json_object_set_new(pQuery, "statement", json_stringn(pStmt, len));
 | |
| 
 | |
|         if (deallocate)
 | |
|         {
 | |
|             MXS_FREE(pStmt);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     char iso_time[ISO_TIME_LEN + 1];
 | |
| 
 | |
|     timespec_to_iso(iso_time, m_received);
 | |
|     json_object_set_new(pQuery, "received", json_stringn(iso_time, ISO_TIME_LEN));
 | |
| 
 | |
|     if (m_complete)
 | |
|     {
 | |
|         timespec_to_iso(iso_time, m_completed);
 | |
|         json_object_set_new(pQuery, "completed", json_stringn(iso_time, ISO_TIME_LEN));
 | |
|     }
 | |
| 
 | |
|     json_t* pResponses = json_array();
 | |
| 
 | |
|     for (auto& info : m_server_infos)
 | |
|     {
 | |
|         json_t* pResponse = json_object();
 | |
| 
 | |
|         // Calculate and report in milliseconds.
 | |
|         long int received = m_received.tv_sec * 1000 + m_received.tv_nsec / 1000000;
 | |
|         long int processed = info.processed.tv_sec * 1000 + info.processed.tv_nsec / 1000000;
 | |
|         mxb_assert(processed >= received);
 | |
| 
 | |
|         long int duration = processed - received;
 | |
| 
 | |
|         json_object_set_new(pResponse, "server", json_string(info.pServer->name()));
 | |
|         json_object_set_new(pResponse, "duration", json_integer(duration));
 | |
| 
 | |
|         json_array_append_new(pResponses, pResponse);
 | |
|     }
 | |
| 
 | |
|     json_object_set_new(pQuery, "responses", pResponses);
 | |
| 
 | |
|     return pQuery;
 | |
| }
 | |
| 
 | |
| void Session::QueryInfo::book_server_response(SERVER* pServer, bool final_response)
 | |
| {
 | |
|     // If the information has been completed, no more information may be provided.
 | |
|     mxb_assert(!m_complete);
 | |
|     // A particular server may be reported only exactly once.
 | |
|     mxb_assert(find_if(m_server_infos.begin(), m_server_infos.end(), [pServer](const ServerInfo& info) {
 | |
|                            return info.pServer == pServer;
 | |
|                        }) == m_server_infos.end());
 | |
| 
 | |
|     timespec now;
 | |
|     clock_gettime(CLOCK_REALTIME_COARSE, &now);
 | |
| 
 | |
|     m_server_infos.push_back(ServerInfo {pServer, now});
 | |
| 
 | |
|     m_complete = final_response;
 | |
| 
 | |
|     if (m_complete)
 | |
|     {
 | |
|         m_completed = now;
 | |
|     }
 | |
| }
 | |
| 
 | |
| void Session::QueryInfo::book_as_complete()
 | |
| {
 | |
|     timespec now;
 | |
|     clock_gettime(CLOCK_REALTIME_COARSE, &m_completed);
 | |
|     m_complete = true;
 | |
| }
 | |
| 
 | |
| void Session::QueryInfo::reset_server_bookkeeping()
 | |
| {
 | |
|     m_server_infos.clear();
 | |
|     m_completed.tv_sec = 0;
 | |
|     m_completed.tv_nsec = 0;
 | |
|     m_complete = false;
 | |
| }
 | |
| 
 | |
| void Session::append_session_log(std::string log)
 | |
| {
 | |
|     m_log.push_front(log);
 | |
| 
 | |
|     if (m_log.size() >= this_unit.session_trace)
 | |
|     {
 | |
|         m_log.pop_back();
 | |
|     }
 | |
| }
 | |
| 
 | |
| void Session::dump_session_log()
 | |
| {
 | |
|     if (!(m_log.empty()))
 | |
|     {
 | |
|         std::string log;
 | |
| 
 | |
|         for (const auto& s : m_log)
 | |
|         {
 | |
|             log += s;
 | |
|         }
 | |
| 
 | |
|         MXS_NOTICE("Session log for session (%" PRIu64 "): \n%s ", ses_id, log.c_str());
 | |
|     }
 | |
| }
 | 
