Merge branch 'develop' into MXS-1209

This commit is contained in:
MassimilianoPinto
2017-05-09 08:16:20 +02:00
10 changed files with 295 additions and 76 deletions

View File

@ -94,7 +94,7 @@ void atomic_store_int32(int *variable, int value)
#ifdef MXS_USE_ATOMIC_BUILTINS
__atomic_store_n(variable, value, __ATOMIC_SEQ_CST);
#else
__sync_lock_test_and_set(variable, value);
(void)__sync_lock_test_and_set(variable, value);
#endif
}
@ -103,7 +103,7 @@ void atomic_store_int64(int64_t *variable, int64_t value)
#ifdef MXS_USE_ATOMIC_BUILTINS
__atomic_store_n(variable, value, __ATOMIC_SEQ_CST);
#else
__sync_lock_test_and_set(variable, value);
(void)__sync_lock_test_and_set(variable, value);
#endif
}
@ -112,7 +112,7 @@ void atomic_store_uint64(uint64_t *variable, uint64_t value)
#ifdef MXS_USE_ATOMIC_BUILTINS
__atomic_store_n(variable, value, __ATOMIC_SEQ_CST);
#else
__sync_lock_test_and_set(variable, value);
(void)__sync_lock_test_and_set(variable, value);
#endif
}
@ -121,6 +121,6 @@ void atomic_store_ptr(void **variable, void *value)
#ifdef MXS_USE_ATOMIC_BUILTINS
__atomic_store_n(variable, value, __ATOMIC_SEQ_CST);
#else
__sync_lock_test_and_set(variable, value);
(void)__sync_lock_test_and_set(variable, value);
#endif
}

View File

