KILL [CONNECTION | QUERY] support, part2
MySQL sessions are added to a hasmap when created, removed when closed. MYSQL_COM_PROCESS_KILL is now detected, the thread_id is read and the kill command sent to all worker threads to find the correct session. If found, a fake hangup even is created for the client dcb. As is, this function is of little use since the client could just disconnect itself instead. Later on, additional commands of this nature will be added.
This commit is contained in:
parent
f66620c89c
commit
17f6e94cba
@ -413,6 +413,14 @@ bool session_take_stmt(MXS_SESSION *session, GWBUF **buffer, const struct server
|
||||
*/
|
||||
void session_clear_stmt(MXS_SESSION *session);
|
||||
|
||||
/**
|
||||
* Try to kill a specific session. This function only sends messages to
|
||||
* worker threads without waiting for the result.
|
||||
* @param issuer The session where the command originates.
|
||||
* @param target_id Target session id.
|
||||
*/
|
||||
void session_broadcast_kill_command(MXS_SESSION* issuer, uint32_t target_id);
|
||||
|
||||
/**
|
||||
* @brief Convert a session to JSON
|
||||
*
|
||||
|
@ -116,9 +116,26 @@ bool mxs_worker_post_message(MXS_WORKER* worker, uint32_t msg_id, intptr_t arg1,
|
||||
*/
|
||||
size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
|
||||
|
||||
// These automatically act on the currently executing worker thread. Not implemented yet.
|
||||
void mxs_add_to_session_map(uint32_t id, MXS_SESSION* session);
|
||||
void mxs_remove_from_session_map(uint32_t id);
|
||||
/**
|
||||
* Add a session to the current worker's session map.
|
||||
* @param id With which id to add. Typically session->ses_id.
|
||||
* @param session Session to add.
|
||||
* @return true if successful, false if id already existed in map.
|
||||
*/
|
||||
bool mxs_add_to_session_map(uint32_t id, MXS_SESSION* session);
|
||||
|
||||
/**
|
||||
* Remove a session from the current worker's session map.
|
||||
* @param id Which id to remove.
|
||||
* @return The removed session or NULL if not found.
|
||||
*/
|
||||
MXS_SESSION* mxs_remove_from_session_map(uint32_t id);
|
||||
|
||||
/**
|
||||
* Find a session in the current worker's session map.
|
||||
* @param id Which id to find.
|
||||
* @return The found session or NULL if not found.
|
||||
*/
|
||||
MXS_SESSION* mxs_find_in_session_map(uint32_t id);
|
||||
|
||||
MXS_END_DECLS
|
||||
|
@ -386,6 +386,31 @@ public:
|
||||
*/
|
||||
bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
|
||||
|
||||
/**
|
||||
* Add a session to the sessions hashmap
|
||||
*
|
||||
* @param id Session id, must be unique
|
||||
* @param session The session to add
|
||||
* @return true if successful
|
||||
*/
|
||||
bool add_to_session_map(SessionsById::key_type id, SessionsById::mapped_type session);
|
||||
|
||||
/**
|
||||
* Remove a session from the sessions hashmap
|
||||
*
|
||||
* @param id Session id
|
||||
* @return The removed session, or NULL if not found
|
||||
*/
|
||||
SessionsById::mapped_type remove_from_session_map(SessionsById::key_type id);
|
||||
|
||||
/**
|
||||
* Find a session in the sessions hashmap
|
||||
*
|
||||
* @param id Session id
|
||||
* @return The found session, or NULL if not found
|
||||
*/
|
||||
SessionsById::mapped_type find_in_session_map(SessionsById::key_type id);
|
||||
|
||||
/**
|
||||
* Broadcast a message to all worker.
|
||||
*
|
||||
|
@ -50,6 +50,8 @@
|
||||
|
||||
#include "maxscale/session.h"
|
||||
#include "maxscale/filter.h"
|
||||
#include "maxscale/worker.hh"
|
||||
#include "maxscale/workertask.hh"
|
||||
|
||||
using std::string;
|
||||
|
||||
@ -68,6 +70,41 @@ static MXS_SESSION *session_find_free();
|
||||
static void session_final_free(MXS_SESSION *session);
|
||||
static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
|
||||
MXS_SESSION* session);
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
class KillCmdTask : public maxscale::Worker::DisposableTask
|
||||
{
|
||||
private:
|
||||
std::string m_issuer_username;
|
||||
std::string m_issuer_host;
|
||||
uint64_t m_target_id;
|
||||
|
||||
public:
|
||||
KillCmdTask(MXS_SESSION* issuer, uint64_t target_id)
|
||||
{
|
||||
DCB* issuer_dcb = issuer->client_dcb;
|
||||
m_issuer_username.assign(issuer_dcb->user);
|
||||
m_issuer_host.assign(issuer_dcb->remote);
|
||||
m_target_id = target_id;
|
||||
}
|
||||
void execute(maxscale::Worker& worker)
|
||||
{
|
||||
MXS_SESSION* target = worker.find_in_session_map(m_target_id);
|
||||
if (target)
|
||||
{
|
||||
DCB* target_dcb = target->client_dcb;
|
||||
if ((strcmp(m_issuer_username.c_str(), target_dcb->user) == 0) &&
|
||||
(strcmp(m_issuer_host.c_str(), target_dcb->remote) == 0))
|
||||
{
|
||||
poll_fake_hangup_event(target_dcb);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* The clientReply of the session.
|
||||
*
|
||||
@ -999,6 +1036,29 @@ uint32_t session_get_next_id()
|
||||
return atomic_add_uint32(&next_session_id, 1);
|
||||
}
|
||||
|
||||
void session_broadcast_kill_command(MXS_SESSION* issuer, uint32_t target_id)
|
||||
{
|
||||
/* First, check if the target id belongs to the current worker. If it does,
|
||||
* send hangup event. Otherwise, use a worker task to send a message to all
|
||||
* workers.
|
||||
*/
|
||||
MXS_SESSION* target_ses = mxs_find_in_session_map(target_id);
|
||||
if (target_ses)
|
||||
{
|
||||
if ((strcmp(issuer->client_dcb->user, target_ses->client_dcb->user) == 0) &&
|
||||
(strcmp(issuer->client_dcb->remote, target_ses->client_dcb->remote) == 0))
|
||||
{
|
||||
poll_fake_hangup_event(target_ses->client_dcb);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
KillCmdTask* kill_task = new KillCmdTask(issuer, target_id);
|
||||
std::auto_ptr<maxscale::Worker::DisposableTask> sTask(kill_task);
|
||||
maxscale::Worker::broadcast(sTask);
|
||||
}
|
||||
}
|
||||
|
||||
json_t* session_to_json(const MXS_SESSION *session, const char *host)
|
||||
{
|
||||
json_t* rval = json_object();
|
||||
|
@ -712,6 +712,65 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg
|
||||
return Worker::broadcast_message(msg_id, arg1, arg2);
|
||||
}
|
||||
|
||||
bool mxs_add_to_session_map(uint32_t id, MXS_SESSION* session)
|
||||
{
|
||||
bool rval = false;
|
||||
Worker* worker = Worker::get_current();
|
||||
if (worker)
|
||||
{
|
||||
rval = worker->add_to_session_map(id, session);
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
MXS_SESSION* mxs_remove_from_session_map(uint32_t id)
|
||||
{
|
||||
MXS_SESSION* rval = NULL;
|
||||
Worker* worker = Worker::get_current();
|
||||
if (worker)
|
||||
{
|
||||
rval = worker->remove_from_session_map(id);
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
MXS_SESSION* mxs_find_in_session_map(uint32_t id)
|
||||
{
|
||||
MXS_SESSION* rval = NULL;
|
||||
Worker* worker = Worker::get_current();
|
||||
if (worker)
|
||||
{
|
||||
rval = worker->find_in_session_map(id);
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
bool Worker::add_to_session_map(SessionsById::key_type id, SessionsById::mapped_type session)
|
||||
{
|
||||
return m_sessions.insert(SessionsById::value_type(id, session)).second;
|
||||
}
|
||||
|
||||
Worker::SessionsById::mapped_type Worker::remove_from_session_map(SessionsById::key_type id)
|
||||
{
|
||||
Worker::SessionsById::mapped_type rval = find_in_session_map(id);
|
||||
if (rval)
|
||||
{
|
||||
m_sessions.erase(id);
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
Worker::SessionsById::mapped_type Worker::find_in_session_map(SessionsById::key_type id)
|
||||
{
|
||||
Worker::SessionsById::mapped_type rval = NULL;
|
||||
SessionsById::const_iterator iter = m_sessions.find(id);
|
||||
if (iter != m_sessions.end())
|
||||
{
|
||||
rval = iter->second;
|
||||
}
|
||||
return rval;
|
||||
}
|
||||
|
||||
void Worker::run()
|
||||
{
|
||||
this_thread.current_worker_id = m_id;
|
||||
|
@ -12,40 +12,6 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* @file mysql_client.c
|
||||
*
|
||||
* MySQL Protocol module for handling the protocol between the gateway
|
||||
* and the client.
|
||||
*
|
||||
* Revision History
|
||||
* Date Who Description
|
||||
* 14/06/2013 Mark Riddoch Initial version
|
||||
* 17/06/2013 Massimiliano Pinto Added Client To MaxScale routines
|
||||
* 24/06/2013 Massimiliano Pinto Added: fetch passwords from service users' hashtable
|
||||
* 02/09/2013 Massimiliano Pinto Added: session refcount
|
||||
* 16/12/2013 Massimiliano Pinto Added: client closed socket detection with recv(..., MSG_PEEK)
|
||||
* 24/02/2014 Massimiliano Pinto Added: on failed authentication a new users' table is loaded
|
||||
* with time and frequency limitations
|
||||
* If current user is authenticated the new users' table will
|
||||
* replace the old one
|
||||
* 28/02/2014 Massimiliano Pinto Added: client IPv4 in dcb->ipv4 and inet_ntop for string
|
||||
* representation
|
||||
* 11/03/2014 Massimiliano Pinto Added: Unix socket support
|
||||
* 07/05/2014 Massimiliano Pinto Added: specific version string in server handshake
|
||||
* 09/09/2014 Massimiliano Pinto Added: 777 permission for socket path
|
||||
* 13/10/2014 Massimiliano Pinto Added: dbname authentication check
|
||||
* 10/11/2014 Massimiliano Pinto Added: client charset added to protocol struct
|
||||
* 29/05/2015 Markus Makela Added SSL support
|
||||
* 11/06/2015 Martin Brampton COM_QUIT suppressed for persistent connections
|
||||
* 04/09/2015 Martin Brampton Introduce DUMMY session to fulfill guarantee DCB always has session
|
||||
* 09/09/2015 Martin Brampton Modify error handler calls
|
||||
* 11/01/2016 Martin Brampton Remove SSL write code, now handled at lower level;
|
||||
* replace gwbuf_consume by gwbuf_free (multiple).
|
||||
* 07/02/2016 Martin Brampton Split off authentication and SSL.
|
||||
* 31/05/2016 Martin Brampton Implement connection throttling
|
||||
*/
|
||||
|
||||
#define MXS_MODULE_NAME "MySQLClient"
|
||||
|
||||
#include <maxscale/protocol.h>
|
||||
@ -61,6 +27,7 @@
|
||||
#include <maxscale/query_classifier.h>
|
||||
#include <maxscale/authenticator.h>
|
||||
#include <maxscale/session.h>
|
||||
#include <maxscale/worker.h>
|
||||
|
||||
static int process_init(void);
|
||||
static void process_finish(void);
|
||||
@ -85,12 +52,12 @@ static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
|
||||
static int gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities);
|
||||
static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_read);
|
||||
static void gw_process_one_new_client(DCB *client_dcb);
|
||||
static bool process_special_commands(DCB* client_dcb, GWBUF *read_buffer, int nbytes_read);
|
||||
|
||||
/*
|
||||
* The "module object" for the mysqld client protocol module.
|
||||
*/
|
||||
|
||||
|
||||
/**
|
||||
* The module entry point routine. It is this routine that
|
||||
* must populate the structure that is referred to as the
|
||||
@ -709,6 +676,8 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
|
||||
ss_dassert(session->state != SESSION_STATE_ALLOC &&
|
||||
session->state != SESSION_STATE_DUMMY);
|
||||
protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE;
|
||||
ss_debug(bool check =) mxs_add_to_session_map(session->ses_id, session);
|
||||
ss_dassert(check);
|
||||
mxs_mysql_send_ok(dcb, next_sequence, 0, NULL);
|
||||
}
|
||||
else
|
||||
@ -933,33 +902,14 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
|
||||
if (nbytes_read < 3 || nbytes_read <
|
||||
(MYSQL_GET_PAYLOAD_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4))
|
||||
{
|
||||
|
||||
dcb->dcb_readqueue = read_buffer;
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
|
||||
*
|
||||
* The option is stored as a two byte integer with the values 0 for enabling
|
||||
* multi-statements and 1 for disabling it.
|
||||
*/
|
||||
MySQLProtocol *proto = dcb->protocol;
|
||||
uint8_t opt;
|
||||
|
||||
if (proto->current_command == MYSQL_COM_SET_OPTION &&
|
||||
gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt))
|
||||
if (!process_special_commands(dcb, read_buffer, nbytes_read))
|
||||
{
|
||||
if (opt)
|
||||
{
|
||||
proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
|
||||
}
|
||||
else
|
||||
{
|
||||
proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
return gw_read_finish_processing(dcb, read_buffer, capabilities);
|
||||
@ -1307,7 +1257,10 @@ static int gw_client_close(DCB *dcb)
|
||||
CHK_DCB(dcb);
|
||||
ss_dassert(dcb->protocol);
|
||||
mysql_protocol_done(dcb);
|
||||
session_close(dcb->session);
|
||||
MXS_SESSION* target = dcb->session;
|
||||
ss_debug(MXS_SESSION* removed =) mxs_remove_from_session_map(target->ses_id);
|
||||
ss_dassert(removed == target);
|
||||
session_close(target);
|
||||
return 1;
|
||||
}
|
||||
|
||||
@ -1526,3 +1479,68 @@ static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_rea
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Some SQL commands/queries need to be detected and handled by the protocol
|
||||
* and MaxScale instead of being routed forward as is.
|
||||
* @param dcb Client dcb
|
||||
* @param read_buffer the current read buffer
|
||||
* @param nbytes_read How many bytes were read
|
||||
* @return true if read buffer should be sent forward to routing, false if more
|
||||
* data is required or processing is complete
|
||||
*/
|
||||
static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_read)
|
||||
{
|
||||
/**
|
||||
* Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
|
||||
*
|
||||
* The option is stored as a two byte integer with the values 0 for enabling
|
||||
* multi-statements and 1 for disabling it.
|
||||
*/
|
||||
MySQLProtocol *proto = dcb->protocol;
|
||||
uint8_t opt;
|
||||
|
||||
if (proto->current_command == MYSQL_COM_SET_OPTION &&
|
||||
gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt))
|
||||
{
|
||||
if (opt)
|
||||
{
|
||||
proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
|
||||
}
|
||||
else
|
||||
{
|
||||
proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Handle COM_PROCESS_KILL
|
||||
*/
|
||||
else if((proto->current_command == MYSQL_COM_PROCESS_KILL))
|
||||
{
|
||||
/* Make sure we have a complete SQL packet before trying to read the
|
||||
* process id. If not, try again next time. */
|
||||
unsigned int expected_len =
|
||||
MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
|
||||
if (gwbuf_length(read_buffer) < expected_len)
|
||||
{
|
||||
dcb->dcb_readqueue = read_buffer;
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
uint8_t bytes[4];
|
||||
if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), (uint8_t*)bytes)
|
||||
== sizeof(bytes))
|
||||
{
|
||||
uint32_t process_id = gw_mysql_get_byte4(bytes);
|
||||
// Do not send this packet for routing
|
||||
gwbuf_free(read_buffer);
|
||||
session_broadcast_kill_command(dcb->session, process_id);
|
||||
// Even if id not found, send ok. TODO: send a correct response to client
|
||||
mxs_mysql_send_ok(dcb, 1, 0, NULL);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user