MXS-1985: Kill connections inside workers

The LocalClient micro-client required a reference to the session that was
valid at construction time. This is the reason why the previous
implementation used dcb_foreach to first gather the targets and then
execute queries on them. By replacing this reference with pointers to the
raw data it requires, we lift the requirement of the orignating session
being alive at construction time.

Now that the LocalClient no longer holds a reference to the session, the
killing of the connection does not have to be done on the same thread that
started the process. This prevents the deadlock that occurred when
concurrect dcb_foreach calls were made.

Replaced the unused dcb_foreach_parallel with a version of dcb_foreach
that allows iteration of DCBs local to this worker. The dcb_foreach_local
is the basis upon which all DCB access outside of administrative tasks
should be built on.

This change will introduce a regression in functionality: The client will
no longer receive an error if no connections match the KILL query
criteria. This is done to avoid having to synchronize the workers after
they have performed the killing of their own connections.
This commit is contained in:
Markus Mäkelä
2018-07-20 04:30:54 +03:00
parent 101dad74a7
commit 21eef8a670
6 changed files with 126 additions and 126 deletions

View File

@ -25,20 +25,15 @@
static const uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLET | ERROR_EVENTS;
LocalClient::LocalClient(MXS_SESSION* session, int fd):
LocalClient::LocalClient(MYSQL_session* session, MySQLProtocol* proto, int fd):
m_state(VC_WAITING_HANDSHAKE),
m_sock(fd),
m_expected_bytes(0),
m_client({}),
m_protocol({}),
m_client(*session),
m_protocol(*proto),
m_self_destruct(false)
{
MXS_POLL_DATA::handler = LocalClient::poll_handler;
MySQLProtocol* client = (MySQLProtocol*)session->client_dcb->protocol;
m_protocol.charset = client->charset;
m_protocol.client_capabilities = client->client_capabilities;
m_protocol.extra_capabilities = client->extra_capabilities;
gw_get_shared_session_auth_info(session->client_dcb, &m_client);
}
LocalClient::~LocalClient()
@ -237,7 +232,7 @@ uint32_t LocalClient::poll_handler(struct mxs_poll_data* data, int wid, uint32_t
return 0;
}
LocalClient* LocalClient::create(MXS_SESSION* session, const char* ip, uint64_t port)
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, const char* ip, uint64_t port)
{
LocalClient* rval = NULL;
sockaddr_storage addr;
@ -245,7 +240,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, const char* ip, uint64_t
if (fd > 0 && (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0 || errno == EINPROGRESS))
{
LocalClient* relay = new (std::nothrow) LocalClient(session, fd);
LocalClient* relay = new (std::nothrow) LocalClient(session, proto, fd);
if (relay)
{
@ -271,7 +266,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, const char* ip, uint64_t
return rval;
}
LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, SERVICE* service)
{
LocalClient* rval = NULL;
LISTENER_ITERATOR iter;
@ -282,7 +277,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
if (listener->port > 0)
{
/** Pick the first network listener */
rval = create(session, "127.0.0.1", service->ports->port);
rval = create(session, proto, "127.0.0.1", service->ports->port);
break;
}
}
@ -290,7 +285,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
return rval;
}
LocalClient* LocalClient::create(MXS_SESSION* session, SERVER* server)
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, SERVER* server)
{
return create(session, server->name, server->port);
return create(session, proto, server->name, server->port);
}

View File