@ -69,7 +69,7 @@ public:
typedef WORKER_STATISTICS STATISTICS;
typedef WorkerTask Task;
typedef WorkerDisposableTask DisposableTask;
typedef std::tr1::unordered_map<uint32_t, MXS_SESSION*> SessionsById;
typedef std::tr1::unordered_map<uint64_t, MXS_SESSION*> SessionsById;
enum state_t
{
@ -298,6 +298,12 @@ public:
*/
bool post(std::auto_ptr<DisposableTask> sTask, enum execute_mode_t mode = EXECUTE_AUTO);
template<class T>
bool post(std::auto_ptr<T> sTask, enum execute_mode_t mode = EXECUTE_AUTO)
{
return post(std::auto_ptr<DisposableTask>(sTask.release()), mode);
}
/**
* Posts a task to all workers for execution.
*
@ -331,6 +337,12 @@ public:
*/
static size_t broadcast(std::auto_ptr<DisposableTask> sTask);
template<class T>
static size_t broadcast(std::auto_ptr<T> sTask)
{
return broadcast(std::auto_ptr<DisposableTask>(sTask.release()));
}
/**
* Executes a task on all workers in serial mode (the task is executed
* on at most one worker thread at a time). When the function returns
@ -374,6 +386,30 @@ public:
*/
bool post_message(uint32_t msg_id, intptr_t arg1, intptr_t arg2);
/**
* Add a session to the session container.
*
* @param session The session to add
* @return true if successful
*/
bool register_session(MXS_SESSION* session);
/**
* Remove a session from the session container.
*
* @param id Session id
* @return The removed session, or NULL if not found
*/
MXS_SESSION* deregister_session(uint64_t id);
/**
* Find a session in the session container.
*
* @param id Session id
* @return The found session, or NULL if not found
*/
MXS_SESSION* find_session(uint64_t id);
/**
* Broadcast a message to all worker.
*

View File

@ -49,17 +49,18 @@ public:
*/
class WorkerDisposableTask
{
public:
/**
* Destructor
*/
virtual ~WorkerDisposableTask();
protected:
/**
* Constructor
*/
WorkerDisposableTask();
/**
* Destructor
*/
virtual ~WorkerDisposableTask();
/**
* @brief Called in the context of a specific worker.
*

View File

@ -25,7 +25,7 @@
namespace
{
struct
static struct
{
bool initialized;
int pipe_flags;

View File

@ -50,6 +50,8 @@
#include "maxscale/session.h"
#include "maxscale/filter.h"
#include "maxscale/worker.hh"
#include "maxscale/workertask.hh"
using std::string;
@ -68,6 +70,55 @@ static MXS_SESSION *session_find_free();
static void session_final_free(MXS_SESSION *session);
static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
MXS_SESSION* session);
namespace
{
/**
* Checks if issuer_user@issuer_host has the privilege to kill the target session.
* Currently just checks that the user and host are the same.
*
* This function should only be called in the worker thread normally handling
* the target session, otherwise target session could be freed while function is
* running.
*
* @param issuer_user User name of command issuer
* @param issuer_host Host/ip of command issuer
* @param target Target session
* @return
*/
bool issuer_can_kill_target(const string& issuer_user, const string& issuer_host,
const MXS_SESSION* target)
{
DCB* target_dcb = target->client_dcb;
return ((strcmp(issuer_user.c_str(), target_dcb->user) == 0) &&
(strcmp(issuer_host.c_str(), target_dcb->remote) == 0));
}
class KillCmdTask : public maxscale::Worker::DisposableTask
{
public:
KillCmdTask(MXS_SESSION* issuer, uint64_t target_id)
: m_issuer_user(issuer->client_dcb->user)
, m_issuer_host(issuer->client_dcb->remote)
, m_target_id(target_id)
{
}
void execute(maxscale::Worker& worker)
{
MXS_SESSION* target = worker.find_session(m_target_id);
if (target && issuer_can_kill_target(m_issuer_user, m_issuer_host, target))
{
poll_fake_hangup_event(target->client_dcb);
}
}
private:
std::string m_issuer_user;
std::string m_issuer_host;
uint64_t m_target_id;
};
}
/**
* The clientReply of the session.
*
@ -999,6 +1050,31 @@ uint32_t session_get_next_id()
return atomic_add_uint32(&next_session_id, 1);
}
void session_broadcast_kill_command(MXS_SESSION* issuer, uint64_t target_id)
{
/* First, check if the target id belongs to the current worker. If it does,
* send hangup event. Otherwise, use a worker task to send a message to all
* workers.
*/
MXS_SESSION* target = mxs_worker_find_session(target_id);
if (target &&
issuer_can_kill_target(issuer->client_dcb->user,
issuer->client_dcb->remote,
target))
{
poll_fake_hangup_event(target->client_dcb);
}
else
{
KillCmdTask* kill_task = new (std::nothrow) KillCmdTask(issuer, target_id);
if (kill_task)
{
std::auto_ptr<KillCmdTask> sKillTask(kill_task);
maxscale::Worker::broadcast(sKillTask);
}
}
}
json_t* session_to_json(const MXS_SESSION *session, const char *host)
{
json_t* rval = json_object();

View File

@ -43,7 +43,8 @@ int test_validity()
{MXS_END_MODULE_PARAMS}
};
CONFIG_CONTEXT ctx = {.object = (char*)""};
CONFIG_CONTEXT ctx = {};
ctx.object = (char*)"";
/** Int parameter */
TEST(config_param_is_valid(params, "p1", "1", &ctx));
@ -90,7 +91,8 @@ int test_validity()
TEST(!config_param_is_valid(params, "p6", "This is not a valid path", &ctx));
/** Service parameter */
CONFIG_CONTEXT svc = {.object = (char*)"test-service"};
CONFIG_CONTEXT svc = {};
svc.object = (char*)"test-service";
ctx.next = &svc;
config_add_param(&svc, "type", "service");
TEST(config_param_is_valid(params, "p7", "test-service", &ctx));
@ -193,7 +195,8 @@ int test_required_parameters()
{MXS_END_MODULE_PARAMS}
};
CONFIG_CONTEXT ctx = {.object = (char*)""};
CONFIG_CONTEXT ctx = {};
ctx.object = (char*)"";
TEST(missing_required_parameters(params, ctx.parameters));
config_add_defaults(&ctx, params);

View File

