Send KILL commands to backends

KILL commands are now sent to the backends in an asynchronous manner. As
the LocalClient class is used to connect to the servers, this will cause
an extra connection to be created on top of the original connections
created by the session.

If the user does not have the permissions to execute the KILL, the error
message is currently lost. This could be solved by adding a "result
handler" into the LocalClient class which is called with the result.
This commit is contained in:
Markus Mäkelä
2017-09-27 16:18:36 +03:00
parent a7e610a70a
commit 4dd6842447
11 changed files with 144 additions and 154 deletions

View File

@ -38,6 +38,7 @@ public:
* @return New virtual client or NULL on error
*/
static LocalClient* create(MXS_SESSION* session, SERVICE* service);
static LocalClient* create(MXS_SESSION* session, SERVER* server);
/**
* Queue a new query for execution
@ -48,7 +49,15 @@ public:
*/
bool queue_query(GWBUF* buffer);
/**
* Destroy the client by sending a COM_QUIT to the backend
*
* @note After calling this function, object must be treated as a deleted object
*/
void self_destruct();
private:
static LocalClient* create(MXS_SESSION* session, const char* ip, uint64_t port);
LocalClient(MXS_SESSION* session, int fd);
static uint32_t poll_handler(struct mxs_poll_data* data, int wid, uint32_t events);
void process(uint32_t events);
@ -73,4 +82,5 @@ private:
std::deque<mxs::Buffer> m_queue;
MYSQL_session m_client;
MySQLProtocol m_protocol;
bool m_self_destruct;
};

View File

@ -624,4 +624,13 @@ uint32_t mxs_mysql_extract_ps_id(GWBUF* buffer);
*/
bool mxs_mysql_command_will_respond(uint8_t cmd);
/* Type of the kill-command sent by client. */
typedef enum kill_type
{
KT_CONNECTION,
KT_QUERY
} kill_type_t;
void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t type);
MXS_END_DECLS

View File

