MaxScale/server/modules/protocol/MySQL/mariadb_client.cc
Markus Mäkelä 21eef8a670
MXS-1985: Kill connections inside workers
The LocalClient micro-client required a reference to the session that was
valid at construction time. This is the reason why the previous
implementation used dcb_foreach to first gather the targets and then
execute queries on them. By replacing this reference with pointers to the
raw data it requires, we lift the requirement of the orignating session
being alive at construction time.

Now that the LocalClient no longer holds a reference to the session, the
killing of the connection does not have to be done on the same thread that
started the process. This prevents the deadlock that occurred when
concurrect dcb_foreach calls were made.

Replaced the unused dcb_foreach_parallel with a version of dcb_foreach
that allows iteration of DCBs local to this worker. The dcb_foreach_local
is the basis upon which all DCB access outside of administrative tasks
should be built on.

This change will introduce a regression in functionality: The client will
no longer receive an error if no connections match the KILL query
criteria. This is done to avoid having to synchronize the workers after
they have performed the killing of their own connections.
2018-07-24 09:51:46 +03:00

292 lines
6.9 KiB
C++

/*
* Copyright (c) 2016 MariaDB Corporation Ab
*
* Use of this software is governed by the Business Source License included
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
*
* Change Date: 2020-01-01
*
* On the date above, in accordance with the Business Source License, use
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#include <maxscale/protocol/mariadb_client.hh>
#include <maxscale/utils.h>
// TODO: Find a way to cleanly expose this
#include "../../../core/internal/worker.hh"
#ifdef EPOLLRDHUP
#define ERROR_EVENTS (EPOLLRDHUP | EPOLLHUP | EPOLLERR)
#else
#define ERROR_EVENTS (EPOLLHUP | EPOLLERR)
#endif
static const uint32_t poll_events = EPOLLIN | EPOLLOUT | EPOLLET | ERROR_EVENTS;
LocalClient::LocalClient(MYSQL_session* session, MySQLProtocol* proto, int fd):
m_state(VC_WAITING_HANDSHAKE),
m_sock(fd),
m_expected_bytes(0),
m_client(*session),
m_protocol(*proto),
m_self_destruct(false)
{
MXS_POLL_DATA::handler = LocalClient::poll_handler;
}
LocalClient::~LocalClient()
{
if (m_state != VC_ERROR)
{
close();
}
}
bool LocalClient::queue_query(GWBUF* buffer)
{
GWBUF* my_buf = NULL;
if (m_state != VC_ERROR && (my_buf = gwbuf_deep_clone(buffer)))
{
m_queue.push_back(my_buf);
if (m_state == VC_OK)
{
drain_queue();
}
}
return my_buf != NULL;
}
void LocalClient::self_destruct()
{
GWBUF* buffer = mysql_create_com_quit(NULL, 0);
queue_query(buffer);
gwbuf_free(buffer);
m_self_destruct = true;
}
void LocalClient::close()
{
mxs::Worker* worker = mxs::Worker::get_current();
ss_dassert(worker);
worker->remove_fd(m_sock);
::close(m_sock);
}
void LocalClient::error()
{
if (m_state != VC_ERROR)
{
close();
m_state = VC_ERROR;
}
}
void LocalClient::process(uint32_t events)
{
if (events & EPOLLIN)
{
GWBUF* buf = read_complete_packet();
if (buf)
{
if (m_state == VC_WAITING_HANDSHAKE)
{
if (gw_decode_mysql_server_handshake(&m_protocol, GWBUF_DATA(buf) + MYSQL_HEADER_LEN) == 0)
{
GWBUF* response = gw_generate_auth_response(&m_client, &m_protocol, false, false);
m_queue.push_front(response);
m_state = VC_RESPONSE_SENT;
}
else
{
error();
}
}
else if (m_state == VC_RESPONSE_SENT)
{
if (mxs_mysql_is_ok_packet(buf))
{
m_state = VC_OK;
}
else
{
error();
}
}
gwbuf_free(buf);
}
}
if (events & EPOLLOUT)
{
/** Queue is drained */
}
if (events & ERROR_EVENTS)
{
error();
}
if (m_queue.size() && m_state != VC_ERROR && m_state != VC_WAITING_HANDSHAKE)
{
drain_queue();
}
else if (m_state == VC_ERROR && m_self_destruct)
{
delete this;
}
}
GWBUF* LocalClient::read_complete_packet()
{
GWBUF* rval = NULL;
while (true)
{
uint8_t buffer[1024];
int rc = read(m_sock, buffer, sizeof(buffer));
if (rc == -1)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
{
MXS_ERROR("Failed to read from backend: %d, %s", errno, mxs_strerror(errno));
error();
}
break;
}
mxs::Buffer chunk(buffer, rc);
m_partial.append(chunk);
size_t len = m_partial.length();
if (m_expected_bytes == 0 && len >= 3)
{
mxs::Buffer::iterator iter = m_partial.begin();
m_expected_bytes = MYSQL_HEADER_LEN;
m_expected_bytes += *iter++;
m_expected_bytes += (*iter++ << 8);
m_expected_bytes += (*iter++ << 16);
}
if (len >= m_expected_bytes)
{
/** Read complete packet. Reset expected byte count and make
* the buffer contiguous. */
m_expected_bytes = 0;
m_partial.make_contiguous();
rval = m_partial.release();
break;
}
}
return rval;
}
void LocalClient::drain_queue()
{
bool more = true;
while (m_queue.size() && more)
{
/** Grab a buffer from the queue */
GWBUF* buf = m_queue.front().release();
m_queue.pop_front();
while (buf)
{
int rc = write(m_sock, GWBUF_DATA(buf), GWBUF_LENGTH(buf));
if (rc > 0)
{
buf = gwbuf_consume(buf, rc);
}
else
{
if (rc == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
{
MXS_ERROR("Failed to write to backend: %d, %s", errno, mxs_strerror(errno));
error();
}
m_queue.push_front(buf);
more = false;
break;
}
}
}
}
uint32_t LocalClient::poll_handler(struct mxs_poll_data* data, int wid, uint32_t events)
{
LocalClient* client = static_cast<LocalClient*>(data);
client->process(events);
return 0;
}
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, const char* ip, uint64_t port)
{
LocalClient* rval = NULL;
sockaddr_storage addr;
int fd = open_network_socket(MXS_SOCKET_NETWORK, &addr, ip, port);
if (fd > 0 && (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) == 0 || errno == EINPROGRESS))
{
LocalClient* relay = new (std::nothrow) LocalClient(session, proto, fd);
if (relay)
{
mxs::Worker* worker = mxs::Worker::get_current();
if (worker->add_fd(fd, poll_events, (MXS_POLL_DATA*)relay))
{
rval = relay;
}
else
{
relay->m_state = VC_ERROR;
delete rval;
rval = NULL;
}
}
}
if (rval == NULL && fd > 0)
{
::close(fd);
}
return rval;
}
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, SERVICE* service)
{
LocalClient* rval = NULL;
LISTENER_ITERATOR iter;
for (SERV_LISTENER* listener = listener_iterator_init(service, &iter);
listener; listener = listener_iterator_next(&iter))
{
if (listener->port > 0)
{
/** Pick the first network listener */
rval = create(session, proto, "127.0.0.1", service->ports->port);
break;
}
}
return rval;
}
LocalClient* LocalClient::create(MYSQL_session* session, MySQLProtocol* proto, SERVER* server)
{
return create(session, proto, server->name, server->port);
}