@ -712,6 +712,61 @@ size_t mxs_worker_broadcast_message(uint32_t msg_id, intptr_t arg1, intptr_t arg
return Worker::broadcast_message(msg_id, arg1, arg2);
}
bool mxs_worker_register_session(MXS_SESSION* session)
{
Worker* worker = Worker::get_current();
ss_dassert(worker);
return worker->register_session(session);
}
MXS_SESSION* mxs_worker_deregister_session(uint64_t id)
{
MXS_SESSION* rval = NULL;
Worker* worker = Worker::get_current();
if (worker)
{
rval = worker->deregister_session(id);
}
return rval;
}
MXS_SESSION* mxs_worker_find_session(uint64_t id)
{
MXS_SESSION* rval = NULL;
Worker* worker = Worker::get_current();
if (worker)
{
rval = worker->find_session(id);
}
return rval;
}
bool Worker::register_session(MXS_SESSION* session)
{
return m_sessions.insert(SessionsById::value_type(session->ses_id, session)).second;
}
MXS_SESSION* Worker::deregister_session(uint64_t id)
{
MXS_SESSION* rval = find_session(id);
if (rval)
{
m_sessions.erase(id);
}
return rval;
}
MXS_SESSION* Worker::find_session(uint64_t id)
{
MXS_SESSION* rval = NULL;
SessionsById::const_iterator iter = m_sessions.find(id);
if (iter != m_sessions.end())
{
rval = iter->second;
}
return rval;
}
void Worker::run()
{
this_thread.current_worker_id = m_id;

View File

@ -12,40 +12,6 @@
* Public License.
*/
/**
* @file mysql_client.c
*
* MySQL Protocol module for handling the protocol between the gateway
* and the client.
*
* Revision History
* Date Who Description
* 14/06/2013 Mark Riddoch Initial version
* 17/06/2013 Massimiliano Pinto Added Client To MaxScale routines
* 24/06/2013 Massimiliano Pinto Added: fetch passwords from service users' hashtable
* 02/09/2013 Massimiliano Pinto Added: session refcount
* 16/12/2013 Massimiliano Pinto Added: client closed socket detection with recv(..., MSG_PEEK)
* 24/02/2014 Massimiliano Pinto Added: on failed authentication a new users' table is loaded
* with time and frequency limitations
* If current user is authenticated the new users' table will
* replace the old one
* 28/02/2014 Massimiliano Pinto Added: client IPv4 in dcb->ipv4 and inet_ntop for string
* representation
* 11/03/2014 Massimiliano Pinto Added: Unix socket support
* 07/05/2014 Massimiliano Pinto Added: specific version string in server handshake
* 09/09/2014 Massimiliano Pinto Added: 777 permission for socket path
* 13/10/2014 Massimiliano Pinto Added: dbname authentication check
* 10/11/2014 Massimiliano Pinto Added: client charset added to protocol struct
* 29/05/2015 Markus Makela Added SSL support
* 11/06/2015 Martin Brampton COM_QUIT suppressed for persistent connections
* 04/09/2015 Martin Brampton Introduce DUMMY session to fulfill guarantee DCB always has session
* 09/09/2015 Martin Brampton Modify error handler calls
* 11/01/2016 Martin Brampton Remove SSL write code, now handled at lower level;
* replace gwbuf_consume by gwbuf_free (multiple).
* 07/02/2016 Martin Brampton Split off authentication and SSL.
* 31/05/2016 Martin Brampton Implement connection throttling
*/
#define MXS_MODULE_NAME "MySQLClient"
#include <maxscale/protocol.h>
@ -61,6 +27,7 @@
#include <maxscale/query_classifier.h>
#include <maxscale/authenticator.h>
#include <maxscale/session.h>
#include <maxscale/worker.h>
static int process_init(void);
static void process_finish(void);
@ -85,12 +52,12 @@ static int gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read);
static int gw_read_finish_processing(DCB *dcb, GWBUF *read_buffer, uint64_t capabilities);
static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_read);
static void gw_process_one_new_client(DCB *client_dcb);
static bool process_special_commands(DCB* client_dcb, GWBUF *read_buffer, int nbytes_read);
/*
* The "module object" for the mysqld client protocol module.
*/
/**
* The module entry point routine. It is this routine that
* must populate the structure that is referred to as the
@ -701,7 +668,7 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
* normal data handling function instead of this one.
*/
MXS_SESSION *session =
session_alloc_with_id(dcb->service, dcb, protocol->tid);
session_alloc_with_id(dcb->service, dcb, protocol->tid);
if (session != NULL)
{
@ -709,6 +676,8 @@ gw_read_do_authentication(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
ss_dassert(session->state != SESSION_STATE_ALLOC &&
session->state != SESSION_STATE_DUMMY);
protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE;
ss_debug(bool check = ) mxs_worker_register_session(session);
ss_dassert(check);
mxs_mysql_send_ok(dcb, next_sequence, 0, NULL);
}
else
@ -933,33 +902,14 @@ gw_read_normal_data(DCB *dcb, GWBUF *read_buffer, int nbytes_read)
if (nbytes_read < 3 || nbytes_read <
(MYSQL_GET_PAYLOAD_LEN((uint8_t *) GWBUF_DATA(read_buffer)) + 4))
{
dcb->dcb_readqueue = read_buffer;
return 0;
}
}
/**
* Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
*
* The option is stored as a two byte integer with the values 0 for enabling
* multi-statements and 1 for disabling it.
*/
MySQLProtocol *proto = dcb->protocol;
uint8_t opt;
if (proto->current_command == MYSQL_COM_SET_OPTION &&
gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt))
if (!process_special_commands(dcb, read_buffer, nbytes_read))
{
if (opt)
{
proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
}
else
{
proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
}
return 0;
}
return gw_read_finish_processing(dcb, read_buffer, capabilities);
@ -1307,7 +1257,10 @@ static int gw_client_close(DCB *dcb)
CHK_DCB(dcb);
ss_dassert(dcb->protocol);
mysql_protocol_done(dcb);
session_close(dcb->session);
MXS_SESSION* target = dcb->session;
ss_debug(MXS_SESSION* removed = ) mxs_worker_deregister_session(target->ses_id);
ss_dassert(removed == target);
session_close(target);
return 1;
}
@ -1526,3 +1479,68 @@ static bool ensure_complete_packet(DCB *dcb, GWBUF **read_buffer, int nbytes_rea
return true;
}
/**
* Some SQL commands/queries need to be detected and handled by the protocol
* and MaxScale instead of being routed forward as is.
* @param dcb Client dcb
* @param read_buffer the current read buffer
* @param nbytes_read How many bytes were read
* @return true if read buffer should be sent forward to routing, false if more
* data is required or processing is complete
*/
static bool process_special_commands(DCB* dcb, GWBUF *read_buffer, int nbytes_read)
{
/**
* Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
*
* The option is stored as a two byte integer with the values 0 for enabling
* multi-statements and 1 for disabling it.
*/
MySQLProtocol *proto = dcb->protocol;
uint8_t opt;
if (proto->current_command == MYSQL_COM_SET_OPTION &&
gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt))
{
if (opt)
{
proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
}
else
{
proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
}
}
/**
* Handle COM_PROCESS_KILL
*/
else if ((proto->current_command == MYSQL_COM_PROCESS_KILL))
{
/* Make sure we have a complete SQL packet before trying to read the
* process id. If not, try again next time. */
unsigned int expected_len =
MYSQL_GET_PAYLOAD_LEN((uint8_t *)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
if (gwbuf_length(read_buffer) < expected_len)
{
dcb->dcb_readqueue = read_buffer;
return false;
}
else
{
uint8_t bytes[4];
if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), (uint8_t*)bytes)
== sizeof(bytes))
{
uint64_t process_id = gw_mysql_get_byte4(bytes);
// Do not send this packet for routing
gwbuf_free(read_buffer);
session_broadcast_kill_command(dcb->session, process_id);
// Even if id not found, send ok. TODO: send a correct response to client
mxs_mysql_send_ok(dcb, 1, 0, NULL);
return false;
}
}
}
return true;
}