@ -414,15 +414,6 @@ bool session_take_stmt(MXS_SESSION *session, GWBUF **buffer, const struct server
*/
void session_clear_stmt(MXS_SESSION *session);
/**
* Try to kill a specific session. This function only sends messages to
* worker threads without waiting for the result.
*
* @param issuer The session where the command originates.
* @param target_id Target session id.
*/
void session_broadcast_kill_command(MXS_SESSION* issuer, uint64_t target_id);
/**
* @brief Convert a session to JSON
*

View File

@ -62,54 +62,6 @@ static void session_final_free(MXS_SESSION *session);
static MXS_SESSION* session_alloc_body(SERVICE* service, DCB* client_dcb,
MXS_SESSION* session);
namespace
{
/**
* Checks if issuer_user@issuer_host has the privilege to kill the target session.
* Currently just checks that the user and host are the same.
*
* This function should only be called in the worker thread normally handling
* the target session, otherwise target session could be freed while function is
* running.
*
* @param issuer_user User name of command issuer
* @param issuer_host Host/ip of command issuer
* @param target Target session
* @return
*/
bool issuer_can_kill_target(const string& issuer_user, const string& issuer_host,
const MXS_SESSION* target)
{
DCB* target_dcb = target->client_dcb;
return ((strcmp(issuer_user.c_str(), target_dcb->user) == 0) &&
(strcmp(issuer_host.c_str(), target_dcb->remote) == 0));
}
class KillCmdTask : public maxscale::Worker::DisposableTask
{
public:
KillCmdTask(MXS_SESSION* issuer, uint64_t target_id)
: m_issuer_user(issuer->client_dcb->user)
, m_issuer_host(issuer->client_dcb->remote)
, m_target_id(target_id)
{
}
void execute(maxscale::Worker& worker)
{
MXS_SESSION* target = worker.session_registry().lookup(m_target_id);
if (target && issuer_can_kill_target(m_issuer_user, m_issuer_host, target))
{
poll_fake_hangup_event(target->client_dcb);
}
}
private:
std::string m_issuer_user;
std::string m_issuer_host;
uint64_t m_target_id;
};
}
/**
* The clientReply of the session.
*
@ -1031,31 +983,6 @@ uint64_t session_get_next_id()
return atomic_add_uint64(&next_session_id, 1);
}
void session_broadcast_kill_command(MXS_SESSION* issuer, uint64_t target_id)
{
/* First, check if the target id belongs to the current worker. If it does,
* send hangup event. Otherwise, use a worker task to send a message to all
* workers.
*/
MXS_SESSION* target = mxs_worker_find_session(target_id);
if (target &&
issuer_can_kill_target(issuer->client_dcb->user,
issuer->client_dcb->remote,
target))
{
poll_fake_hangup_event(target->client_dcb);
}
else
{
KillCmdTask* kill_task = new (std::nothrow) KillCmdTask(issuer, target_id);
if (kill_task)
{
std::auto_ptr<KillCmdTask> sKillTask(kill_task);
maxscale::Worker::broadcast(sKillTask);
}
}
}
json_t* session_json_data(const MXS_SESSION *session, const char *host)
{
json_t* data = json_object();

View File

@ -1,4 +1,4 @@
add_library(tee SHARED tee.cc teesession.cc local_client.cc)
add_library(tee SHARED tee.cc teesession.cc)
target_link_libraries(tee maxscale-common MySQLCommon)
set_target_properties(tee PROPERTIES VERSION "1.0.0")
install_module(tee core)

View File

@ -25,7 +25,6 @@
#include <maxscale/modulecmd.h>
#include "tee.hh"
#include "local_client.hh"
#include "teesession.hh"
static const MXS_ENUM_VALUE option_values[] =

View File

@ -15,8 +15,7 @@
#include <maxscale/cppdefs.hh>
#include <maxscale/filter.hh>
#include "local_client.hh"
#include <maxscale/protocol/mariadb_client.hh>
class Tee;

View File

@ -1,4 +1,4 @@
add_library(MySQLCommon SHARED mysql_common.c)
add_library(MySQLCommon SHARED mysql_common.cc mariadb_client.cc)
target_link_libraries(MySQLCommon maxscale-common)
set_target_properties(MySQLCommon PROPERTIES VERSION "2.0.0")
install_module(MySQLCommon core)

View File

@ -47,13 +47,6 @@ typedef enum spec_com_res_t
// wait for more data.
} spec_com_res_t;
/* Type of the kill-command sent by client. */
typedef enum kill_type
{
KT_CONNECTION,
KT_QUERY
} kill_type_t;
const char WORD_KILL[] = "KILL";
static int process_init(void);
@ -1674,9 +1667,7 @@ static spec_com_res_t process_special_commands(DCB *dcb, GWBUF *read_buffer, int
== sizeof(bytes))
{
uint64_t process_id = gw_mysql_get_byte4(bytes);
session_broadcast_kill_command(dcb->session, process_id);
// Even if id not found, send ok. TODO: send a correct response to client
mxs_mysql_send_ok(dcb, 1, 0, NULL);
mxs_mysql_execute_kill(dcb->session, process_id, KT_CONNECTION);
rval = RES_END;
}
}
@ -1735,30 +1726,11 @@ spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t cu
kill_type_t kt = KT_CONNECTION;
uint64_t thread_id = 0;
bool parsed = parse_kill_query(querybuf, &thread_id, &kt);
rval = RES_END;
if (parsed && (thread_id > 0)) // MaxScale session counter starts at 1
{
switch (kt)
{
case KT_CONNECTION:
session_broadcast_kill_command(dcb->session, thread_id);
// Even if id not found, send ok. TODO: send a correct response to client
mxs_mysql_send_ok(dcb, 1, 0, NULL);
rval = RES_END;
break;
case KT_QUERY:
// TODO: Implement this
MXS_WARNING("Received 'KILL QUERY %" PRIu64 "' from "
"the client. This feature is not supported.", thread_id);
mysql_send_custom_error(dcb, 1, 0, "'KILL QUERY <thread_id>' "
"is not supported.");
rval = RES_END;
break;
default:
ss_dassert(!true);
}
mxs_mysql_execute_kill(dcb->session, thread_id, kt);
}
}
}

View File

