Merge branch '2.2' into develop

This commit is contained in:
Markus Mäkelä
2018-07-26 11:27:09 +03:00
29 changed files with 409 additions and 417 deletions

View File

@ -25,20 +25,15 @@
static const uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLET | ERROR_EVENTS;
LocalClient::LocalClient(MXS_SESSION* session, int fd):
LocalClient::LocalClient(MYSQL_session* session, MySQLProtocol* proto, int fd):
m_state(VC_WAITING_HANDSHAKE),
m_sock(fd),
m_expected_bytes(0),
m_client({}),
m_protocol({}),
m_client(*session),
m_protocol(*proto),
m_self_destruct(false)
{
MXS_POLL_DATA::handler = LocalClient::poll_handler;
MySQLProtocol* client = (MySQLProtocol*)session->client_dcb->protocol;
m_protocol.charset = client->charset;
m_protocol.client_capabilities = client->client_capabilities;
m_protocol.extra_capabilities = client->extra_capabilities;
gw_get_shared_session_auth_info(session->client_dcb, &m_client);
}
LocalClient::~LocalClient()
@ -237,7 +232,7 @@ uint32_t LocalClient::poll_handler(struct mxs_poll_data* data, void* worker, uin
return 0;
}
LocalClient* LocalClient::create(MXS_SESSION* session, const char* ip, uint64_t port)
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, const char* ip, uint64_t port)
{
LocalClient* rval = NULL;
sockaddr_storage addr;
@ -245,7 +240,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, const char* ip, uint64_t
if (fd > 0 && (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0 || errno == EINPROGRESS))
{
LocalClient* relay = new (std::nothrow) LocalClient(session, fd);
LocalClient* relay = new (std::nothrow) LocalClient(session, proto, fd);
if (relay)
{
@ -271,7 +266,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, const char* ip, uint64_t
return rval;
}
LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, SERVICE* service)
{
LocalClient* rval = NULL;
LISTENER_ITERATOR iter;
@ -282,7 +277,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
if (listener->port > 0)
{
/** Pick the first network listener */
rval = create(session, "127.0.0.1", service->ports->port);
rval = create(session, proto, "127.0.0.1", service->ports->port);
break;
}
}
@ -290,7 +285,7 @@ LocalClient* LocalClient::create(MXS_SESSION* session, SERVICE* service)
return rval;
}
LocalClient* LocalClient::create(MXS_SESSION* session, SERVER* server)
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, SERVER* server)
{
return create(session, server->address, server->port);
return create(session, proto, server->address, server->port);
}

View File