@ -19,7 +19,7 @@
#include <set>
#include <sstream>
#include <vector>
#include <map>
#include <maxscale/alloc.h>
#include <maxscale/hk_heartbeat.h>
@ -30,6 +30,7 @@
#include <maxscale/utils.h>
#include <maxscale/protocol/mariadb_client.hh>
#include <maxscale/poll.h>
#include <maxscale/worker.h>
uint8_t null_client_sha1[MYSQL_SCRAMBLE_LEN] = "";
@ -1586,17 +1587,60 @@ bool mxs_mysql_command_will_respond(uint8_t cmd)
cmd != MXS_COM_STMT_CLOSE;
}
typedef std::vector< std::pair<SERVER*, uint64_t> > TargetList;
namespace
{
// Servers and queries to execute on them
typedef std::map<SERVER*, std::string> TargetList;
struct KillInfo
{
uint64_t target_id;
typedef bool (*DcbCallback)(DCB *dcb, void *data);
KillInfo(std::string query, MXS_SESSION* ses, DcbCallback callback):
origin(mxs_worker_get_current_id()),
query_base(query),
protocol(*(MySQLProtocol*)ses->client_dcb->protocol),
cb(callback)
{
gw_get_shared_session_auth_info(ses->client_dcb, &session);
}
int origin;
std::string query_base;
MYSQL_session session;
MySQLProtocol protocol;
DcbCallback cb;
TargetList targets;
};
static bool kill_func(DCB *dcb, void *data);
struct ConnKillInfo: public KillInfo
{
ConnKillInfo(uint64_t id, std::string query, MXS_SESSION* ses):
KillInfo(query, ses, kill_func),
target_id(id)
{}
uint64_t target_id;
};
static bool kill_user_func(DCB *dcb, void *data);
struct UserKillInfo: public KillInfo
{
UserKillInfo(std::string name, std::string query, MXS_SESSION* ses):
KillInfo(query, ses, kill_user_func),
user(name)
{}
std::string user;
};
static bool kill_func(DCB *dcb, void *data)
{
KillInfo* info = (KillInfo*)data;
ConnKillInfo* info = static_cast<ConnKillInfo*>(data);
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER &&
dcb->session->ses_id == info->target_id)
@ -1606,7 +1650,9 @@ static bool kill_func(DCB *dcb, void *data)
if (proto->thread_id)
{
// DCB is connected and we know the thread ID so we can kill it
info->targets.push_back(std::make_pair(dcb->server, proto->thread_id));
std::stringstream ss;
ss << info->query_base << proto->thread_id;
info->targets[dcb->server] = ss.str();
}
else
{
@ -1619,82 +1665,29 @@ static bool kill_func(DCB *dcb, void *data)
return true;
}
void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t type)
{
// Gather a list of servers and connection IDs to kill
KillInfo info = {target_id};
dcb_foreach(kill_func, &info);
if (info.targets.empty())
{
// No session found, send an error
std::stringstream err;
err << "Unknown thread id: " << target_id;
mysql_send_standard_error(issuer->client_dcb, 1, 1094, err.str().c_str());
}
else
{
// Execute the KILL on all of the servers
for (TargetList::iterator it = info.targets.begin();
it != info.targets.end(); it++)
{
LocalClient* client = LocalClient::create(issuer, it->first);
const char* hard = (type & KT_HARD) ? "HARD " :
(type & KT_SOFT) ? "SOFT " :
"";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query << it->second;
GWBUF* buffer = modutil_create_query(ss.str().c_str());
client->queue_query(buffer);
gwbuf_free(buffer);
// The LocalClient needs to delete itself once the queries are done
client->self_destruct();
}
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}
}
typedef std::set<SERVER*> ServerSet;
struct KillUserInfo
{
std::string user;
ServerSet targets;
};
static bool kill_user_func(DCB *dcb, void *data)
{
KillUserInfo* info = (KillUserInfo*)data;
UserKillInfo* info = (UserKillInfo*)data;
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER &&
strcasecmp(dcb->session->client_dcb->user, info->user.c_str()) == 0)
{
info->targets.insert(dcb->server);
info->targets[dcb->server] = info->query_base;
}
return true;
}
void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_type_t type)
static void worker_func(int thread_id, void* data)
{
// Gather a list of servers and connection IDs to kill
KillUserInfo info = {user};
dcb_foreach(kill_user_func, &info);
KillInfo* info = static_cast<KillInfo*>(data);
dcb_foreach_local(info->cb, info);
// Execute the KILL on all of the servers
for (ServerSet::iterator it = info.targets.begin();
it != info.targets.end(); it++)
for (TargetList::iterator it = info->targets.begin();
it != info->targets.end(); it++)
{
LocalClient* client = LocalClient::create(issuer, *it);
const char* hard = (type & KT_HARD) ? "HARD " :
(type & KT_SOFT) ? "SOFT " : "";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query << "USER " << user;
GWBUF* buffer = modutil_create_query(ss.str().c_str());
LocalClient* client = LocalClient::create(&info->session, &info->protocol, it->first);
GWBUF* buffer = modutil_create_query(it->second.c_str());
client->queue_query(buffer);
gwbuf_free(buffer);
@ -1702,5 +1695,43 @@ void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_typ
client->self_destruct();
}
mxs_mysql_send_ok(issuer->client_dcb, info.targets.size(), 0, NULL);
delete info;
}
}
void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t type)
{
const char* hard = (type & KT_HARD) ? "HARD " : (type & KT_SOFT) ? "SOFT " : "";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query;
for (int i = 0; i < config_threadcount(); i++)
{
MXS_WORKER* worker = mxs_worker_get(i);
ss_dassert(worker);
mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, (intptr_t)worker_func,
(intptr_t)new ConnKillInfo(target_id, ss.str(), issuer));
}
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}
void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_type_t type)
{
const char* hard = (type & KT_HARD) ? "HARD " : (type & KT_SOFT) ? "SOFT " : "";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query << "USER " << user;
for (int i = 0; i < config_threadcount(); i++)
{
MXS_WORKER* worker = mxs_worker_get(i);
ss_dassert(worker);
mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, (intptr_t)worker_func,
(intptr_t)new UserKillInfo(user, ss.str(), issuer));
}
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}