@ -11,17 +11,16 @@
* Public License.
*/
#include "local_client.hh"
#include <maxscale/protocol/mariadb_client.hh>
#include <maxscale/utils.h>
// TODO: Find a way to cleanly expose this
#include "../../../core/maxscale/worker.hh"
#ifdef EPOLLRDHUP
#define ERROR_EVENTS (EPOLLRDHUP | EPOLLHUP)
#define ERROR_EVENTS (EPOLLRDHUP | EPOLLHUP | EPOLLERR)
#else
#define ERROR_EVENTS EPOLLHUP
#define ERROR_EVENTS (EPOLLHUP | EPOLLERR)
#endif
static const uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLET | ERROR_EVENTS;
@ -31,7 +30,8 @@ LocalClient::LocalClient(MXS_SESSION* session, int fd):
m_sock(fd),
m_expected_bytes(0),
m_client({}),
m_protocol({})
m_protocol({}),
m_self_destruct(false)
{
MXS_POLL_DATA::handler = LocalClient::poll_handler;
MySQLProtocol* client = (MySQLProtocol*)session->client_dcb->protocol;
@ -66,6 +66,14 @@ bool LocalClient::queue_query(GWBUF* buffer)
return my_buf != NULL;
}
void LocalClient::self_destruct()
{
GWBUF* buffer = mysql_create_com_quit(NULL, 0);
queue_query(buffer);
gwbuf_free(buffer);
m_self_destruct = true;
}
void LocalClient::close()
{
mxs::Worker* worker = mxs::Worker::get_current();
@ -135,6 +143,10 @@ void LocalClient::process(uint32_t events)
{
drain_queue();
}
else if (m_state == VC_ERROR && m_self_destruct)
{
delete this;
}
}
GWBUF* LocalClient::read_complete_packet()
@ -225,20 +237,11 @@ uint32_t LocalClient::poll_handler(struct mxs_poll_data* data, int wid, uint32_t
return 0;
}
LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
LocalClient* LocalClient::create(MXS_SESSION* session, const char* ip, uint64_t port)
{
LocalClient* rval = NULL;
LISTENER_ITERATOR iter;
for (SERV_LISTENER* listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (listener->port > 0)
{
/** Pick the first network listener */
sockaddr_storage addr;
int fd = open_network_socket(MXS_SOCKET_NETWORK, &addr, "127.0.0.1",
service->ports->port);
int fd = open_network_socket(MXS_SOCKET_NETWORK, &addr, ip, port);
if (fd > 0 && (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0 || errno == EINPROGRESS))
{
@ -265,9 +268,29 @@ LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
{
::close(fd);
}
return rval;
}
LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
{
LocalClient* rval = NULL;
LISTENER_ITERATOR iter;
for (SERV_LISTENER* listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (listener->port > 0)
{
/** Pick the first network listener */
rval = create(session, "127.0.0.1", service->ports->port);
break;
}
}
return rval;
}
LocalClient* LocalClient::create(MXS_SESSION* session, SERVER* server)
{
return create(session, server->name, server->port);
}

View File

@ -17,6 +17,9 @@
#include <netinet/tcp.h>
#include <sstream>
#include <vector>
#include <maxscale/alloc.h>
#include <maxscale/hk_heartbeat.h>
#include <maxscale/log_manager.h>
@ -24,6 +27,8 @@
#include <maxscale/mysql_utils.h>
#include <maxscale/protocol/mysql.h>
#include <maxscale/utils.h>
#include <maxscale/protocol/mariadb_client.hh>
uint8_t null_client_sha1[MYSQL_SCRAMBLE_LEN] = "";
@ -31,7 +36,7 @@ static server_command_t* server_command_init(server_command_t* srvcmd, mxs_mysql
MYSQL_session* mysql_session_alloc()
{
MYSQL_session *ses = MXS_CALLOC(1, sizeof(MYSQL_session));
MYSQL_session* ses = (MYSQL_session*)MXS_CALLOC(1, sizeof(MYSQL_session));
if (ses)
{
@ -1358,7 +1363,7 @@ mxs_auth_state_t gw_send_backend_auth(DCB *dcb)
MYSQL_session client;
gw_get_shared_session_auth_info(dcb->session->client_dcb, &client);
GWBUF* buffer = gw_generate_auth_response(&client, dcb->protocol,
GWBUF* buffer = gw_generate_auth_response(&client, (MySQLProtocol*)dcb->protocol,
with_ssl, ssl_established);
ss_dassert(buffer);
@ -1683,3 +1688,58 @@ bool mxs_mysql_command_will_respond(uint8_t cmd)
cmd != MXS_COM_STMT_CLOSE &&
cmd != MXS_COM_STMT_FETCH;
}
typedef std::vector< std::pair<SERVER*, uint64_t> > TargetList;
struct KillInfo
{
uint64_t target_id;
TargetList targets;
};
static bool kill_func(DCB *dcb, void *data)
{
KillInfo* info = (KillInfo*)data;
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER &&
dcb->session->ses_id == info->target_id)
{
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
info->targets.push_back(std::make_pair(dcb->server, proto->thread_id));
}
return true;
}
void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t type)
{
// Gather a list of servers and connection IDs to kill
KillInfo info = {target_id};
dcb_foreach(kill_func, &info);
if (info.targets.empty())
{
// No session found, send an error
std::stringstream err;
err << "Unknown thread id: " << target_id;
mysql_send_standard_error(issuer->client_dcb, 1, 1094, err.str().c_str());
}
else
{
// Execute the KILL on all of the servers
for (TargetList::iterator it = info.targets.begin();
it != info.targets.end(); it++)
{
LocalClient* client = LocalClient::create(issuer, it->first);
std::stringstream ss;
ss << "KILL " << (type == KT_QUERY ? "QUERY " : "") << it->second;
GWBUF* buffer = modutil_create_query(ss.str().c_str());
client->queue_query(buffer);
gwbuf_free(buffer);
// The LocalClient needs to delete itself once the queries are done
client->self_destruct();
}
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}
}