@ -1619,17 +1619,6 @@ static bool reauthenticate_client(MXS_SESSION* session, GWBUF* packetbuf)
return rval;
}
// Helper function for debug assertions
static bool only_one_packet(GWBUF* buffer)
{
ss_dassert(buffer);
uint8_t header[4] = {};
gwbuf_copy_data(buffer, 0, MYSQL_HEADER_LEN, header);
size_t packet_len = gw_mysql_get_byte3(header);
size_t buffer_len = gwbuf_length(buffer);
return packet_len + MYSQL_HEADER_LEN == buffer_len;
}
/**
* Detect if buffer includes partial mysql packet or multiple packets.
* Store partial packet to dcb_readqueue. Send complete packets one by one
@ -1654,12 +1643,11 @@ static int route_by_statement(MXS_SESSION* session, uint64_t capabilities, GWBUF
// Process client request one packet at a time
packetbuf = modutil_get_next_MySQL_packet(p_readbuf);
// TODO: Do this only when RCAP_TYPE_CONTIGUOUS_INPUT is requested
packetbuf = gwbuf_make_contiguous(packetbuf);
if (packetbuf != NULL)
{
ss_dassert(only_one_packet(packetbuf));
// TODO: Do this only when RCAP_TYPE_CONTIGUOUS_INPUT is requested
packetbuf = gwbuf_make_contiguous(packetbuf);
CHK_GWBUF(packetbuf);
MySQLProtocol* proto = (MySQLProtocol*)session->client_dcb->protocol;
@ -1760,6 +1748,9 @@ static int route_by_statement(MXS_SESSION* session, uint64_t capabilities, GWBUF
rc = 0;
gwbuf_free(packetbuf);
packetbuf = NULL;
MXS_ERROR("User reauthentication failed for '%s'@'%s'",
session->client_dcb->user,
session->client_dcb->remote);
}
}

View File

@ -156,7 +156,10 @@ public:
bypass_whitespace();
if (is_set(m_pI))
// Check that there's enough characters to contain a SET keyword
bool long_enough = m_pEnd - m_pI > 3 ;
if (long_enough && is_set(m_pI))
{
rv = parse(pSql_mode);
}

View File

@ -19,7 +19,7 @@
#include <set>
#include <sstream>
#include <vector>
#include <map>
#include <maxscale/alloc.h>
#include <maxscale/clock.h>
@ -30,6 +30,7 @@
#include <maxscale/utils.h>
#include <maxscale/protocol/mariadb_client.hh>
#include <maxscale/poll.h>
#include <maxscale/routingworker.h>
uint8_t null_client_sha1[MYSQL_SCRAMBLE_LEN] = "";
@ -1309,121 +1310,106 @@ bool mxs_mysql_command_will_respond(uint8_t cmd)
cmd != MXS_COM_STMT_CLOSE;
}
typedef std::vector< std::pair<SERVER*, uint64_t> > TargetList;
namespace
{
// Servers and queries to execute on them
typedef std::map<SERVER*, std::string> TargetList;
struct KillInfo
{
uint64_t target_id;
typedef bool (*DcbCallback)(DCB *dcb, void *data);
KillInfo(std::string query, MXS_SESSION* ses, DcbCallback callback):
origin(mxs_rworker_get_current_id()),
query_base(query),
protocol(*(MySQLProtocol*)ses->client_dcb->protocol),
cb(callback)
{
gw_get_shared_session_auth_info(ses->client_dcb, &session);
}
int origin;
std::string query_base;
MYSQL_session session;
MySQLProtocol protocol;
DcbCallback cb;
TargetList targets;
};
static bool kill_func(DCB *dcb, void *data);
struct ConnKillInfo: public KillInfo
{
ConnKillInfo(uint64_t id, std::string query, MXS_SESSION* ses):
KillInfo(query, ses, kill_func),
target_id(id)
{}
uint64_t target_id;
};
static bool kill_user_func(DCB *dcb, void *data);
struct UserKillInfo: public KillInfo
{
UserKillInfo(std::string name, std::string query, MXS_SESSION* ses):
KillInfo(query, ses, kill_user_func),
user(name)
{}
std::string user;
};
static bool kill_func(DCB *dcb, void *data)
{
bool rval = true;
KillInfo* info = (KillInfo*)data;
ConnKillInfo* info = static_cast<ConnKillInfo*>(data);
if (dcb->session->ses_id == info->target_id)
{
for (auto it = dcb->session->dcb_set->begin(); it != dcb->session->dcb_set->end(); it++)
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
if (proto->thread_id)
{
MySQLProtocol* proto = (MySQLProtocol*)(*it)->protocol;
if (proto->thread_id)
{
// DCB is connected and we know the thread ID so we can kill it
info->targets.push_back(std::make_pair((*it)->server, proto->thread_id));
}
else
{
// DCB is not yet connected, send a hangup to forcibly close it
dcb->session->close_reason = SESSION_CLOSE_KILLED;
poll_fake_hangup_event(*it);
}
}
// Found the session, stop iterating over DCBs
rval = false;
}
return rval;
}
void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t type)
{
// Gather a list of servers and connection IDs to kill
KillInfo info = {target_id};
dcb_foreach(kill_func, &info);
if (info.targets.empty())
{
// No session found, send an error
std::stringstream err;
err << "Unknown thread id: " << target_id;
mysql_send_standard_error(issuer->client_dcb, 1, 1094, err.str().c_str());
}
else
{
// Execute the KILL on all of the servers
for (TargetList::iterator it = info.targets.begin();
it != info.targets.end(); it++)
{
LocalClient* client = LocalClient::create(issuer, it->first);
const char* hard = (type & KT_HARD) ? "HARD " :
(type & KT_SOFT) ? "SOFT " :
"";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
// DCB is connected and we know the thread ID so we can kill it
std::stringstream ss;
ss << "KILL " << hard << query << it->second;
GWBUF* buffer = modutil_create_query(ss.str().c_str());
client->queue_query(buffer);
gwbuf_free(buffer);
// The LocalClient needs to delete itself once the queries are done
client->self_destruct();
ss << info->query_base << proto->thread_id;
info->targets[dcb->server] = ss.str();
}
else
{
// DCB is not yet connected, send a hangup to forcibly close it
dcb->session->close_reason = SESSION_CLOSE_KILLED;
poll_fake_hangup_event(dcb);
}
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}
}
typedef std::set<SERVER*> ServerSet;
struct KillUserInfo
{
std::string user;
ServerSet targets;
};
static bool kill_user_func(DCB *dcb, void *data)
{
KillUserInfo* info = (KillUserInfo*)data;
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER &&
strcasecmp(dcb->session->client_dcb->user, info->user.c_str()) == 0)
{
info->targets.insert(dcb->server);
}
return true;
}
void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_type_t type)
static bool kill_user_func(DCB *dcb, void *data)
{
// Gather a list of servers and connection IDs to kill
KillUserInfo info = {user};
dcb_foreach(kill_user_func, &info);
UserKillInfo* info = (UserKillInfo*)data;
// Execute the KILL on all of the servers
for (ServerSet::iterator it = info.targets.begin();
it != info.targets.end(); it++)
if (dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER &&
strcasecmp(dcb->session->client_dcb->user, info->user.c_str()) == 0)
{
LocalClient* client = LocalClient::create(issuer, *it);
const char* hard = (type & KT_HARD) ? "HARD " :
(type & KT_SOFT) ? "SOFT " : "";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query << "USER " << user;
GWBUF* buffer = modutil_create_query(ss.str().c_str());
info->targets[dcb->server] = info->query_base;
}
return true;
}
static void worker_func(int thread_id, void* data)
{
KillInfo* info = static_cast<KillInfo*>(data);
dcb_foreach_local(info->cb, info);
for (TargetList::iterator it = info->targets.begin();
it != info->targets.end(); it++)
{
LocalClient* client = LocalClient::create(&info->session, &info->protocol, it->first);
GWBUF* buffer = modutil_create_query(it->second.c_str());
client->queue_query(buffer);
gwbuf_free(buffer);
@ -1431,7 +1417,45 @@ void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_typ
client->self_destruct();
}
mxs_mysql_send_ok(issuer->client_dcb, info.targets.size(), 0, NULL);
delete info;
}
}
void mxs_mysql_execute_kill(MXS_SESSION* issuer, uint64_t target_id, kill_type_t type)
{
const char* hard = (type & KT_HARD) ? "HARD " : (type & KT_SOFT) ? "SOFT " : "";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query;
for (int i = 0; i < config_threadcount(); i++)
{
MXS_WORKER* worker = mxs_rworker_get(i);
ss_dassert(worker);
mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, (intptr_t)worker_func,
(intptr_t)new ConnKillInfo(target_id, ss.str(), issuer));
}
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}
void mxs_mysql_execute_kill_user(MXS_SESSION* issuer, const char* user, kill_type_t type)
{
const char* hard = (type & KT_HARD) ? "HARD " : (type & KT_SOFT) ? "SOFT " : "";
const char* query = (type & KT_QUERY) ? "QUERY " : "";
std::stringstream ss;
ss << "KILL " << hard << query << "USER " << user;
for (int i = 0; i < config_threadcount(); i++)
{
MXS_WORKER* worker = mxs_rworker_get(i);
ss_dassert(worker);
mxs_worker_post_message(worker, MXS_WORKER_MSG_CALL, (intptr_t)worker_func,
(intptr_t)new UserKillInfo(user, ss.str(), issuer));
}
mxs_mysql_send_ok(issuer->client_dcb, 1, 0, NULL);
}
/**