MXS-1452: Add support for KILL USER <name>

Added support for killing queries by username. This will kill all
connections from that particular user on all servers.
This commit is contained in:
Markus Mäkelä
2017-10-02 14:53:30 +03:00
parent 4150dee952
commit 96d160f897
5 changed files with 131 additions and 54 deletions

View File

@ -20,6 +20,7 @@
#include <limits.h>
#include <netinet/tcp.h>
#include <sys/stat.h>
#include <string>
#include <maxscale/alloc.h>
#include <maxscale/authenticator.h>
@ -75,7 +76,7 @@ static void gw_process_one_new_client(DCB *client_dcb);
static spec_com_res_t process_special_commands(DCB *client_dcb, GWBUF *read_buffer, int nbytes_read);
static spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t current,
bool is_complete, unsigned int packet_len);
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out);
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out, std::string* user);
/**
* The module entry point routine. It is this routine that
@ -1725,12 +1726,19 @@ spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t cu
querybuf[copied_len] = '\0';
kill_type_t kt = KT_CONNECTION;
uint64_t thread_id = 0;
bool parsed = parse_kill_query(querybuf, &thread_id, &kt);
rval = RES_END;
std::string user;
if (parsed && (thread_id > 0)) // MaxScale session counter starts at 1
if (parse_kill_query(querybuf, &thread_id, &kt, &user))
{
mxs_mysql_execute_kill(dcb->session, thread_id, kt);
if (thread_id > 0)
{
mxs_mysql_execute_kill(dcb->session, thread_id, kt);
}
else if (!user.empty())
{
mxs_mysql_execute_kill_user(dcb->session, user.c_str(), kt);
}
}
}
}
@ -1745,31 +1753,48 @@ spec_com_res_t handle_query_kill(DCB* dcb, GWBUF* read_buffer, spec_com_res_t cu
return rval;
}
static void extract_user(char* token, std::string* user)
{
char* end = strchr(token, ';');
if (end)
{
user->assign(token, end - token);
}
else
{
user->assign(token);
}
}
/**
* Parse a "KILL [CONNECTION | QUERY] <process_id>" query. Will modify
* the argument string even if unsuccessful.
* Parse a "KILL [CONNECTION | QUERY] [ <process_id> | USER <username> ]" query.
* Will modify the argument string even if unsuccessful.
*
* @param query Query string to parse
* @paran thread_id_out Thread id output
* @param kt_out Kill command type output
* @return true on success, false on error
*/
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out)
static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *kt_out, std::string* user)
{
const char WORD_CONNECTION[] = "CONNECTION";
const char WORD_QUERY[] = "QUERY";
const char WORD_HARD[] = "HARD";
const char WORD_SOFT[] = "SOFT";
const char WORD_USER[] = "USER";
const char DELIM[] = " \n\t";
int kill_type = KT_CONNECTION;
unsigned long long int thread_id = 0;
std::string tmpuser;
enum kill_parse_state_t
{
KILL,
CONN_QUERY,
ID,
USER,
SEMICOLON,
DONE
} state = KILL;
@ -1826,52 +1851,50 @@ static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *
break;
case ID:
if (strncasecmp(token, WORD_USER, sizeof(WORD_USER) - 1) == 0)
{
state = USER;
get_next = true;
break;
}
else
{
/* strtoull() accepts negative numbers, so check for '-' here */
if (*token == '-')
{
error = true;
break;
}
char *endptr_id = NULL;
thread_id = strtoull(token, &endptr_id, 0);
if ((thread_id == ULLONG_MAX) && (errno == ERANGE))
long long int l = strtoll(token, &endptr_id, 0);
if ((l == LLONG_MAX && errno == ERANGE) ||
(*endptr_id != '\0' && *endptr_id != ';') ||
l <= 0 || endptr_id == token)
{
// Not a positive 32-bit integer
error = true;
errno = 0;
}
else if (endptr_id == token)
{
error = true; // No digits were read
}
else if (*endptr_id == '\0') // Can be real end or written by strtok
{
state = SEMICOLON; // In case we have space before ;
get_next = true;
}
else if (*endptr_id == ';')
{
token = endptr_id;
state = SEMICOLON;
}
else
{
error = true;
ss_dassert(*endptr_id == '\0' || *endptr_id == ';');
state = SEMICOLON; // In case we have space before ;
get_next = true;
thread_id = l;
}
}
break;
case USER:
extract_user(token, &tmpuser);
state = SEMICOLON;
get_next = true;
break;
case SEMICOLON:
if (strncmp(token, ";", 1) == 0)
{
if (strncmp(token, ";", 1) == 0)
{
state = DONE;
get_next = true;
}
else
{
error = true;
}
state = DONE;
get_next = true;
}
else
{
error = true;
}
break;
@ -1894,6 +1917,7 @@ static bool parse_kill_query(char *query, uint64_t *thread_id_out, kill_type_t *
{
*thread_id_out = thread_id;
*kt_out = (kill_type_t)kill_type;
*user = tmpuser;
return true;
}
}

View File

@ -17,6 +17,7 @@
#include <netinet/tcp.h>
#include <set>
#include <sstream>
#include <vector>
@ -1014,6 +1015,7 @@ bool gw_get_shared_session_auth_info(DCB* dcb, MYSQL_session* session)
* @param message SQL message
* @return 1 on success, 0 on error
*
* @todo Support more than 255 affected rows
*/
int mxs_mysql_send_ok(DCB *dcb, int sequence, uint8_t affected_rows, const char* message)
{
@ -1747,3 +1749,52 @@ void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}
}
typedef std::set<SERVER*> ServerSet;
struct KillUserInfo
{
std::string user;
ServerSet targets;
};
static bool kill_user_func(DCB *dcb, void *data)
{
KillUserInfo* info = (KillUserInfo*)data;
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER &&
strcasecmp(dcb->session->client_dcb->user, info->user.c_str()) == 0)
{
info->targets.insert(dcb->server);
}
return true;
}
void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_type_t type)
{
// Gather a list of servers and connection IDs to kill
KillUserInfo info = {user};
dcb_foreach(kill_user_func, &info);
// Execute the KILL on all of the servers
for (ServerSet::iterator it = info.targets.begin();
it != info.targets.end(); it++)
{
LocalClient* client = LocalClient::create(issuer, *it);
const char* hard = (type & KT_HARD) ? "HARD " :
(type & KT_SOFT) ? "SOFT " : "";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query << "USER " << user;
GWBUF* buffer = modutil_create_query(ss.str().c_str());
client->queue_query(buffer);
gwbuf_free(buffer);
// The LocalClient needs to delete itself once the queries are done
client->self_destruct();
}
mxs_mysql_send_ok(issuer->client_dcb, info.targets.size(), 0, NULL);
}