diff --git a/include/maxscale/session.h b/include/maxscale/session.h index e8746d8fe..03f5f4963 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -413,6 +413,15 @@ bool session_take_stmt(MXS_SESSION *session, GWBUF **buffer, const struct server */ void session_clear_stmt(MXS_SESSION *session); +/** + * Try to kill a specific session. This function only sends messages to + * worker threads without waiting for the result. + * + * @param issuer The session where the command originates. + * @param target_id Target session id. + */ +void session_broadcast_kill_command(MXS_SESSION* issuer, uint64_t target_id); + /** * @brief Convert a session to JSON * diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 9c0619afe..711c5fbe4 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -116,9 +116,30 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, */ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); -// These automatically act on the currently executing worker thread. Not implemented yet. -void mxs_add_to_session_map(uint32_t id, MXS_SESSION* session); -void mxs_remove_from_session_map(uint32_t id); -MXS_SESSION* mxs_find_in_session_map(uint32_t id); +/** + * Add a session to the current worker's session container. Currently only + * required for some special commands e.g. "KILL " to work. + * + * @param session Session to add. + * @return true if successful, false if id already existed in map. + */ +bool mxs_worker_register_session(MXS_SESSION* session); + +/** + * Remove a session from the current worker's session container. Does not actually + * remove anything from an epoll-set or affect the session in any way. + * + * @param id Which id to remove. + * @return The removed session or NULL if not found. + */ +MXS_SESSION* mxs_worker_deregister_session(uint64_t id); + +/** + * Find a session in the current worker's session container. + * + * @param id Which id to find. + * @return The found session or NULL if not found. + */ +MXS_SESSION* mxs_worker_find_session(uint64_t id); MXS_END_DECLS diff --git a/server/core/atomic.cc b/server/core/atomic.cc index 979756555..afff9c25d 100644 --- a/server/core/atomic.cc +++ b/server/core/atomic.cc @@ -94,7 +94,7 @@ void atomic_store_int32(int *variable, int value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } @@ -103,7 +103,7 @@ void atomic_store_int64(int64_t *variable, int64_t value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } @@ -112,7 +112,7 @@ void atomic_store_uint64(uint64_t *variable, uint64_t value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } @@ -121,6 +121,6 @@ void atomic_store_ptr(void **variable, void *value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index e1822d498..900b5f17c 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -69,7 +69,7 @@ public: typedef WORKER_STATISTICS STATISTICS; typedef WorkerTask Task; typedef WorkerDisposableTask DisposableTask; - typedef std::tr1::unordered_map SessionsById; + typedef std::tr1::unordered_map SessionsById; enum state_t { @@ -298,6 +298,12 @@ public: */ bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO); + template + bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO) + { + return post(std::auto_ptr(sTask.release()), mode); + } + /** * Posts a task to all workers for execution. * @@ -331,6 +337,12 @@ public: */ static size_t broadcast(std::auto_ptr sTask); + template + static size_t broadcast(std::auto_ptr sTask) + { + return broadcast(std::auto_ptr(sTask.release())); + } + /** * Executes a task on all workers in serial mode (the task is executed * on at most one worker thread at a time). When the function returns @@ -374,6 +386,30 @@ public: */ bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + /** + * Add a session to the session container. + * + * @param session The session to add + * @return true if successful + */ + bool register_session(MXS_SESSION* session); + + /** + * Remove a session from the session container. + * + * @param id Session id + * @return The removed session, or NULL if not found + */ + MXS_SESSION* deregister_session(uint64_t id); + + /** + * Find a session in the session container. + * + * @param id Session id + * @return The found session, or NULL if not found + */ + MXS_SESSION* find_session(uint64_t id); + /** * Broadcast a message to all worker. * diff --git a/server/core/maxscale/workertask.hh b/server/core/maxscale/workertask.hh index ee05aa620..408533d39 100644 --- a/server/core/maxscale/workertask.hh +++ b/server/core/maxscale/workertask.hh @@ -49,17 +49,18 @@ public: */ class WorkerDisposableTask { +public: + /** + * Destructor + */ + virtual ~WorkerDisposableTask(); + protected: /** * Constructor */ WorkerDisposableTask(); - /** - * Destructor - */ - virtual ~WorkerDisposableTask(); - /** * @brief Called in the context of a specific worker. * diff --git a/server/core/messagequeue.cc b/server/core/messagequeue.cc index c6680c72a..32ba5c46a 100644 --- a/server/core/messagequeue.cc +++ b/server/core/messagequeue.cc @@ -25,7 +25,7 @@ namespace { -struct +static struct { bool initialized; int pipe_flags; diff --git a/server/core/session.cc b/server/core/session.cc index 5793ce78a..26e803452 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -50,6 +50,8 @@ #include "maxscale/session.h" #include "maxscale/filter.h" +#include "maxscale/worker.hh" +#include "maxscale/workertask.hh" using std::string; @@ -68,6 +70,55 @@ static MXS_SESSION *session_find_free(); static void session_final_free(MXS_SESSION *session); static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb, MXS_SESSION* session); + +namespace +{ +/** + * Checks if issuer_user@issuer_host has the privilege to kill the target session. + * Currently just checks that the user and host are the same. + * + * This function should only be called in the worker thread normally handling + * the target session, otherwise target session could be freed while function is + * running. + * + * @param issuer_user User name of command issuer + * @param issuer_host Host/ip of command issuer + * @param target Target session + * @return + */ +bool issuer_can_kill_target(const string& issuer_user, const string& issuer_host, + const MXS_SESSION* target) +{ + DCB* target_dcb = target->client_dcb; + return ((strcmp(issuer_user.c_str(), target_dcb->user) == 0) && + (strcmp(issuer_host.c_str(), target_dcb->remote) == 0)); +} + +class KillCmdTask : public maxscale::Worker::DisposableTask +{ +public: + KillCmdTask(MXS_SESSION* issuer, uint64_t target_id) + : m_issuer_user(issuer->client_dcb->user) + , m_issuer_host(issuer->client_dcb->remote) + , m_target_id(target_id) + { + } + + void execute(maxscale::Worker& worker) + { + MXS_SESSION* target = worker.find_session(m_target_id); + if (target && issuer_can_kill_target(m_issuer_user, m_issuer_host, target)) + { + poll_fake_hangup_event(target->client_dcb); + } + } +private: + std::string m_issuer_user; + std::string m_issuer_host; + uint64_t m_target_id; +}; +} + /** * The clientReply of the session. * @@ -999,6 +1050,31 @@ uint32_t session_get_next_id() return atomic_add_uint32(&next_session_id, 1); } +void session_broadcast_kill_command(MXS_SESSION* issuer, uint64_t target_id) +{ + /* First, check if the target id belongs to the current worker. If it does, + * send hangup event. Otherwise, use a worker task to send a message to all + * workers. + */ + MXS_SESSION* target = mxs_worker_find_session(target_id); + if (target && + issuer_can_kill_target(issuer->client_dcb->user, + issuer->client_dcb->remote, + target)) + { + poll_fake_hangup_event(target->client_dcb); + } + else + { + KillCmdTask* kill_task = new (std::nothrow) KillCmdTask(issuer, target_id); + if (kill_task) + { + std::auto_ptr sKillTask(kill_task); + maxscale::Worker::broadcast(sKillTask); + } + } +} + json_t* session_to_json(const MXS_SESSION *session, const char *host) { json_t* rval = json_object(); diff --git a/server/core/test/testconfig.cc b/server/core/test/testconfig.cc index 0448ec024..69370c4fc 100644 --- a/server/core/test/testconfig.cc +++ b/server/core/test/testconfig.cc @@ -43,7 +43,8 @@ int test_validity() {MXS_END_MODULE_PARAMS} }; - CONFIG_CONTEXT ctx = {.object = (char*)""}; + CONFIG_CONTEXT ctx = {}; + ctx.object = (char*)""; /** Int parameter */ TEST(config_param_is_valid(params, "p1", "1", &ctx)); @@ -90,7 +91,8 @@ int test_validity() TEST(!config_param_is_valid(params, "p6", "This is not a valid path", &ctx)); /** Service parameter */ - CONFIG_CONTEXT svc = {.object = (char*)"test-service"}; + CONFIG_CONTEXT svc = {}; + svc.object = (char*)"test-service"; ctx.next = &svc; config_add_param(&svc, "type", "service"); TEST(config_param_is_valid(params, "p7", "test-service", &ctx)); @@ -193,7 +195,8 @@ int test_required_parameters() {MXS_END_MODULE_PARAMS} }; - CONFIG_CONTEXT ctx = {.object = (char*)""}; + CONFIG_CONTEXT ctx = {}; + ctx.object = (char*)""; TEST(missing_required_parameters(params, ctx.parameters)); config_add_defaults(&ctx, params); diff --git a/server/core/worker.cc b/server/core/worker.cc index 7d8e6444f..9112a65ed 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -712,6 +712,61 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg return Worker::broadcast_message(msg_id, arg1, arg2); } +bool mxs_worker_register_session(MXS_SESSION* session) +{ + Worker* worker = Worker::get_current(); + ss_dassert(worker); + return worker->register_session(session); +} + +MXS_SESSION* mxs_worker_deregister_session(uint64_t id) +{ + MXS_SESSION* rval = NULL; + Worker* worker = Worker::get_current(); + if (worker) + { + rval = worker->deregister_session(id); + } + return rval; +} + +MXS_SESSION* mxs_worker_find_session(uint64_t id) +{ + MXS_SESSION* rval = NULL; + Worker* worker = Worker::get_current(); + if (worker) + { + rval = worker->find_session(id); + } + return rval; +} + +bool Worker::register_session(MXS_SESSION* session) +{ + return m_sessions.insert(SessionsById::value_type(session->ses_id, session)).second; +} + +MXS_SESSION* Worker::deregister_session(uint64_t id) +{ + MXS_SESSION* rval = find_session(id); + if (rval) + { + m_sessions.erase(id); + } + return rval; +} + +MXS_SESSION* Worker::find_session(uint64_t id) +{ + MXS_SESSION* rval = NULL; + SessionsById::const_iterator iter = m_sessions.find(id); + if (iter != m_sessions.end()) + { + rval = iter->second; + } + return rval; +} + void Worker::run() { this_thread.current_worker_id = m_id; diff --git a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c index 91cec5568..fa684b277 100644 --- a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c +++ b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c @@ -12,40 +12,6 @@ * Public License. */ -/** - * @file mysql_client.c - * - * MySQL Protocol module for handling the protocol between the gateway - * and the client. - * - * Revision History - * Date Who Description - * 14/06/2013 Mark Riddoch Initial version - * 17/06/2013 Massimiliano Pinto Added Client To MaxScale routines - * 24/06/2013 Massimiliano Pinto Added: fetch passwords from service users' hashtable - * 02/09/2013 Massimiliano Pinto Added: session refcount - * 16/12/2013 Massimiliano Pinto Added: client closed socket detection with recv(..., MSG_PEEK) - * 24/02/2014 Massimiliano Pinto Added: on failed authentication a new users' table is loaded - * with time and frequency limitations - * If current user is authenticated the new users' table will - * replace the old one - * 28/02/2014 Massimiliano Pinto Added: client IPv4 in dcb->ipv4 and inet_ntop for string - * representation - * 11/03/2014 Massimiliano Pinto Added: Unix socket support - * 07/05/2014 Massimiliano Pinto Added: specific version string in server handshake - * 09/09/2014 Massimiliano Pinto Added: 777 permission for socket path - * 13/10/2014 Massimiliano Pinto Added: dbname authentication check - * 10/11/2014 Massimiliano Pinto Added: client charset added to protocol struct - * 29/05/2015 Markus Makela Added SSL support - * 11/06/2015 Martin Brampton COM_QUIT suppressed for persistent connections - * 04/09/2015 Martin Brampton Introduce DUMMY session to fulfill guarantee DCB always has session - * 09/09/2015 Martin Brampton Modify error handler calls - * 11/01/2016 Martin Brampton Remove SSL write code, now handled at lower level; - * replace gwbuf_consume by gwbuf_free (multiple). - * 07/02/2016 Martin Brampton Split off authentication and SSL. - * 31/05/2016 Martin Brampton Implement connection throttling - */ - #define MXS_MODULE_NAME "MySQLClient" #include @@ -61,6 +27,7 @@ #include #include #include +#include static int process_init(void); static void process_finish(void); @@ -85,12 +52,12 @@ static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read); static int gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities); static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_read); static void gw_process_one_new_client(DCB *client_dcb); +static bool process_special_commands(DCB* client_dcb, GWBUF *read_buffer, int nbytes_read); /* * The "module object" for the mysqld client protocol module. */ - /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the @@ -701,7 +668,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read) * normal data handling function instead of this one. */ MXS_SESSION *session = - session_alloc_with_id(dcb->service, dcb, protocol->tid); + session_alloc_with_id(dcb->service, dcb, protocol->tid); if (session != NULL) { @@ -709,6 +676,8 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read) ss_dassert(session->state != SESSION_STATE_ALLOC && session->state != SESSION_STATE_DUMMY); protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE; + ss_debug(bool check = ) mxs_worker_register_session(session); + ss_dassert(check); mxs_mysql_send_ok(dcb, next_sequence, 0, NULL); } else @@ -933,33 +902,14 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read) if (nbytes_read < 3 || nbytes_read < (MYSQL_GET_PAYLOAD_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) { - dcb->dcb_readqueue = read_buffer; - return 0; } } - /** - * Handle COM_SET_OPTION. This seems to be only used by some versions of PHP. - * - * The option is stored as a two byte integer with the values 0 for enabling - * multi-statements and 1 for disabling it. - */ - MySQLProtocol *proto = dcb->protocol; - uint8_t opt; - - if (proto->current_command == MYSQL_COM_SET_OPTION && - gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt)) + if (!process_special_commands(dcb, read_buffer, nbytes_read)) { - if (opt) - { - proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; - } - else - { - proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; - } + return 0; } return gw_read_finish_processing(dcb, read_buffer, capabilities); @@ -1307,7 +1257,10 @@ static int gw_client_close(DCB *dcb) CHK_DCB(dcb); ss_dassert(dcb->protocol); mysql_protocol_done(dcb); - session_close(dcb->session); + MXS_SESSION* target = dcb->session; + ss_debug(MXS_SESSION* removed = ) mxs_worker_deregister_session(target->ses_id); + ss_dassert(removed == target); + session_close(target); return 1; } @@ -1526,3 +1479,68 @@ static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_rea return true; } + +/** + * Some SQL commands/queries need to be detected and handled by the protocol + * and MaxScale instead of being routed forward as is. + * @param dcb Client dcb + * @param read_buffer the current read buffer + * @param nbytes_read How many bytes were read + * @return true if read buffer should be sent forward to routing, false if more + * data is required or processing is complete + */ +static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_read) +{ + /** + * Handle COM_SET_OPTION. This seems to be only used by some versions of PHP. + * + * The option is stored as a two byte integer with the values 0 for enabling + * multi-statements and 1 for disabling it. + */ + MySQLProtocol *proto = dcb->protocol; + uint8_t opt; + + if (proto->current_command == MYSQL_COM_SET_OPTION && + gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt)) + { + if (opt) + { + proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; + } + else + { + proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; + } + } + /** + * Handle COM_PROCESS_KILL + */ + else if ((proto->current_command == MYSQL_COM_PROCESS_KILL)) + { + /* Make sure we have a complete SQL packet before trying to read the + * process id. If not, try again next time. */ + unsigned int expected_len = + MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN; + if (gwbuf_length(read_buffer) < expected_len) + { + dcb->dcb_readqueue = read_buffer; + return false; + } + else + { + uint8_t bytes[4]; + if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), (uint8_t*)bytes) + == sizeof(bytes)) + { + uint64_t process_id = gw_mysql_get_byte4(bytes); + // Do not send this packet for routing + gwbuf_free(read_buffer); + session_broadcast_kill_command(dcb->session, process_id); + // Even if id not found, send ok. TODO: send a correct response to client + mxs_mysql_send_ok(dcb, 1, 0, NULL); + return false; + } + } + } + return true; +} \ No newline at end of file