From 5dc49a59bea5f156df9d1f053dd415cd9e968bf6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Fri, 5 May 2017 19:28:44 +0300 Subject: [PATCH 1/4] Fix build failures on CentOS 6 The older C++ compiler doesn't support struct initialization with explicit values. In addition to this, fixed a few other warnings that caused errors. --- server/core/atomic.cc | 8 ++++---- server/core/messagequeue.cc | 2 +- server/core/test/testconfig.cc | 9 ++++++--- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/server/core/atomic.cc b/server/core/atomic.cc index 979756555..afff9c25d 100644 --- a/server/core/atomic.cc +++ b/server/core/atomic.cc @@ -94,7 +94,7 @@ void atomic_store_int32(int *variable, int value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } @@ -103,7 +103,7 @@ void atomic_store_int64(int64_t *variable, int64_t value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } @@ -112,7 +112,7 @@ void atomic_store_uint64(uint64_t *variable, uint64_t value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } @@ -121,6 +121,6 @@ void atomic_store_ptr(void **variable, void *value) #ifdef MXS_USE_ATOMIC_BUILTINS __atomic_store_n(variable, value, __ATOMIC_SEQ_CST); #else - __sync_lock_test_and_set(variable, value); + (void)__sync_lock_test_and_set(variable, value); #endif } diff --git a/server/core/messagequeue.cc b/server/core/messagequeue.cc index c6680c72a..32ba5c46a 100644 --- a/server/core/messagequeue.cc +++ b/server/core/messagequeue.cc @@ -25,7 +25,7 @@ namespace { -struct +static struct { bool initialized; int pipe_flags; diff --git a/server/core/test/testconfig.cc b/server/core/test/testconfig.cc index 0448ec024..69370c4fc 100644 --- a/server/core/test/testconfig.cc +++ b/server/core/test/testconfig.cc @@ -43,7 +43,8 @@ int test_validity() {MXS_END_MODULE_PARAMS} }; - CONFIG_CONTEXT ctx = {.object = (char*)""}; + CONFIG_CONTEXT ctx = {}; + ctx.object = (char*)""; /** Int parameter */ TEST(config_param_is_valid(params, "p1", "1", &ctx)); @@ -90,7 +91,8 @@ int test_validity() TEST(!config_param_is_valid(params, "p6", "This is not a valid path", &ctx)); /** Service parameter */ - CONFIG_CONTEXT svc = {.object = (char*)"test-service"}; + CONFIG_CONTEXT svc = {}; + svc.object = (char*)"test-service"; ctx.next = &svc; config_add_param(&svc, "type", "service"); TEST(config_param_is_valid(params, "p7", "test-service", &ctx)); @@ -193,7 +195,8 @@ int test_required_parameters() {MXS_END_MODULE_PARAMS} }; - CONFIG_CONTEXT ctx = {.object = (char*)""}; + CONFIG_CONTEXT ctx = {}; + ctx.object = (char*)""; TEST(missing_required_parameters(params, ctx.parameters)); config_add_defaults(&ctx, params); From f66620c89ca70f111b954c37c3a326dee68adc0a Mon Sep 17 00:00:00 2001 From: Johan Wikman Date: Thu, 4 May 2017 15:00:53 +0300 Subject: [PATCH 2/4] Accept auto_ptr where T is derived type Without the member template it is not possible to pass an auto_ptr instantiated with a derived type to post() or broadcast(). The reason is that the conversion constructor and conversion operator of auto_ptr are equally good for that purpose, and hence the compilation ends with an error. --- server/core/maxscale/worker.hh | 12 ++++++++++++ server/core/maxscale/workertask.hh | 11 ++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index e1822d498..b9db99198 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -298,6 +298,12 @@ public: */ bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO); + template + bool post(std::auto_ptr sTask, enum execute_mode_t mode = EXECUTE_AUTO) + { + return post(std::auto_ptr(sTask.release()), mode); + } + /** * Posts a task to all workers for execution. * @@ -331,6 +337,12 @@ public: */ static size_t broadcast(std::auto_ptr sTask); + template + static size_t broadcast(std::auto_ptr sTask) + { + return broadcast(std::auto_ptr(sTask.release())); + } + /** * Executes a task on all workers in serial mode (the task is executed * on at most one worker thread at a time). When the function returns diff --git a/server/core/maxscale/workertask.hh b/server/core/maxscale/workertask.hh index ee05aa620..408533d39 100644 --- a/server/core/maxscale/workertask.hh +++ b/server/core/maxscale/workertask.hh @@ -49,17 +49,18 @@ public: */ class WorkerDisposableTask { +public: + /** + * Destructor + */ + virtual ~WorkerDisposableTask(); + protected: /** * Constructor */ WorkerDisposableTask(); - /** - * Destructor - */ - virtual ~WorkerDisposableTask(); - /** * @brief Called in the context of a specific worker. * From 17f6e94cba7bde7dd42fa88c1e758fb76bd24672 Mon Sep 17 00:00:00 2001 From: Esa Korhonen Date: Tue, 2 May 2017 10:05:19 +0300 Subject: [PATCH 3/4] KILL [CONNECTION | QUERY] support, part2 MySQL sessions are added to a hasmap when created, removed when closed. MYSQL_COM_PROCESS_KILL is now detected, the thread_id is read and the kill command sent to all worker threads to find the correct session. If found, a fake hangup even is created for the client dcb. As is, this function is of little use since the client could just disconnect itself instead. Later on, additional commands of this nature will be added. --- include/maxscale/session.h | 8 ++ include/maxscale/worker.h | 23 ++- server/core/maxscale/worker.hh | 25 ++++ server/core/session.cc | 60 ++++++++ server/core/worker.cc | 59 ++++++++ .../protocol/MySQL/MySQLClient/mysql_client.c | 132 ++++++++++-------- 6 files changed, 247 insertions(+), 60 deletions(-) diff --git a/include/maxscale/session.h b/include/maxscale/session.h index e8746d8fe..162ccec97 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -413,6 +413,14 @@ bool session_take_stmt(MXS_SESSION *session, GWBUF **buffer, const struct server */ void session_clear_stmt(MXS_SESSION *session); +/** + * Try to kill a specific session. This function only sends messages to + * worker threads without waiting for the result. + * @param issuer The session where the command originates. + * @param target_id Target session id. + */ +void session_broadcast_kill_command(MXS_SESSION* issuer, uint32_t target_id); + /** * @brief Convert a session to JSON * diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index 9c0619afe..f323e0912 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -116,9 +116,26 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, */ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); -// These automatically act on the currently executing worker thread. Not implemented yet. -void mxs_add_to_session_map(uint32_t id, MXS_SESSION* session); -void mxs_remove_from_session_map(uint32_t id); +/** + * Add a session to the current worker's session map. + * @param id With which id to add. Typically session->ses_id. + * @param session Session to add. + * @return true if successful, false if id already existed in map. + */ +bool mxs_add_to_session_map(uint32_t id, MXS_SESSION* session); + +/** + * Remove a session from the current worker's session map. + * @param id Which id to remove. + * @return The removed session or NULL if not found. + */ +MXS_SESSION* mxs_remove_from_session_map(uint32_t id); + +/** + * Find a session in the current worker's session map. + * @param id Which id to find. + * @return The found session or NULL if not found. + */ MXS_SESSION* mxs_find_in_session_map(uint32_t id); MXS_END_DECLS diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index b9db99198..c3060fc15 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -386,6 +386,31 @@ public: */ bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); + /** + * Add a session to the sessions hashmap + * + * @param id Session id, must be unique + * @param session The session to add + * @return true if successful + */ + bool add_to_session_map(SessionsById::key_type id, SessionsById::mapped_type session); + + /** + * Remove a session from the sessions hashmap + * + * @param id Session id + * @return The removed session, or NULL if not found + */ + SessionsById::mapped_type remove_from_session_map(SessionsById::key_type id); + + /** + * Find a session in the sessions hashmap + * + * @param id Session id + * @return The found session, or NULL if not found + */ + SessionsById::mapped_type find_in_session_map(SessionsById::key_type id); + /** * Broadcast a message to all worker. * diff --git a/server/core/session.cc b/server/core/session.cc index 5793ce78a..ec6937d83 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -50,6 +50,8 @@ #include "maxscale/session.h" #include "maxscale/filter.h" +#include "maxscale/worker.hh" +#include "maxscale/workertask.hh" using std::string; @@ -68,6 +70,41 @@ static MXS_SESSION *session_find_free(); static void session_final_free(MXS_SESSION *session); static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb, MXS_SESSION* session); + +namespace +{ + +class KillCmdTask : public maxscale::Worker::DisposableTask +{ +private: + std::string m_issuer_username; + std::string m_issuer_host; + uint64_t m_target_id; + +public: + KillCmdTask(MXS_SESSION* issuer, uint64_t target_id) + { + DCB* issuer_dcb = issuer->client_dcb; + m_issuer_username.assign(issuer_dcb->user); + m_issuer_host.assign(issuer_dcb->remote); + m_target_id = target_id; + } + void execute(maxscale::Worker& worker) + { + MXS_SESSION* target = worker.find_in_session_map(m_target_id); + if (target) + { + DCB* target_dcb = target->client_dcb; + if ((strcmp(m_issuer_username.c_str(), target_dcb->user) == 0) && + (strcmp(m_issuer_host.c_str(), target_dcb->remote) == 0)) + { + poll_fake_hangup_event(target_dcb); + } + } + } +}; +} + /** * The clientReply of the session. * @@ -999,6 +1036,29 @@ uint32_t session_get_next_id() return atomic_add_uint32(&next_session_id, 1); } +void session_broadcast_kill_command(MXS_SESSION* issuer, uint32_t target_id) +{ + /* First, check if the target id belongs to the current worker. If it does, + * send hangup event. Otherwise, use a worker task to send a message to all + * workers. + */ + MXS_SESSION* target_ses = mxs_find_in_session_map(target_id); + if (target_ses) + { + if ((strcmp(issuer->client_dcb->user, target_ses->client_dcb->user) == 0) && + (strcmp(issuer->client_dcb->remote, target_ses->client_dcb->remote) == 0)) + { + poll_fake_hangup_event(target_ses->client_dcb); + } + } + else + { + KillCmdTask* kill_task = new KillCmdTask(issuer, target_id); + std::auto_ptr sTask(kill_task); + maxscale::Worker::broadcast(sTask); + } +} + json_t* session_to_json(const MXS_SESSION *session, const char *host) { json_t* rval = json_object(); diff --git a/server/core/worker.cc b/server/core/worker.cc index 7d8e6444f..e551c2774 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -712,6 +712,65 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg return Worker::broadcast_message(msg_id, arg1, arg2); } +bool mxs_add_to_session_map(uint32_t id, MXS_SESSION* session) +{ + bool rval = false; + Worker* worker = Worker::get_current(); + if (worker) + { + rval = worker->add_to_session_map(id, session); + } + return rval; +} + +MXS_SESSION* mxs_remove_from_session_map(uint32_t id) +{ + MXS_SESSION* rval = NULL; + Worker* worker = Worker::get_current(); + if (worker) + { + rval = worker->remove_from_session_map(id); + } + return rval; +} + +MXS_SESSION* mxs_find_in_session_map(uint32_t id) +{ + MXS_SESSION* rval = NULL; + Worker* worker = Worker::get_current(); + if (worker) + { + rval = worker->find_in_session_map(id); + } + return rval; +} + +bool Worker::add_to_session_map(SessionsById::key_type id, SessionsById::mapped_type session) +{ + return m_sessions.insert(SessionsById::value_type(id, session)).second; +} + +Worker::SessionsById::mapped_type Worker::remove_from_session_map(SessionsById::key_type id) +{ + Worker::SessionsById::mapped_type rval = find_in_session_map(id); + if (rval) + { + m_sessions.erase(id); + } + return rval; +} + +Worker::SessionsById::mapped_type Worker::find_in_session_map(SessionsById::key_type id) +{ + Worker::SessionsById::mapped_type rval = NULL; + SessionsById::const_iterator iter = m_sessions.find(id); + if (iter != m_sessions.end()) + { + rval = iter->second; + } + return rval; +} + void Worker::run() { this_thread.current_worker_id = m_id; diff --git a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c index 91cec5568..ad47e712d 100644 --- a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c +++ b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c @@ -12,40 +12,6 @@ * Public License. */ -/** - * @file mysql_client.c - * - * MySQL Protocol module for handling the protocol between the gateway - * and the client. - * - * Revision History - * Date Who Description - * 14/06/2013 Mark Riddoch Initial version - * 17/06/2013 Massimiliano Pinto Added Client To MaxScale routines - * 24/06/2013 Massimiliano Pinto Added: fetch passwords from service users' hashtable - * 02/09/2013 Massimiliano Pinto Added: session refcount - * 16/12/2013 Massimiliano Pinto Added: client closed socket detection with recv(..., MSG_PEEK) - * 24/02/2014 Massimiliano Pinto Added: on failed authentication a new users' table is loaded - * with time and frequency limitations - * If current user is authenticated the new users' table will - * replace the old one - * 28/02/2014 Massimiliano Pinto Added: client IPv4 in dcb->ipv4 and inet_ntop for string - * representation - * 11/03/2014 Massimiliano Pinto Added: Unix socket support - * 07/05/2014 Massimiliano Pinto Added: specific version string in server handshake - * 09/09/2014 Massimiliano Pinto Added: 777 permission for socket path - * 13/10/2014 Massimiliano Pinto Added: dbname authentication check - * 10/11/2014 Massimiliano Pinto Added: client charset added to protocol struct - * 29/05/2015 Markus Makela Added SSL support - * 11/06/2015 Martin Brampton COM_QUIT suppressed for persistent connections - * 04/09/2015 Martin Brampton Introduce DUMMY session to fulfill guarantee DCB always has session - * 09/09/2015 Martin Brampton Modify error handler calls - * 11/01/2016 Martin Brampton Remove SSL write code, now handled at lower level; - * replace gwbuf_consume by gwbuf_free (multiple). - * 07/02/2016 Martin Brampton Split off authentication and SSL. - * 31/05/2016 Martin Brampton Implement connection throttling - */ - #define MXS_MODULE_NAME "MySQLClient" #include @@ -61,6 +27,7 @@ #include #include #include +#include static int process_init(void); static void process_finish(void); @@ -85,12 +52,12 @@ static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read); static int gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities); static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_read); static void gw_process_one_new_client(DCB *client_dcb); +static bool process_special_commands(DCB* client_dcb, GWBUF *read_buffer, int nbytes_read); /* * The "module object" for the mysqld client protocol module. */ - /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the @@ -709,6 +676,8 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read) ss_dassert(session->state != SESSION_STATE_ALLOC && session->state != SESSION_STATE_DUMMY); protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE; + ss_debug(bool check =) mxs_add_to_session_map(session->ses_id, session); + ss_dassert(check); mxs_mysql_send_ok(dcb, next_sequence, 0, NULL); } else @@ -933,33 +902,14 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read) if (nbytes_read < 3 || nbytes_read < (MYSQL_GET_PAYLOAD_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4)) { - dcb->dcb_readqueue = read_buffer; - return 0; } } - /** - * Handle COM_SET_OPTION. This seems to be only used by some versions of PHP. - * - * The option is stored as a two byte integer with the values 0 for enabling - * multi-statements and 1 for disabling it. - */ - MySQLProtocol *proto = dcb->protocol; - uint8_t opt; - - if (proto->current_command == MYSQL_COM_SET_OPTION && - gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt)) + if (!process_special_commands(dcb, read_buffer, nbytes_read)) { - if (opt) - { - proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; - } - else - { - proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; - } + return 0; } return gw_read_finish_processing(dcb, read_buffer, capabilities); @@ -1307,7 +1257,10 @@ static int gw_client_close(DCB *dcb) CHK_DCB(dcb); ss_dassert(dcb->protocol); mysql_protocol_done(dcb); - session_close(dcb->session); + MXS_SESSION* target = dcb->session; + ss_debug(MXS_SESSION* removed =) mxs_remove_from_session_map(target->ses_id); + ss_dassert(removed == target); + session_close(target); return 1; } @@ -1526,3 +1479,68 @@ static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_rea return true; } + +/** + * Some SQL commands/queries need to be detected and handled by the protocol + * and MaxScale instead of being routed forward as is. + * @param dcb Client dcb + * @param read_buffer the current read buffer + * @param nbytes_read How many bytes were read + * @return true if read buffer should be sent forward to routing, false if more + * data is required or processing is complete + */ +static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_read) +{ + /** + * Handle COM_SET_OPTION. This seems to be only used by some versions of PHP. + * + * The option is stored as a two byte integer with the values 0 for enabling + * multi-statements and 1 for disabling it. + */ + MySQLProtocol *proto = dcb->protocol; + uint8_t opt; + + if (proto->current_command == MYSQL_COM_SET_OPTION && + gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt)) + { + if (opt) + { + proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; + } + else + { + proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS; + } + } + /** + * Handle COM_PROCESS_KILL + */ + else if((proto->current_command == MYSQL_COM_PROCESS_KILL)) + { + /* Make sure we have a complete SQL packet before trying to read the + * process id. If not, try again next time. */ + unsigned int expected_len = + MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN; + if (gwbuf_length(read_buffer) < expected_len) + { + dcb->dcb_readqueue = read_buffer; + return false; + } + else + { + uint8_t bytes[4]; + if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), (uint8_t*)bytes) + == sizeof(bytes)) + { + uint32_t process_id = gw_mysql_get_byte4(bytes); + // Do not send this packet for routing + gwbuf_free(read_buffer); + session_broadcast_kill_command(dcb->session, process_id); + // Even if id not found, send ok. TODO: send a correct response to client + mxs_mysql_send_ok(dcb, 1, 0, NULL); + return false; + } + } + } + return true; +} \ No newline at end of file From ee20191645c609895060eb321e3594b514200869 Mon Sep 17 00:00:00 2001 From: Esa Korhonen Date: Thu, 4 May 2017 17:51:10 +0300 Subject: [PATCH 4/4] KILL [CONNECTION | QUERY] support, part2B Various small changes to part2, as suggested by comments and otherwise. Mostly renaming, working logic should not change. Exception: session id changed to 64bit in the container and associated functions. Another commit will change it to 64bit in the session itself. --- include/maxscale/session.h | 3 +- include/maxscale/worker.h | 18 +++-- server/core/maxscale/worker.hh | 15 ++-- server/core/session.cc | 72 +++++++++++-------- server/core/worker.cc | 30 ++++---- .../protocol/MySQL/MySQLClient/mysql_client.c | 14 ++-- 6 files changed, 84 insertions(+), 68 deletions(-) diff --git a/include/maxscale/session.h b/include/maxscale/session.h index 162ccec97..03f5f4963 100644 --- a/include/maxscale/session.h +++ b/include/maxscale/session.h @@ -416,10 +416,11 @@ void session_clear_stmt(MXS_SESSION *session); /** * Try to kill a specific session. This function only sends messages to * worker threads without waiting for the result. + * * @param issuer The session where the command originates. * @param target_id Target session id. */ -void session_broadcast_kill_command(MXS_SESSION* issuer, uint32_t target_id); +void session_broadcast_kill_command(MXS_SESSION* issuer, uint64_t target_id); /** * @brief Convert a session to JSON diff --git a/include/maxscale/worker.h b/include/maxscale/worker.h index f323e0912..711c5fbe4 100644 --- a/include/maxscale/worker.h +++ b/include/maxscale/worker.h @@ -117,25 +117,29 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1, size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); /** - * Add a session to the current worker's session map. - * @param id With which id to add. Typically session->ses_id. + * Add a session to the current worker's session container. Currently only + * required for some special commands e.g. "KILL " to work. + * * @param session Session to add. * @return true if successful, false if id already existed in map. */ -bool mxs_add_to_session_map(uint32_t id, MXS_SESSION* session); +bool mxs_worker_register_session(MXS_SESSION* session); /** - * Remove a session from the current worker's session map. + * Remove a session from the current worker's session container. Does not actually + * remove anything from an epoll-set or affect the session in any way. + * * @param id Which id to remove. * @return The removed session or NULL if not found. */ -MXS_SESSION* mxs_remove_from_session_map(uint32_t id); +MXS_SESSION* mxs_worker_deregister_session(uint64_t id); /** - * Find a session in the current worker's session map. + * Find a session in the current worker's session container. + * * @param id Which id to find. * @return The found session or NULL if not found. */ -MXS_SESSION* mxs_find_in_session_map(uint32_t id); +MXS_SESSION* mxs_worker_find_session(uint64_t id); MXS_END_DECLS diff --git a/server/core/maxscale/worker.hh b/server/core/maxscale/worker.hh index c3060fc15..900b5f17c 100644 --- a/server/core/maxscale/worker.hh +++ b/server/core/maxscale/worker.hh @@ -69,7 +69,7 @@ public: typedef WORKER_STATISTICS STATISTICS; typedef WorkerTask Task; typedef WorkerDisposableTask DisposableTask; - typedef std::tr1::unordered_map SessionsById; + typedef std::tr1::unordered_map SessionsById; enum state_t { @@ -387,29 +387,28 @@ public: bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2); /** - * Add a session to the sessions hashmap + * Add a session to the session container. * - * @param id Session id, must be unique * @param session The session to add * @return true if successful */ - bool add_to_session_map(SessionsById::key_type id, SessionsById::mapped_type session); + bool register_session(MXS_SESSION* session); /** - * Remove a session from the sessions hashmap + * Remove a session from the session container. * * @param id Session id * @return The removed session, or NULL if not found */ - SessionsById::mapped_type remove_from_session_map(SessionsById::key_type id); + MXS_SESSION* deregister_session(uint64_t id); /** - * Find a session in the sessions hashmap + * Find a session in the session container. * * @param id Session id * @return The found session, or NULL if not found */ - SessionsById::mapped_type find_in_session_map(SessionsById::key_type id); + MXS_SESSION* find_session(uint64_t id); /** * Broadcast a message to all worker. diff --git a/server/core/session.cc b/server/core/session.cc index ec6937d83..26e803452 100644 --- a/server/core/session.cc +++ b/server/core/session.cc @@ -73,35 +73,49 @@ static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb, namespace { +/** + * Checks if issuer_user@issuer_host has the privilege to kill the target session. + * Currently just checks that the user and host are the same. + * + * This function should only be called in the worker thread normally handling + * the target session, otherwise target session could be freed while function is + * running. + * + * @param issuer_user User name of command issuer + * @param issuer_host Host/ip of command issuer + * @param target Target session + * @return + */ +bool issuer_can_kill_target(const string& issuer_user, const string& issuer_host, + const MXS_SESSION* target) +{ + DCB* target_dcb = target->client_dcb; + return ((strcmp(issuer_user.c_str(), target_dcb->user) == 0) && + (strcmp(issuer_host.c_str(), target_dcb->remote) == 0)); +} class KillCmdTask : public maxscale::Worker::DisposableTask { -private: - std::string m_issuer_username; - std::string m_issuer_host; - uint64_t m_target_id; - public: KillCmdTask(MXS_SESSION* issuer, uint64_t target_id) + : m_issuer_user(issuer->client_dcb->user) + , m_issuer_host(issuer->client_dcb->remote) + , m_target_id(target_id) { - DCB* issuer_dcb = issuer->client_dcb; - m_issuer_username.assign(issuer_dcb->user); - m_issuer_host.assign(issuer_dcb->remote); - m_target_id = target_id; } + void execute(maxscale::Worker& worker) { - MXS_SESSION* target = worker.find_in_session_map(m_target_id); - if (target) + MXS_SESSION* target = worker.find_session(m_target_id); + if (target && issuer_can_kill_target(m_issuer_user, m_issuer_host, target)) { - DCB* target_dcb = target->client_dcb; - if ((strcmp(m_issuer_username.c_str(), target_dcb->user) == 0) && - (strcmp(m_issuer_host.c_str(), target_dcb->remote) == 0)) - { - poll_fake_hangup_event(target_dcb); - } + poll_fake_hangup_event(target->client_dcb); } } +private: + std::string m_issuer_user; + std::string m_issuer_host; + uint64_t m_target_id; }; } @@ -1036,26 +1050,28 @@ uint32_t session_get_next_id() return atomic_add_uint32(&next_session_id, 1); } -void session_broadcast_kill_command(MXS_SESSION* issuer, uint32_t target_id) +void session_broadcast_kill_command(MXS_SESSION* issuer, uint64_t target_id) { /* First, check if the target id belongs to the current worker. If it does, * send hangup event. Otherwise, use a worker task to send a message to all * workers. */ - MXS_SESSION* target_ses = mxs_find_in_session_map(target_id); - if (target_ses) + MXS_SESSION* target = mxs_worker_find_session(target_id); + if (target && + issuer_can_kill_target(issuer->client_dcb->user, + issuer->client_dcb->remote, + target)) { - if ((strcmp(issuer->client_dcb->user, target_ses->client_dcb->user) == 0) && - (strcmp(issuer->client_dcb->remote, target_ses->client_dcb->remote) == 0)) - { - poll_fake_hangup_event(target_ses->client_dcb); - } + poll_fake_hangup_event(target->client_dcb); } else { - KillCmdTask* kill_task = new KillCmdTask(issuer, target_id); - std::auto_ptr sTask(kill_task); - maxscale::Worker::broadcast(sTask); + KillCmdTask* kill_task = new (std::nothrow) KillCmdTask(issuer, target_id); + if (kill_task) + { + std::auto_ptr sKillTask(kill_task); + maxscale::Worker::broadcast(sKillTask); + } } } diff --git a/server/core/worker.cc b/server/core/worker.cc index e551c2774..9112a65ed 100644 --- a/server/core/worker.cc +++ b/server/core/worker.cc @@ -712,47 +712,43 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg return Worker::broadcast_message(msg_id, arg1, arg2); } -bool mxs_add_to_session_map(uint32_t id, MXS_SESSION* session) +bool mxs_worker_register_session(MXS_SESSION* session) { - bool rval = false; Worker* worker = Worker::get_current(); - if (worker) - { - rval = worker->add_to_session_map(id, session); - } - return rval; + ss_dassert(worker); + return worker->register_session(session); } -MXS_SESSION* mxs_remove_from_session_map(uint32_t id) +MXS_SESSION* mxs_worker_deregister_session(uint64_t id) { MXS_SESSION* rval = NULL; Worker* worker = Worker::get_current(); if (worker) { - rval = worker->remove_from_session_map(id); + rval = worker->deregister_session(id); } return rval; } -MXS_SESSION* mxs_find_in_session_map(uint32_t id) +MXS_SESSION* mxs_worker_find_session(uint64_t id) { MXS_SESSION* rval = NULL; Worker* worker = Worker::get_current(); if (worker) { - rval = worker->find_in_session_map(id); + rval = worker->find_session(id); } return rval; } -bool Worker::add_to_session_map(SessionsById::key_type id, SessionsById::mapped_type session) +bool Worker::register_session(MXS_SESSION* session) { - return m_sessions.insert(SessionsById::value_type(id, session)).second; + return m_sessions.insert(SessionsById::value_type(session->ses_id, session)).second; } -Worker::SessionsById::mapped_type Worker::remove_from_session_map(SessionsById::key_type id) +MXS_SESSION* Worker::deregister_session(uint64_t id) { - Worker::SessionsById::mapped_type rval = find_in_session_map(id); + MXS_SESSION* rval = find_session(id); if (rval) { m_sessions.erase(id); @@ -760,9 +756,9 @@ Worker::SessionsById::mapped_type Worker::remove_from_session_map(SessionsById:: return rval; } -Worker::SessionsById::mapped_type Worker::find_in_session_map(SessionsById::key_type id) +MXS_SESSION* Worker::find_session(uint64_t id) { - Worker::SessionsById::mapped_type rval = NULL; + MXS_SESSION* rval = NULL; SessionsById::const_iterator iter = m_sessions.find(id); if (iter != m_sessions.end()) { diff --git a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c index ad47e712d..fa684b277 100644 --- a/server/modules/protocol/MySQL/MySQLClient/mysql_client.c +++ b/server/modules/protocol/MySQL/MySQLClient/mysql_client.c @@ -668,7 +668,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read) * normal data handling function instead of this one. */ MXS_SESSION *session = - session_alloc_with_id(dcb->service, dcb, protocol->tid); + session_alloc_with_id(dcb->service, dcb, protocol->tid); if (session != NULL) { @@ -676,7 +676,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read) ss_dassert(session->state != SESSION_STATE_ALLOC && session->state != SESSION_STATE_DUMMY); protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE; - ss_debug(bool check =) mxs_add_to_session_map(session->ses_id, session); + ss_debug(bool check = ) mxs_worker_register_session(session); ss_dassert(check); mxs_mysql_send_ok(dcb, next_sequence, 0, NULL); } @@ -1258,7 +1258,7 @@ static int gw_client_close(DCB *dcb) ss_dassert(dcb->protocol); mysql_protocol_done(dcb); MXS_SESSION* target = dcb->session; - ss_debug(MXS_SESSION* removed =) mxs_remove_from_session_map(target->ses_id); + ss_debug(MXS_SESSION* removed = ) mxs_worker_deregister_session(target->ses_id); ss_dassert(removed == target); session_close(target); return 1; @@ -1515,12 +1515,12 @@ static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_re /** * Handle COM_PROCESS_KILL */ - else if((proto->current_command == MYSQL_COM_PROCESS_KILL)) + else if ((proto->current_command == MYSQL_COM_PROCESS_KILL)) { /* Make sure we have a complete SQL packet before trying to read the * process id. If not, try again next time. */ unsigned int expected_len = - MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN; + MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN; if (gwbuf_length(read_buffer) < expected_len) { dcb->dcb_readqueue = read_buffer; @@ -1530,9 +1530,9 @@ static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_re { uint8_t bytes[4]; if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), (uint8_t*)bytes) - == sizeof(bytes)) + == sizeof(bytes)) { - uint32_t process_id = gw_mysql_get_byte4(bytes); + uint64_t process_id = gw_mysql_get_byte4(bytes); // Do not send this packet for routing gwbuf_free(read_buffer); session_broadcast_kill_command(dcb->session, process_id);