
If multiple statements were stored in a single buffer only one of them would get registered.
2179 lines
71 KiB
C++
2179 lines
71 KiB
C++
/*
|
|
*
|
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
|
*
|
|
* Use of this software is governed by the Business Source License included
|
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
|
*
|
|
* Change Date: 2022-01-01
|
|
*
|
|
* On the date above, in accordance with the Business Source License, use
|
|
* of this software will be governed by version 2 or later of the General
|
|
* Public License.
|
|
*/
|
|
|
|
#define MXS_MODULE_NAME "mariadbclient"
|
|
|
|
#include <maxscale/ccdefs.hh>
|
|
|
|
#include <inttypes.h>
|
|
#include <limits.h>
|
|
#include <netinet/tcp.h>
|
|
#include <sys/stat.h>
|
|
#include <algorithm>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
#include <maxscale/alloc.h>
|
|
#include <maxscale/authenticator.h>
|
|
#include <maxscale/log.h>
|
|
#include <maxscale/modinfo.h>
|
|
#include <maxscale/modutil.h>
|
|
#include <maxscale/poll.h>
|
|
#include <maxscale/protocol.h>
|
|
#include <maxscale/protocol/mysql.h>
|
|
#include <maxscale/query_classifier.h>
|
|
#include <maxscale/router.h>
|
|
#include <maxscale/routingworker.hh>
|
|
#include <maxscale/session.h>
|
|
#include <maxscale/ssl.h>
|
|
#include <maxscale/utils.h>
|
|
|
|
#include "setparser.hh"
|
|
#include "sqlmodeparser.hh"
|
|
|
|
/**
 * Outcome of process_special_commands().
 */
typedef enum spec_com_res_t
{
    /** No special command was detected; carry on with normal processing. */
    RES_CONTINUE,
    /** The query has been handled completely; do not send it to the filters/router. */
    RES_END,
    /** Possibly a special command, but more data is needed before that can be decided. */
    RES_MORE_DATA
} spec_com_res_t;
|
|
|
|
const char WORD_KILL[] = "KILL";
|
|
|
|
static int process_init(void);
|
|
static void process_finish(void);
|
|
static int thread_init(void);
|
|
static void thread_finish(void);
|
|
|
|
static int gw_MySQLAccept(DCB* listener);
|
|
static int gw_MySQLListener(DCB* listener, char* config_bind);
|
|
static int gw_read_client_event(DCB* dcb);
|
|
static int gw_write_client_event(DCB* dcb);
|
|
static int gw_MySQLWrite_client(DCB* dcb, GWBUF* queue);
|
|
static int gw_error_client_event(DCB* dcb);
|
|
static int gw_client_close(DCB* dcb);
|
|
static int gw_client_hangup_event(DCB* dcb);
|
|
static char* gw_default_auth();
|
|
static int gw_connection_limit(DCB* dcb, int limit);
|
|
static int MySQLSendHandshake(DCB* dcb);
|
|
static int route_by_statement(MXS_SESSION*, uint64_t, GWBUF**);
|
|
static void mysql_client_auth_error_handling(DCB* dcb, int auth_val, int packet_number);
|
|
static int gw_read_do_authentication(DCB* dcb, GWBUF* read_buffer, int nbytes_read);
|
|
static int gw_read_normal_data(DCB* dcb, GWBUF* read_buffer, int nbytes_read);
|
|
static int gw_read_finish_processing(DCB* dcb, GWBUF* read_buffer, uint64_t capabilities);
|
|
static void gw_process_one_new_client(DCB* client_dcb);
|
|
static spec_com_res_t process_special_commands(DCB* client_dcb, GWBUF* read_buffer, int nbytes_read);
|
|
static spec_com_res_t handle_query_kill(DCB* dcb,
|
|
GWBUF* read_buffer,
|
|
spec_com_res_t current,
|
|
bool is_complete,
|
|
unsigned int packet_len);
|
|
static bool parse_kill_query(char* query, uint64_t* thread_id_out, kill_type_t* kt_out, std::string* user);
|
|
static void parse_and_set_trx_state(MXS_SESSION* ses, GWBUF* data);
|
|
/**
|
|
* The module entry point routine. It is this routine that
|
|
* must populate the structure that is referred to as the
|
|
* "module object", this is a structure with the set of
|
|
* external entry points for this module.
|
|
*
|
|
* @return The module object
|
|
*/
|
|
|
|
extern "C"
|
|
{
|
|
|
|
MXS_MODULE* MXS_CREATE_MODULE()
|
|
{
|
|
static MXS_PROTOCOL MyObject =
|
|
{
|
|
gw_read_client_event, /* Read - EPOLLIN handler */
|
|
gw_MySQLWrite_client, /* Write - data from gateway */
|
|
gw_write_client_event, /* WriteReady - EPOLLOUT handler */
|
|
gw_error_client_event, /* Error - EPOLLERR handler */
|
|
gw_client_hangup_event, /* HangUp - EPOLLHUP handler */
|
|
gw_MySQLAccept, /* Accept */
|
|
NULL, /* Connect */
|
|
gw_client_close, /* Close */
|
|
gw_MySQLListener, /* Listen */
|
|
NULL, /* Authentication */
|
|
gw_default_auth, /* Default authenticator */
|
|
gw_connection_limit, /* Send error connection limit */
|
|
NULL,
|
|
NULL
|
|
};
|
|
|
|
static MXS_MODULE info =
|
|
{
|
|
MXS_MODULE_API_PROTOCOL,
|
|
MXS_MODULE_GA,
|
|
MXS_PROTOCOL_VERSION,
|
|
"The client to MaxScale MySQL protocol implementation",
|
|
"V1.1.0",
|
|
MXS_NO_MODULE_CAPABILITIES,
|
|
&MyObject,
|
|
process_init,
|
|
process_finish,
|
|
thread_init,
|
|
thread_finish,
|
|
{
|
|
{MXS_END_MODULE_PARAMS}
|
|
}
|
|
};
|
|
|
|
return &info;
|
|
}
|
|
}
|
|
/*lint +e14 */
|
|
|
|
/**
|
|
* Performs process wide initialization.
|
|
*
|
|
* @return 0 if successful, non-zero otherwise.
|
|
*/
|
|
static int process_init(void)
|
|
{
|
|
int rv = mysql_library_init(0, NULL, NULL);
|
|
|
|
if (rv != 0)
|
|
{
|
|
MXS_ERROR("MySQL initialization failed, MariaDB MaxScale will exit. "
|
|
"MySQL Error: %d, %s.",
|
|
mysql_errno(NULL),
|
|
mysql_error(NULL));
|
|
}
|
|
|
|
return rv;
|
|
}
|
|
|
|
/**
|
|
* Performs process wide finalization.
|
|
*/
|
|
static void process_finish(void)
|
|
{
|
|
mysql_library_end();
|
|
}
|
|
|
|
/**
|
|
* Performs thread-specific initialization.
|
|
*
|
|
* @return 0 if successful, non-zero otherwise.
|
|
*/
|
|
static int thread_init(void)
|
|
{
|
|
int rv = mysql_thread_init();
|
|
|
|
if (rv != 0)
|
|
{
|
|
MXS_ERROR("MySQL thread initialization failed, the thread will exit.");
|
|
}
|
|
|
|
return rv;
|
|
}
|
|
|
|
/**
|
|
* Performs thread specific finalization.
|
|
*/
|
|
static void thread_finish(void)
|
|
{
|
|
mysql_thread_end();
|
|
}
|
|
|
|
/**
 * Name of the authenticator that this protocol uses by default.
 *
 * @return Pointer to a string literal holding the authenticator name; the
 *         caller must neither modify nor free it
 */
static char* gw_default_auth()
{
    // The entry point signature requires a non-const pointer.
    return const_cast<char*>("MySQLAuth");
}
|
|
|
|
std::string get_version_string(SERVICE* service)
|
|
{
|
|
std::string rval = DEFAULT_VERSION_STRING;
|
|
|
|
if (service->version_string[0])
|
|
{
|
|
// User-defined version string, use it
|
|
rval = service->version_string;
|
|
}
|
|
else
|
|
{
|
|
uint64_t intver = UINT64_MAX;
|
|
|
|
for (SERVER_REF* ref = service->dbref; ref; ref = ref->next)
|
|
{
|
|
if (ref->server->version && ref->server->version < intver)
|
|
{
|
|
rval = ref->server->version_string;
|
|
intver = ref->server->version;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Older applications don't understand versions other than 5 and cause strange problems
|
|
if (rval[0] != '5')
|
|
{
|
|
const char prefix[] = "5.5.5-";
|
|
rval = prefix + rval;
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
|
|
* MySQLSendHandshake
|
|
*
|
|
* @param dcb The descriptor control block to use for sending the handshake request
|
|
* @return The packet length sent
|
|
*/
|
|
int MySQLSendHandshake(DCB* dcb)
|
|
{
|
|
uint8_t* outbuf = NULL;
|
|
uint32_t mysql_payload_size = 0;
|
|
uint8_t mysql_packet_header[4];
|
|
uint8_t mysql_packet_id = 0;
|
|
/* uint8_t mysql_filler = GW_MYSQL_HANDSHAKE_FILLER; not needed*/
|
|
uint8_t mysql_protocol_version = GW_MYSQL_PROTOCOL_VERSION;
|
|
uint8_t* mysql_handshake_payload = NULL;
|
|
uint8_t mysql_thread_id_num[4];
|
|
uint8_t mysql_scramble_buf[9] = "";
|
|
uint8_t mysql_plugin_data[13] = "";
|
|
uint8_t mysql_server_capabilities_one[2];
|
|
uint8_t mysql_server_capabilities_two[2];
|
|
uint8_t mysql_server_language = 8;
|
|
uint8_t mysql_server_status[2];
|
|
uint8_t mysql_scramble_len = 21;
|
|
uint8_t mysql_filler_ten[10] = {};
|
|
/* uint8_t mysql_last_byte = 0x00; not needed */
|
|
char server_scramble[GW_MYSQL_SCRAMBLE_SIZE + 1] = "";
|
|
|
|
bool is_maria = false;
|
|
|
|
if (dcb->service->dbref)
|
|
{
|
|
mysql_server_language = dcb->service->dbref->server->charset;
|
|
|
|
if (dcb->service->dbref->server->version >= 100200)
|
|
{
|
|
/** The backend servers support the extended capabilities */
|
|
is_maria = true;
|
|
}
|
|
}
|
|
|
|
MySQLProtocol* protocol = DCB_PROTOCOL(dcb, MySQLProtocol);
|
|
GWBUF* buf;
|
|
std::string version = get_version_string(dcb->service);
|
|
|
|
gw_generate_random_str(server_scramble, GW_MYSQL_SCRAMBLE_SIZE);
|
|
|
|
// copy back to the caller
|
|
memcpy(protocol->scramble, server_scramble, GW_MYSQL_SCRAMBLE_SIZE);
|
|
|
|
if (is_maria)
|
|
{
|
|
/**
|
|
* The new 10.2 capability flags are stored in the last 4 bytes of the
|
|
* 10 byte filler block.
|
|
*/
|
|
uint32_t new_flags = MXS_MARIA_CAP_STMT_BULK_OPERATIONS;
|
|
memcpy(mysql_filler_ten + 6, &new_flags, sizeof(new_flags));
|
|
}
|
|
|
|
// Get the equivalent of the server thread id.
|
|
protocol->thread_id = session_get_next_id();
|
|
// Send only the low 32bits in the handshake.
|
|
gw_mysql_set_byte4(mysql_thread_id_num, (uint32_t)(protocol->thread_id));
|
|
memcpy(mysql_scramble_buf, server_scramble, 8);
|
|
|
|
memcpy(mysql_plugin_data, server_scramble + 8, 12);
|
|
|
|
/**
|
|
* Use the default authentication plugin name in the initial handshake. If the
|
|
* authenticator needs to change the authentication method, it should send
|
|
* an AuthSwitchRequest packet to the client.
|
|
*/
|
|
const char* plugin_name = DEFAULT_MYSQL_AUTH_PLUGIN;
|
|
int plugin_name_len = strlen(plugin_name);
|
|
|
|
mysql_payload_size =
|
|
sizeof(mysql_protocol_version) + (version.length() + 1) + sizeof(mysql_thread_id_num) + 8
|
|
+ sizeof( /* mysql_filler */ uint8_t) + sizeof(mysql_server_capabilities_one)
|
|
+ sizeof(mysql_server_language)
|
|
+ sizeof(mysql_server_status) + sizeof(mysql_server_capabilities_two) + sizeof(mysql_scramble_len)
|
|
+ sizeof(mysql_filler_ten) + 12 + sizeof( /* mysql_last_byte */ uint8_t) + plugin_name_len
|
|
+ sizeof( /* mysql_last_byte */ uint8_t);
|
|
|
|
// allocate memory for packet header + payload
|
|
if ((buf = gwbuf_alloc(sizeof(mysql_packet_header) + mysql_payload_size)) == NULL)
|
|
{
|
|
mxb_assert(buf != NULL);
|
|
return 0;
|
|
}
|
|
outbuf = GWBUF_DATA(buf);
|
|
|
|
// write packet header with mysql_payload_size
|
|
gw_mysql_set_byte3(mysql_packet_header, mysql_payload_size);
|
|
|
|
// write packet number, now is 0
|
|
mysql_packet_header[3] = mysql_packet_id;
|
|
memcpy(outbuf, mysql_packet_header, sizeof(mysql_packet_header));
|
|
|
|
// current buffer pointer
|
|
mysql_handshake_payload = outbuf + sizeof(mysql_packet_header);
|
|
|
|
// write protocol version
|
|
memcpy(mysql_handshake_payload, &mysql_protocol_version, sizeof(mysql_protocol_version));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_protocol_version);
|
|
|
|
// write server version plus 0 filler
|
|
strcpy((char*)mysql_handshake_payload, version.c_str());
|
|
mysql_handshake_payload = mysql_handshake_payload + version.length();
|
|
|
|
*mysql_handshake_payload = 0x00;
|
|
|
|
mysql_handshake_payload++;
|
|
|
|
// write thread id
|
|
memcpy(mysql_handshake_payload, mysql_thread_id_num, sizeof(mysql_thread_id_num));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_thread_id_num);
|
|
|
|
// write scramble buf
|
|
memcpy(mysql_handshake_payload, mysql_scramble_buf, 8);
|
|
mysql_handshake_payload = mysql_handshake_payload + 8;
|
|
*mysql_handshake_payload = GW_MYSQL_HANDSHAKE_FILLER;
|
|
mysql_handshake_payload++;
|
|
|
|
// write server capabilities part one
|
|
mysql_server_capabilities_one[0] = (uint8_t)GW_MYSQL_CAPABILITIES_SERVER;
|
|
mysql_server_capabilities_one[1] = (uint8_t)(GW_MYSQL_CAPABILITIES_SERVER >> 8);
|
|
|
|
if (is_maria)
|
|
{
|
|
/** A MariaDB 10.2 server doesn't send the CLIENT_MYSQL capability
|
|
* to signal that it supports extended capabilities */
|
|
mysql_server_capabilities_one[0] &= ~(uint8_t)GW_MYSQL_CAPABILITIES_CLIENT_MYSQL;
|
|
}
|
|
|
|
if (ssl_required_by_dcb(dcb))
|
|
{
|
|
mysql_server_capabilities_one[1] |= (int)GW_MYSQL_CAPABILITIES_SSL >> 8;
|
|
}
|
|
|
|
memcpy(mysql_handshake_payload, mysql_server_capabilities_one, sizeof(mysql_server_capabilities_one));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_capabilities_one);
|
|
|
|
// write server language
|
|
memcpy(mysql_handshake_payload, &mysql_server_language, sizeof(mysql_server_language));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_language);
|
|
|
|
// write server status
|
|
mysql_server_status[0] = 2;
|
|
mysql_server_status[1] = 0;
|
|
memcpy(mysql_handshake_payload, mysql_server_status, sizeof(mysql_server_status));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_status);
|
|
|
|
// write server capabilities part two
|
|
mysql_server_capabilities_two[0] = (uint8_t)(GW_MYSQL_CAPABILITIES_SERVER >> 16);
|
|
mysql_server_capabilities_two[1] = (uint8_t)(GW_MYSQL_CAPABILITIES_SERVER >> 24);
|
|
|
|
// Check that we match the old values
|
|
mxb_assert(mysql_server_capabilities_two[0] == 15);
|
|
/** NOTE: pre-2.1 versions sent the fourth byte of the capabilities as
|
|
* the value 128 even though there's no such capability. */
|
|
|
|
memcpy(mysql_handshake_payload, mysql_server_capabilities_two, sizeof(mysql_server_capabilities_two));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_server_capabilities_two);
|
|
|
|
// write scramble_len
|
|
memcpy(mysql_handshake_payload, &mysql_scramble_len, sizeof(mysql_scramble_len));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_scramble_len);
|
|
|
|
// write 10 filler
|
|
memcpy(mysql_handshake_payload, mysql_filler_ten, sizeof(mysql_filler_ten));
|
|
mysql_handshake_payload = mysql_handshake_payload + sizeof(mysql_filler_ten);
|
|
|
|
// write plugin data
|
|
memcpy(mysql_handshake_payload, mysql_plugin_data, 12);
|
|
mysql_handshake_payload = mysql_handshake_payload + 12;
|
|
|
|
// write last byte, 0
|
|
*mysql_handshake_payload = 0x00;
|
|
mysql_handshake_payload++;
|
|
|
|
// to be understanded ????
|
|
memcpy(mysql_handshake_payload, plugin_name, plugin_name_len);
|
|
mysql_handshake_payload = mysql_handshake_payload + plugin_name_len;
|
|
|
|
// write last byte, 0
|
|
*mysql_handshake_payload = 0x00;
|
|
|
|
// writing data in the Client buffer queue
|
|
dcb->func.write(dcb, buf);
|
|
protocol->protocol_auth_state = MXS_AUTH_STATE_MESSAGE_READ;
|
|
|
|
return sizeof(mysql_packet_header) + mysql_payload_size;
|
|
}
|
|
|
|
/**
|
|
* Write function for client DCB: writes data from MaxScale to Client
|
|
*
|
|
* @param dcb The DCB of the client
|
|
* @param queue Queue of buffers to write
|
|
*/
|
|
int gw_MySQLWrite_client(DCB* dcb, GWBUF* queue)
|
|
{
|
|
if (GWBUF_IS_REPLY_OK(queue) && dcb->service->session_track_trx_state)
|
|
{
|
|
parse_and_set_trx_state(dcb->session, queue);
|
|
}
|
|
return dcb_write(dcb, queue);
|
|
}
|
|
|
|
/**
|
|
* @brief Client read event triggered by EPOLLIN
|
|
*
|
|
* @param dcb Descriptor control block
|
|
* @return 0 if succeed, 1 otherwise
|
|
*/
|
|
int gw_read_client_event(DCB* dcb)
|
|
{
|
|
MySQLProtocol* protocol;
|
|
GWBUF* read_buffer = NULL;
|
|
int return_code = 0;
|
|
uint32_t nbytes_read = 0;
|
|
uint32_t max_bytes = 0;
|
|
|
|
if (dcb->dcb_role != DCB_ROLE_CLIENT_HANDLER)
|
|
{
|
|
MXS_ERROR("DCB must be a client handler for MySQL client protocol.");
|
|
return 1;
|
|
}
|
|
|
|
protocol = (MySQLProtocol*)dcb->protocol;
|
|
|
|
MXS_DEBUG("Protocol state: %s", gw_mysql_protocol_state2string(protocol->protocol_auth_state));
|
|
|
|
/**
|
|
* The use of max_bytes seems like a hack, but no better option is available
|
|
* at the time of writing. When a MySQL server receives a new connection
|
|
* request, it sends an Initial Handshake Packet. Where the client wants to
|
|
* use SSL, it responds with an SSL Request Packet (in place of a Handshake
|
|
* Response Packet). The SSL Request Packet contains only the basic header,
|
|
* and not the user credentials. It is 36 bytes long. The server then
|
|
* initiates the SSL handshake (via calls to OpenSSL).
|
|
*
|
|
* In many cases, this is what happens. But occasionally, the client seems
|
|
* to send a packet much larger than 36 bytes (in tests it was 333 bytes).
|
|
* If the whole of the packet is read, it is then lost to the SSL handshake
|
|
* process. Why this happens is presently unknown. Reading just 36 bytes
|
|
* when the server requires SSL and SSL has not yet been negotiated seems
|
|
* to solve the problem.
|
|
*
|
|
* If a neater solution can be found, so much the better.
|
|
*/
|
|
if (ssl_required_but_not_negotiated(dcb))
|
|
{
|
|
max_bytes = 36;
|
|
}
|
|
return_code = dcb_read(dcb, &read_buffer, max_bytes);
|
|
if (return_code < 0)
|
|
{
|
|
dcb_close(dcb);
|
|
}
|
|
if (0 == (nbytes_read = gwbuf_length(read_buffer)))
|
|
{
|
|
return return_code;
|
|
}
|
|
|
|
return_code = 0;
|
|
|
|
switch (protocol->protocol_auth_state)
|
|
{
|
|
/**
|
|
*
|
|
* When a listener receives a new connection request, it creates a
|
|
* request handler DCB to for the client connection. The listener also
|
|
* sends the initial authentication request to the client. The first
|
|
* time this function is called from the poll loop, the client reply
|
|
* to the authentication request should be available.
|
|
*
|
|
* If the authentication is successful the protocol authentication state
|
|
* will be changed to MYSQL_IDLE (see below).
|
|
*
|
|
*/
|
|
case MXS_AUTH_STATE_MESSAGE_READ:
|
|
if (nbytes_read < 3
|
|
|| (0 == max_bytes && nbytes_read < MYSQL_GET_PACKET_LEN(read_buffer))
|
|
|| (0 != max_bytes && nbytes_read < max_bytes))
|
|
{
|
|
dcb_readq_append(dcb, read_buffer);
|
|
}
|
|
else
|
|
{
|
|
if (nbytes_read > MYSQL_GET_PACKET_LEN(read_buffer))
|
|
{
|
|
// We read more data than was needed
|
|
dcb_readq_append(dcb, read_buffer);
|
|
read_buffer = modutil_get_next_MySQL_packet(&dcb->readq);
|
|
}
|
|
|
|
return_code = gw_read_do_authentication(dcb, read_buffer, nbytes_read);
|
|
}
|
|
break;
|
|
|
|
/**
|
|
*
|
|
* Once a client connection is authenticated, the protocol authentication
|
|
* state will be MYSQL_IDLE and so every event of data received will
|
|
* result in a call that comes to this section of code.
|
|
*
|
|
*/
|
|
case MXS_AUTH_STATE_COMPLETE:
|
|
/* After this call read_buffer will point to freed data */
|
|
return_code = gw_read_normal_data(dcb, read_buffer, nbytes_read);
|
|
break;
|
|
|
|
case MXS_AUTH_STATE_FAILED:
|
|
gwbuf_free(read_buffer);
|
|
return_code = 1;
|
|
break;
|
|
|
|
default:
|
|
MXS_ERROR("In mysql_client.c unexpected protocol authentication state");
|
|
break;
|
|
}
|
|
|
|
return return_code;
|
|
}
|
|
|
|
/**
 * Measure a string that is expected to be null-terminated within a limit
 *
 * @param str String to measure
 * @param len Maximum number of bytes that may be examined
 *
 * @return Length of @c str, or -1 if no null terminator was found within
 *         the first @c len bytes
 */
static int get_zstr_len(const char* str, int len)
{
    const char* const limit = str + len;
    const char* cursor = str;

    while (cursor < limit && *cursor != '\0')
    {
        ++cursor;
    }

    // Reaching the limit means that no terminator was seen
    return cursor == limit ? -1 : (int)(cursor - str);
}
|
|
|
|
/**
|
|
* @brief Store client connection information into the DCB
|
|
* @param dcb Client DCB
|
|
* @param buffer Buffer containing the handshake response packet
|
|
*/
|
|
static void store_client_information(DCB* dcb, GWBUF* buffer)
|
|
{
|
|
size_t len = gwbuf_length(buffer);
|
|
uint8_t data[len];
|
|
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
|
MYSQL_session* ses = (MYSQL_session*)dcb->data;
|
|
|
|
gwbuf_copy_data(buffer, 0, len, data);
|
|
mxb_assert(MYSQL_GET_PAYLOAD_LEN(data) + MYSQL_HEADER_LEN == len
|
|
|| len == MYSQL_AUTH_PACKET_BASE_SIZE); // For SSL request packet
|
|
|
|
// We OR the capability bits in order to retain the starting bits sent
|
|
// when an SSL connection is opened. Oracle Connector/J 8.0 appears to drop
|
|
// the SSL capability bit mid-authentication which causes MaxScale to think
|
|
// that SSL is not used.
|
|
proto->client_capabilities |= gw_mysql_get_byte4(data + MYSQL_CLIENT_CAP_OFFSET);
|
|
proto->charset = data[MYSQL_CHARSET_OFFSET];
|
|
|
|
/** MariaDB 10.2 compatible clients don't set the first bit to signal that
|
|
* there are extra capabilities stored in the last 4 bytes of the 23 byte filler. */
|
|
if ((proto->client_capabilities & GW_MYSQL_CAPABILITIES_CLIENT_MYSQL) == 0)
|
|
{
|
|
proto->extra_capabilities = gw_mysql_get_byte4(data + MARIADB_CAP_OFFSET);
|
|
}
|
|
|
|
if (len > MYSQL_AUTH_PACKET_BASE_SIZE)
|
|
{
|
|
const char* username = (const char*)data + MYSQL_AUTH_PACKET_BASE_SIZE;
|
|
int userlen = get_zstr_len(username, len - MYSQL_AUTH_PACKET_BASE_SIZE);
|
|
|
|
if (userlen != -1)
|
|
{
|
|
if ((int)sizeof(ses->user) > userlen)
|
|
{
|
|
strcpy(ses->user, username);
|
|
}
|
|
|
|
// Include the null terminator in the user length
|
|
userlen++;
|
|
|
|
if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB)
|
|
{
|
|
/** Client is connecting with a default database */
|
|
uint8_t authlen = data[MYSQL_AUTH_PACKET_BASE_SIZE + userlen];
|
|
size_t dboffset = MYSQL_AUTH_PACKET_BASE_SIZE + userlen + authlen + 1;
|
|
|
|
if (dboffset < len)
|
|
{
|
|
int dblen = get_zstr_len((const char*)data + dboffset, len - dboffset);
|
|
|
|
if (dblen != -1 && (int)sizeof(ses->db) > dblen)
|
|
{
|
|
strcpy(ses->db, (const char*)data + dboffset);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Debug check function for authentication packets
|
|
*
|
|
* Check that the packet is consistent with how the protocol works and that no
|
|
* unexpected data is processed.
|
|
*
|
|
* @param dcb Client DCB
|
|
* @param buf Buffer containing packet
|
|
* @param bytes Number of bytes available
|
|
*/
|
|
static void check_packet(DCB* dcb, GWBUF* buf, int bytes)
|
|
{
|
|
uint8_t hdr[MYSQL_HEADER_LEN];
|
|
mxb_assert(gwbuf_copy_data(buf, 0, MYSQL_HEADER_LEN, hdr) == MYSQL_HEADER_LEN);
|
|
|
|
int buflen = gwbuf_length(buf);
|
|
int pktlen = MYSQL_GET_PAYLOAD_LEN(hdr) + MYSQL_HEADER_LEN;
|
|
|
|
if (bytes == MYSQL_AUTH_PACKET_BASE_SIZE)
|
|
{
|
|
/** This is an SSL request packet */
|
|
mxb_assert(dcb->listener->ssl);
|
|
mxb_assert(buflen == bytes && pktlen >= buflen);
|
|
}
|
|
else
|
|
{
|
|
/** Normal packet */
|
|
mxb_assert(buflen == pktlen);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Client read event, process when client not yet authenticated
|
|
*
|
|
* @param dcb Descriptor control block
|
|
* @param read_buffer A buffer containing the data read from client
|
|
* @param nbytes_read The number of bytes of data read
|
|
* @return 0 if succeed, 1 otherwise
|
|
*/
|
|
static int gw_read_do_authentication(DCB* dcb, GWBUF* read_buffer, int nbytes_read)
|
|
{
|
|
MXB_AT_DEBUG(check_packet(dcb, read_buffer, nbytes_read));
|
|
|
|
/** Allocate the shared session structure */
|
|
if (dcb->data == NULL && (dcb->data = mysql_session_alloc()) == NULL)
|
|
{
|
|
dcb_close(dcb);
|
|
return 1;
|
|
}
|
|
|
|
/** Read the client's packet sequence and increment that by one */
|
|
uint8_t next_sequence;
|
|
gwbuf_copy_data(read_buffer, MYSQL_SEQ_OFFSET, 1, &next_sequence);
|
|
|
|
if (next_sequence == 1 || (ssl_required_by_dcb(dcb) && next_sequence == 2))
|
|
{
|
|
/** This is the first response from the client, read the connection
|
|
* information and store them in the shared structure. For SSL connections,
|
|
* this will be packet number two since the first packet will be the
|
|
* Protocol::SSLRequest packet.
|
|
*
|
|
* @see
|
|
* https://dev.mysql.com/doc/internals/en/connection-phase-packets.html#packet-Protocol::SSLRequest
|
|
*/
|
|
store_client_information(dcb, read_buffer);
|
|
}
|
|
|
|
next_sequence++;
|
|
|
|
/**
|
|
* The first step in the authentication process is to extract the
|
|
* relevant information from the buffer supplied and place it
|
|
* into a data structure pointed to by the DCB. The "success"
|
|
* result is not final, it implies only that the process is so
|
|
* far successful, not that authentication has completed. If the
|
|
* data extraction succeeds, then a call is made to the actual
|
|
* authenticate function to carry out the user checks.
|
|
*/
|
|
int auth_val = MXS_AUTH_FAILED;
|
|
if (dcb->authfunc.extract(dcb, read_buffer))
|
|
{
|
|
auth_val = ssl_authenticate_check_status(dcb);
|
|
|
|
if (auth_val == MXS_AUTH_SSL_COMPLETE)
|
|
{
|
|
// TLS connection phase complete
|
|
auth_val = dcb->authfunc.authenticate(dcb);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
auth_val = MXS_AUTH_BAD_HANDSHAKE;
|
|
}
|
|
|
|
MySQLProtocol* protocol = (MySQLProtocol*)dcb->protocol;
|
|
|
|
/**
|
|
* At this point, if the auth_val return code indicates success
|
|
* the user authentication has been successfully completed.
|
|
* But in order to have a working connection, a session has to
|
|
* be created. Provided that is also successful (indicated by a
|
|
* non-null session) then the whole process has succeeded. In all
|
|
* other cases an error return is made.
|
|
*/
|
|
if (MXS_AUTH_SUCCEEDED == auth_val)
|
|
{
|
|
if (dcb->user == NULL)
|
|
{
|
|
/** User authentication complete, copy the username to the DCB */
|
|
MYSQL_session* ses = (MYSQL_session*)dcb->data;
|
|
if ((dcb->user = MXS_STRDUP(ses->user)) == NULL)
|
|
{
|
|
dcb_close(dcb);
|
|
gwbuf_free(read_buffer);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
protocol->protocol_auth_state = MXS_AUTH_STATE_RESPONSE_SENT;
|
|
/**
|
|
* Create session, and a router session for it.
|
|
* If successful, there will be backend connection(s)
|
|
* after this point. The protocol authentication state
|
|
* is changed so that future data will go through the
|
|
* normal data handling function instead of this one.
|
|
*/
|
|
MXS_SESSION* session =
|
|
session_alloc_with_id(dcb->service, dcb, protocol->thread_id);
|
|
|
|
if (session != NULL)
|
|
{
|
|
mxb_assert(session->state != SESSION_STATE_ALLOC
|
|
&& session->state != SESSION_STATE_DUMMY);
|
|
// For the time being only the sql_mode is stored in MXS_SESSION::client_protocol_data.
|
|
session->client_protocol_data = QC_SQL_MODE_DEFAULT;
|
|
protocol->protocol_auth_state = MXS_AUTH_STATE_COMPLETE;
|
|
MXB_AT_DEBUG(bool check = ) mxs_rworker_register_session(session);
|
|
mxb_assert(check);
|
|
mxs_mysql_send_ok(dcb, next_sequence, 0, NULL);
|
|
|
|
if (dcb->readq)
|
|
{
|
|
// The user has already send more data, process it
|
|
poll_fake_read_event(dcb);
|
|
}
|
|
}
|
|
else
|
|
{
|
|
auth_val = MXS_AUTH_NO_SESSION;
|
|
}
|
|
}
|
|
/**
|
|
* If we did not get success throughout or authentication is not yet complete,
|
|
* then the protocol state is updated, the client is notified of the failure
|
|
* and the DCB is closed.
|
|
*/
|
|
if (MXS_AUTH_SUCCEEDED != auth_val
|
|
&& MXS_AUTH_INCOMPLETE != auth_val
|
|
&& MXS_AUTH_SSL_INCOMPLETE != auth_val)
|
|
{
|
|
protocol->protocol_auth_state = MXS_AUTH_STATE_FAILED;
|
|
mysql_client_auth_error_handling(dcb, auth_val, next_sequence);
|
|
/**
|
|
* Close DCB and which will release MYSQL_session
|
|
*/
|
|
dcb_close(dcb);
|
|
}
|
|
/* One way or another, the buffer is now fully processed */
|
|
gwbuf_free(read_buffer);
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Helper function to split and store the buffer
|
|
* @param client_dcb Client DCB
|
|
* @param queue Buffer to split
|
|
* @param offset Offset where the split is made
|
|
* @return The first part of the buffer
|
|
*/
|
|
static GWBUF* split_and_store(DCB* client_dcb, GWBUF* queue, int offset)
|
|
{
|
|
GWBUF* newbuf = gwbuf_split(&queue, offset);
|
|
dcb_readq_append(client_dcb, queue);
|
|
return newbuf;
|
|
}
|
|
|
|
/**
|
|
* @brief Check if the DCB is idle from the protocol's point of view
|
|
*
|
|
* This checks if all expected data from the DCB has been read. The values
|
|
* prefixed with @c protocol_ should be manipulated by the protocol modules.
|
|
*
|
|
* @param dcb DCB to check
|
|
* @return True if the DCB protocol is not expecting any data
|
|
*/
|
|
static bool protocol_is_idle(DCB* dcb)
|
|
{
|
|
return dcb->protocol_bytes_processed == dcb->protocol_packet_length;
|
|
}
|
|
|
|
/**
|
|
* @brief Process the commands the client is executing
|
|
*
|
|
* The data read from the network is not guaranteed to contain a complete MySQL
|
|
* packet. This means that it is possible that a command sent by the client is
|
|
* split across multiple network packets and those packets need to be processed
|
|
* individually.
|
|
*
|
|
* The forwarding of the data to the routers starts once the length and command
|
|
* bytes have been read. The @c current_command field of the protocol
|
|
* structure is guaranteed to always represent the current command being executed
|
|
* by the client.
|
|
*
|
|
* Currently the gathered information is used by the readconnroute module to
|
|
* detect COM_CHANGE_USER packets.
|
|
*
|
|
* @param dcb Client MySQL protocol struct
|
|
* @param bytes_available Number of bytes available
|
|
* @param queue Data written by the client
|
|
* @return True if routing can proceed, false if processing should be attempted
|
|
* later when more data is available
|
|
*/
|
|
static bool process_client_commands(DCB* dcb, int bytes_available, GWBUF** buffer)
|
|
{
|
|
GWBUF* queue = *buffer;
|
|
|
|
/** Make sure we have enough data if the client is sending a new command */
|
|
if (protocol_is_idle(dcb) && bytes_available < MYSQL_HEADER_LEN)
|
|
{
|
|
dcb_readq_append(dcb, queue);
|
|
return false;
|
|
}
|
|
|
|
int offset = 0;
|
|
|
|
while (bytes_available)
|
|
{
|
|
if (protocol_is_idle(dcb))
|
|
{
|
|
int pktlen;
|
|
uint8_t cmd = (uint8_t)MXS_COM_QUERY; // Treat empty packets as COM_QUERY
|
|
|
|
uint8_t packet_header[MYSQL_HEADER_LEN];
|
|
|
|
if (gwbuf_copy_data(queue, offset, MYSQL_HEADER_LEN, packet_header) != MYSQL_HEADER_LEN)
|
|
{
|
|
mxb_assert(offset > 0);
|
|
queue = split_and_store(dcb, queue, offset);
|
|
break;
|
|
}
|
|
|
|
pktlen = gw_mysql_get_byte3(packet_header);
|
|
|
|
/**
|
|
* Check if the packet is empty, and if not, if we have the command byte.
|
|
* If we an empty packet or have at least 5 bytes of data, we can start
|
|
* sending the data to the router.
|
|
*/
|
|
if (pktlen && gwbuf_copy_data(queue, offset + MYSQL_HEADER_LEN, 1, &cmd) != 1)
|
|
{
|
|
if ((queue = split_and_store(dcb, queue, offset)) == NULL)
|
|
{
|
|
mxb_assert(bytes_available - offset == MYSQL_HEADER_LEN);
|
|
return false;
|
|
}
|
|
mxb_assert(offset > 0);
|
|
break;
|
|
}
|
|
|
|
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
|
if (dcb->protocol_packet_length - MYSQL_HEADER_LEN != GW_MYSQL_MAX_PACKET_LEN)
|
|
{
|
|
/** We're processing the first packet of a command */
|
|
proto->current_command = (mxs_mysql_cmd_t)cmd;
|
|
}
|
|
|
|
dcb->protocol_packet_length = pktlen + MYSQL_HEADER_LEN;
|
|
dcb->protocol_bytes_processed = 0;
|
|
}
|
|
|
|
int bytes_needed = dcb->protocol_packet_length - dcb->protocol_bytes_processed;
|
|
int packet_bytes = bytes_needed <= bytes_available ? bytes_needed : bytes_available;
|
|
|
|
bytes_available -= packet_bytes;
|
|
dcb->protocol_bytes_processed += packet_bytes;
|
|
offset += packet_bytes;
|
|
mxb_assert(dcb->protocol_bytes_processed <= dcb->protocol_packet_length);
|
|
}
|
|
|
|
mxb_assert(bytes_available >= 0);
|
|
mxb_assert(queue);
|
|
*buffer = queue;
|
|
return true;
|
|
}
|
|
|
|
/**
|
|
* Handle relevant variables
|
|
*
|
|
* @param session The session for which the query classifier mode is adjusted.
|
|
* @param read_buffer Pointer to a buffer, assumed to contain a statement.
|
|
* May be reallocated if not contiguous.
|
|
*
|
|
* @return NULL if successful, otherwise dynamically allocated error message.
|
|
*/
|
|
char* handle_variables(MXS_SESSION* session, GWBUF** read_buffer)
|
|
{
|
|
char* message = NULL;
|
|
|
|
SetParser set_parser;
|
|
SetParser::Result result;
|
|
|
|
switch (set_parser.check(read_buffer, &result))
|
|
{
|
|
case SetParser::ERROR:
|
|
// In practice only OOM.
|
|
break;
|
|
|
|
case SetParser::IS_SET_SQL_MODE:
|
|
{
|
|
SqlModeParser sql_mode_parser;
|
|
|
|
const SetParser::Result::Items& values = result.values();
|
|
|
|
for (SetParser::Result::Items::const_iterator i = values.begin(); i != values.end(); ++i)
|
|
{
|
|
const SetParser::Result::Item& value = *i;
|
|
|
|
switch (sql_mode_parser.get_sql_mode(value.first, value.second))
|
|
{
|
|
case SqlModeParser::ORACLE:
|
|
session_set_autocommit(session, false);
|
|
session->client_protocol_data = QC_SQL_MODE_ORACLE;
|
|
break;
|
|
|
|
case SqlModeParser::DEFAULT:
|
|
session_set_autocommit(session, true);
|
|
session->client_protocol_data = QC_SQL_MODE_DEFAULT;
|
|
break;
|
|
|
|
case SqlModeParser::SOMETHING:
|
|
break;
|
|
|
|
default:
|
|
mxb_assert(!true);
|
|
}
|
|
}
|
|
}
|
|
break;
|
|
|
|
case SetParser::IS_SET_MAXSCALE:
|
|
{
|
|
const SetParser::Result::Items& variables = result.variables();
|
|
const SetParser::Result::Items& values = result.values();
|
|
|
|
SetParser::Result::Items::const_iterator i = variables.begin();
|
|
SetParser::Result::Items::const_iterator j = values.begin();
|
|
|
|
while (!message && (i != variables.end()))
|
|
{
|
|
const SetParser::Result::Item& variable = *i;
|
|
const SetParser::Result::Item& value = *j;
|
|
|
|
message = session_set_variable_value(session,
|
|
variable.first,
|
|
variable.second,
|
|
value.first,
|
|
value.second);
|
|
|
|
++i;
|
|
++j;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case SetParser::NOT_RELEVANT:
|
|
break;
|
|
|
|
default:
|
|
mxb_assert(!true);
|
|
}
|
|
|
|
return message;
|
|
}
|
|
|
|
/**
|
|
* @brief Client read event, process data, client already authenticated
|
|
*
|
|
* First do some checks and get the router capabilities. If the router
|
|
* wants to process each individual statement, then the data must be split
|
|
* into individual SQL statements. Any data that is left over is held in the
|
|
* DCB read queue.
|
|
*
|
|
* Finally, the general client data processing function is called.
|
|
*
|
|
* @param dcb Descriptor control block
|
|
* @param read_buffer A buffer containing the data read from client
|
|
* @param nbytes_read The number of bytes of data read
|
|
* @return 0 if succeed, 1 otherwise
|
|
*/
|
|
static int gw_read_normal_data(DCB* dcb, GWBUF* read_buffer, int nbytes_read)
|
|
{
|
|
MXS_SESSION* session;
|
|
mxs_session_state_t session_state_value;
|
|
uint64_t capabilities = 0;
|
|
|
|
session = dcb->session;
|
|
session_state_value = session->state;
|
|
if (session_state_value != SESSION_STATE_ROUTER_READY)
|
|
{
|
|
if (session_state_value != SESSION_STATE_STOPPING)
|
|
{
|
|
MXS_ERROR("Session received a query in incorrect state %s",
|
|
STRSESSIONSTATE(session_state_value));
|
|
}
|
|
gwbuf_free(read_buffer);
|
|
dcb_close(dcb);
|
|
return 1;
|
|
}
|
|
|
|
/** Ask what type of input the router/filter chain expects */
|
|
capabilities = service_get_capabilities(session->service);
|
|
MySQLProtocol* proto = static_cast<MySQLProtocol*>(dcb->protocol);
|
|
|
|
/** If the router requires statement input we need to make sure that
|
|
* a complete SQL packet is read before continuing. The current command
|
|
* that is tracked by the protocol module is updated in route_by_statement() */
|
|
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)
|
|
|| proto->current_command == MXS_COM_CHANGE_USER)
|
|
{
|
|
uint8_t pktlen[MYSQL_HEADER_LEN];
|
|
size_t n_copied = gwbuf_copy_data(read_buffer, 0, MYSQL_HEADER_LEN, pktlen);
|
|
|
|
if (n_copied != sizeof(pktlen)
|
|
|| (uint32_t)nbytes_read < MYSQL_GET_PAYLOAD_LEN(pktlen) + MYSQL_HEADER_LEN)
|
|
{
|
|
dcb_readq_append(dcb, read_buffer);
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Update the current command, required by KILL command processing.
|
|
* If a COM_CHANGE_USER is in progress, this must not be done as the client
|
|
* is sending authentication data that does not have the command byte.
|
|
*/
|
|
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
|
|
|
if (!proto->changing_user && !session_is_load_active(session))
|
|
{
|
|
proto->current_command = (mxs_mysql_cmd_t)mxs_mysql_get_command(read_buffer);
|
|
}
|
|
|
|
char* message = handle_variables(session, &read_buffer);
|
|
|
|
if (message)
|
|
{
|
|
int rv = dcb->func.write(dcb, modutil_create_mysql_err_msg(1, 0, 1193, "HY000", message));
|
|
MXS_FREE(message);
|
|
return rv;
|
|
}
|
|
else
|
|
{
|
|
// Must be done whether or not there were any changes, as the query classifier
|
|
// is thread and not session specific.
|
|
qc_set_sql_mode(static_cast<qc_sql_mode_t>(session->client_protocol_data));
|
|
}
|
|
}
|
|
/** Update the current protocol command being executed */
|
|
else if (!process_client_commands(dcb, nbytes_read, &read_buffer))
|
|
{
|
|
return 0;
|
|
}
|
|
|
|
/** The query classifier classifies according to the service's server that has
|
|
* the smallest version number. */
|
|
qc_set_server_version(service_get_version(session->service, SERVICE_VERSION_MIN));
|
|
|
|
spec_com_res_t res = RES_CONTINUE;
|
|
|
|
if (!proto->changing_user)
|
|
{
|
|
res = process_special_commands(dcb, read_buffer, nbytes_read);
|
|
}
|
|
|
|
int rval = 1;
|
|
switch (res)
|
|
{
|
|
case RES_MORE_DATA:
|
|
dcb_readq_set(dcb, read_buffer);
|
|
rval = 0;
|
|
break;
|
|
|
|
case RES_END:
|
|
// Do not send this packet for routing
|
|
gwbuf_free(read_buffer);
|
|
rval = 0;
|
|
break;
|
|
|
|
case RES_CONTINUE:
|
|
rval = gw_read_finish_processing(dcb, read_buffer, capabilities);
|
|
break;
|
|
|
|
default:
|
|
mxb_assert(!true);
|
|
}
|
|
return rval;
|
|
}
|
|
|
|
/**
|
|
* Check if a connection qualifies to be added into the persistent connection pool
|
|
*
|
|
* @param dcb The client DCB to check
|
|
*/
|
|
void check_pool_candidate(DCB* dcb)
|
|
{
|
|
MXS_SESSION* session = dcb->session;
|
|
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
|
|
|
if (proto->current_command == MXS_COM_QUIT)
|
|
{
|
|
/** The client is closing the connection. We know that this will be the
|
|
* last command the client sends so the backend connections are very likely
|
|
* to be in an idle state.
|
|
*
|
|
* If the client is pipelining the queries (i.e. sending N request as
|
|
* a batch and then expecting N responses) then it is possible that
|
|
* the backend connections are not idle when the COM_QUIT is received.
|
|
* In most cases we can assume that the connections are idle. */
|
|
session_qualify_for_pool(session);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @brief Client read event, common processing after single statement handling
|
|
*
|
|
* @param dcb Descriptor control block
|
|
* @param read_buffer A buffer containing the data read from client
|
|
* @param capabilities The router capabilities flags
|
|
* @return 0 if succeed, 1 otherwise
|
|
*/
|
|
static int gw_read_finish_processing(DCB* dcb, GWBUF* read_buffer, uint64_t capabilities)
|
|
{
|
|
MXS_SESSION* session = dcb->session;
|
|
uint8_t* payload = GWBUF_DATA(read_buffer);
|
|
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
|
int return_code = 0;
|
|
|
|
/** Reset error handler when routing of the new query begins */
|
|
dcb->dcb_errhandle_called = false;
|
|
|
|
if (rcap_type_required(capabilities, RCAP_TYPE_STMT_INPUT)
|
|
|| proto->current_command == MXS_COM_CHANGE_USER)
|
|
{
|
|
/**
|
|
* Feed each statement completely and separately to router.
|
|
*/
|
|
return_code = route_by_statement(session, capabilities, &read_buffer) ? 0 : 1;
|
|
|
|
if (read_buffer != NULL)
|
|
{
|
|
/*
|
|
* Must have been data left over
|
|
* Add incomplete mysql packet to read queue
|
|
*/
|
|
|
|
dcb_readq_append(dcb, read_buffer);
|
|
}
|
|
}
|
|
else if (NULL != session->router_session || (rcap_type_required(capabilities, RCAP_TYPE_NO_RSESSION)))
|
|
{
|
|
/** Check if this connection qualifies for the connection pool */
|
|
check_pool_candidate(dcb);
|
|
|
|
/** Feed the whole buffer to the router */
|
|
return_code = MXS_SESSION_ROUTE_QUERY(session, read_buffer) ? 0 : 1;
|
|
}
|
|
/*
|
|
* else return_code is still 0 from when it was originally set
|
|
* Note that read_buffer has been freed or transferred by this point
|
|
*/
|
|
|
|
if (return_code != 0)
|
|
{
|
|
/** Routing failed, close the client connection */
|
|
dcb->session->close_reason = SESSION_CLOSE_ROUTING_FAILED;
|
|
dcb_close(dcb);
|
|
MXS_ERROR("Routing the query failed. Session will be closed.");
|
|
}
|
|
else if (proto->current_command == MXS_COM_QUIT)
|
|
{
|
|
/** Close router session which causes closing of backends */
|
|
mxb_assert_message(session_valid_for_pool(dcb->session), "Session should qualify for pooling");
|
|
dcb_close(dcb);
|
|
}
|
|
|
|
return return_code;
|
|
}
|
|
|
|
/**
|
|
* @brief Analyse authentication errors and write appropriate log messages
|
|
*
|
|
* @param dcb Request handler DCB connected to the client
|
|
* @param auth_val The type of authentication failure
|
|
* @note Authentication status codes are defined in maxscale/protocol/mysql.h
|
|
*/
|
|
static void mysql_client_auth_error_handling(DCB* dcb, int auth_val, int packet_number)
|
|
{
|
|
int message_len;
|
|
char* fail_str = NULL;
|
|
MYSQL_session* session = (MYSQL_session*)dcb->data;
|
|
|
|
switch (auth_val)
|
|
{
|
|
case MXS_AUTH_NO_SESSION:
|
|
MXS_DEBUG("session creation failed. fd %d, state = MYSQL_AUTH_NO_SESSION.", dcb->fd);
|
|
|
|
/** Send ERR 1045 to client */
|
|
mysql_send_auth_error(dcb, packet_number, 0, "failed to create new session");
|
|
break;
|
|
|
|
case MXS_AUTH_FAILED_DB:
|
|
MXS_DEBUG("database specified was not valid. fd %d, state = MYSQL_FAILED_AUTH_DB.", dcb->fd);
|
|
/** Send error 1049 to client */
|
|
message_len = 25 + MYSQL_DATABASE_MAXLEN;
|
|
|
|
fail_str = (char*)MXS_CALLOC(1, message_len + 1);
|
|
MXS_ABORT_IF_NULL(fail_str);
|
|
snprintf(fail_str, message_len, "Unknown database '%s'", session->db);
|
|
|
|
modutil_send_mysql_err_packet(dcb, packet_number, 0, 1049, "42000", fail_str);
|
|
break;
|
|
|
|
case MXS_AUTH_FAILED_SSL:
|
|
MXS_DEBUG("client is "
|
|
"not SSL capable for SSL listener. fd %d, "
|
|
"state = MYSQL_FAILED_AUTH_SSL.",
|
|
dcb->fd);
|
|
|
|
/** Send ERR 1045 to client */
|
|
mysql_send_auth_error(dcb, packet_number, 0, "Access without SSL denied");
|
|
break;
|
|
|
|
case MXS_AUTH_SSL_INCOMPLETE:
|
|
MXS_DEBUG("unable to complete SSL authentication. fd %d, "
|
|
"state = MYSQL_AUTH_SSL_INCOMPLETE.",
|
|
dcb->fd);
|
|
|
|
/** Send ERR 1045 to client */
|
|
mysql_send_auth_error(dcb,
|
|
packet_number,
|
|
0,
|
|
"failed to complete SSL authentication");
|
|
break;
|
|
|
|
case MXS_AUTH_FAILED:
|
|
MXS_DEBUG("authentication failed. fd %d, state = MYSQL_FAILED_AUTH.", dcb->fd);
|
|
/** Send error 1045 to client */
|
|
fail_str = create_auth_fail_str(session->user,
|
|
dcb->remote,
|
|
session->auth_token_len > 0,
|
|
session->db,
|
|
auth_val);
|
|
modutil_send_mysql_err_packet(dcb, packet_number, 0, 1045, "28000", fail_str);
|
|
break;
|
|
|
|
case MXS_AUTH_BAD_HANDSHAKE:
|
|
modutil_send_mysql_err_packet(dcb, packet_number, 0, 1045, "08S01", "Bad handshake");
|
|
break;
|
|
|
|
default:
|
|
MXS_DEBUG("authentication failed. fd %d, state unrecognized.", dcb->fd);
|
|
/** Send error 1045 to client */
|
|
fail_str = create_auth_fail_str(session->user,
|
|
dcb->remote,
|
|
session->auth_token_len > 0,
|
|
session->db,
|
|
auth_val);
|
|
modutil_send_mysql_err_packet(dcb, packet_number, 0, 1045, "28000", fail_str);
|
|
}
|
|
MXS_FREE(fail_str);
|
|
}
|
|
|
|
/**
 * Send the "Too many connections" error (code 1040) to the client.
 *
 * @param dcb    Client DCB that receives the error
 * @param limit  Not used by this implementation
 * @return The return value of mysql_send_standard_error()
 */
static int gw_connection_limit(DCB* dcb, int limit)
{
    const int send_result = mysql_send_standard_error(dcb, 0, 1040, "Too many connections");
    return send_result;
}
|
|
///////////////////////////////////////////////
|
|
// client write event to Client triggered by EPOLLOUT
|
|
//////////////////////////////////////////////
|
|
/**
|
|
* @node Client's fd became writable, and EPOLLOUT event
|
|
* arrived. As a consequence, client input buffer (writeq) is flushed.
|
|
*
|
|
* Parameters:
|
|
* @param dcb - in, use
|
|
* client dcb
|
|
*
|
|
* @return constantly 1
|
|
*
|
|
*
|
|
* @details (write detailed description here)
|
|
*
|
|
*/
|
|
int gw_write_client_event(DCB* dcb)
|
|
{
|
|
MySQLProtocol* protocol = NULL;
|
|
|
|
mxb_assert(dcb->state != DCB_STATE_DISCONNECTED);
|
|
|
|
if (dcb == NULL)
|
|
{
|
|
goto return_1;
|
|
}
|
|
|
|
if (dcb->state == DCB_STATE_DISCONNECTED)
|
|
{
|
|
goto return_1;
|
|
}
|
|
|
|
if (dcb->protocol == NULL)
|
|
{
|
|
goto return_1;
|
|
}
|
|
protocol = (MySQLProtocol*)dcb->protocol;
|
|
|
|
if (protocol->protocol_auth_state == MXS_AUTH_STATE_COMPLETE)
|
|
{
|
|
dcb_drain_writeq(dcb);
|
|
goto return_1;
|
|
}
|
|
|
|
return_1:
|
|
return 1;
|
|
}
|
|
|
|
/**
|
|
* Bind the DCB to a network port or a UNIX Domain Socket.
|
|
* @param listen_dcb Listener DCB
|
|
* @param config_bind Bind address in either IP:PORT format for network sockets or PATH
|
|
* for UNIX Domain Sockets
|
|
* @return 1 on success, 0 on error
|
|
*/
|
|
int gw_MySQLListener(DCB* listen_dcb, char* config_bind)
|
|
{
|
|
if (dcb_listen(listen_dcb, config_bind, "MySQL") < 0)
|
|
{
|
|
return 0;
|
|
}
|
|
listen_dcb->func.accept = gw_MySQLAccept;
|
|
|
|
return 1;
|
|
}
|
|
|
|
|
|
/**
|
|
* @node Accept a new connection, using the DCB code for the basic work
|
|
*
|
|
* For as long as dcb_accept can return new client DCBs for new connections,
|
|
* continue to loop. The code will always give a failure return, since it
|
|
* continues to try to create new connections until a failure occurs.
|
|
*
|
|
* @param listener - The Listener DCB that picks up new connection requests
|
|
* @return 0 in success, 1 in failure
|
|
*
|
|
*/
|
|
int gw_MySQLAccept(DCB* listener)
|
|
{
|
|
DCB* client_dcb;
|
|
|
|
while ((client_dcb = dcb_accept(listener)) != NULL)
|
|
{
|
|
gw_process_one_new_client(client_dcb);
|
|
} /**< while client_dcb != NULL */
|
|
|
|
/* Must have broken out of while loop or received NULL client_dcb */
|
|
return 1;
|
|
}
|
|
|
|
/**
 * Finish the set-up of a freshly accepted client DCB on the worker that will own it.
 *
 * The protocol object is created, the DCB is added to the poll set and the
 * server handshake is sent. If adding to the poll set fails, an error is sent
 * to the client and the DCB is closed.
 *
 * @param client_dcb The newly accepted client DCB
 */
static void gw_process_one_new_client(DCB* client_dcb)
{
    /**
     * The worker who owns the DCB is chosen here, before any epoll events for it can be processed.
     * This guarantees that the first event for the DCB is processed only after the following
     * task has been processed by the owning thread.
     */
    mxs::RoutingWorker* worker = mxs::RoutingWorker::pick_worker();

    worker->execute([=]() {
            client_dcb->protocol = mysql_protocol_init(client_dcb, client_dcb->fd);
            MXS_ABORT_IF_NULL(client_dcb->protocol);

            /**
             * Set new descriptor to event set. At the same time,
             * change state to DCB_STATE_POLLING so that
             * thread which wakes up sees correct state.
             */
            if (poll_add_dcb(client_dcb) == -1)
            {
                /* Send a custom error as MySQL command reply */
                mysql_send_custom_error(client_dcb, 1, 0,
                                        "MaxScale encountered system limit while "
                                        "attempting to register on an epoll instance.");

                /** Previous state is recovered in poll_add_dcb. Log before closing so that
                 * the DCB is not read after dcb_close() has been called on it. */
                MXS_ERROR("Failed to add dcb %p for fd %d to epoll set.",
                          client_dcb, client_dcb->fd);

                /** close client_dcb */
                dcb_close(client_dcb);
            }
            else
            {
                MySQLSendHandshake(client_dcb);
            }
        }, mxs::RoutingWorker::EXECUTE_AUTO);
}
|
|
|
|
/**
 * Handle an error event on the client side descriptor.
 *
 * The DCB is closed unless its session is already in the stopping state.
 *
 * @param dcb The DCB of the connection
 * @return Always 1
 */
static int gw_error_client_event(DCB* dcb)
{
    MXS_SESSION* owner = dcb->session;
    const bool already_stopping = (owner != NULL && owner->state == SESSION_STATE_STOPPING);

    if (!already_stopping)
    {
#if defined (SS_DEBUG)
        MXS_DEBUG("Client error event handling.");
#endif
        dcb_close(dcb);
    }

    return 1;
}
|
|
|
|
/**
 * Close handler of the client DCB.
 *
 * Once mysql_protocol_done() reports success, the session is deregistered from
 * the routing worker and closed, unless it is a dummy session or already
 * marked to be freed.
 *
 * @param dcb The client DCB being closed
 * @return Always 1
 */
static int gw_client_close(DCB* dcb)
{
    mxb_assert(dcb->protocol);

    if (!mysql_protocol_done(dcb))
    {
        return 1;
    }

    MXS_SESSION* target = dcb->session;
    const bool nothing_to_close = target->state == SESSION_STATE_TO_BE_FREED
        || target->state == SESSION_STATE_DUMMY;

    if (!nothing_to_close)
    {
        mxb_assert(target->state == SESSION_STATE_ROUTER_READY
                   || target->state == SESSION_STATE_STOPPING);
        MXB_AT_DEBUG(bool removed = ) mxs_rworker_deregister_session(target->ses_id);
        mxb_assert(removed);
        session_close(target);
    }

    return 1;
}
|
|
|
|
/**
|
|
* Handle a hangup event on the client side descriptor.
|
|
*
|
|
* We simply close the DCB, this will propagate the closure to any
|
|
* backend descriptors and perform the session cleanup.
|
|
*
|
|
* @param dcb The DCB of the connection
|
|
*/
|
|
static int gw_client_hangup_event(DCB* dcb)
|
|
{
|
|
MXS_SESSION* session = dcb->session;
|
|
|
|
if (session)
|
|
{
|
|
if (session->state != SESSION_STATE_DUMMY && !session_valid_for_pool(session))
|
|
{
|
|
if (session_get_dump_statements() == SESSION_DUMP_STATEMENTS_ON_ERROR)
|
|
{
|
|
session_dump_statements(session);
|
|
}
|
|
|
|
// The client did not send a COM_QUIT packet
|
|
std::string errmsg {"Connection killed by MaxScale"};
|
|
std::string extra {session_get_close_reason(dcb->session)};
|
|
|
|
if (!extra.empty())
|
|
{
|
|
errmsg += ": " + extra;
|
|
}
|
|
|
|
modutil_send_mysql_err_packet(dcb, 0, 0, 1927, "08S01", errmsg.c_str());
|
|
}
|
|
dcb_close(dcb);
|
|
}
|
|
|
|
return 1;
|
|
}
|
|
|
|
/**
|
|
* Update protocol tracking information for an individual statement
|
|
*
|
|
* @param dcb Client DCB
|
|
* @param buffer Buffer containing a single packet
|
|
*/
|
|
void update_current_command(DCB* dcb, GWBUF* buffer)
|
|
{
|
|
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
|
uint8_t cmd = (uint8_t)MXS_COM_QUERY;
|
|
|
|
/**
|
|
* As we are routing individual packets, we can extract the command byte here.
|
|
* Empty packets are treated as COM_QUERY packets by default.
|
|
*/
|
|
gwbuf_copy_data(buffer, MYSQL_HEADER_LEN, 1, &cmd);
|
|
proto->current_command = (mxs_mysql_cmd_t)cmd;
|
|
|
|
/**
|
|
* Now that we have the current command, we can check if this connection
|
|
* can be a candidate for the connection pool.
|
|
*/
|
|
check_pool_candidate(dcb);
|
|
}
|
|
|
|
/**
|
|
* Perform re-authentication of the client
|
|
*
|
|
* @param session Client session
|
|
* @param packetbuf Client's response to the AuthSwitchRequest
|
|
*
|
|
* @return True if the user is allowed access
|
|
*/
|
|
static bool reauthenticate_client(MXS_SESSION* session, GWBUF* packetbuf)
|
|
{
|
|
bool rval = false;
|
|
|
|
if (session->client_dcb->authfunc.reauthenticate)
|
|
{
|
|
uint64_t payloadlen = gwbuf_length(packetbuf) - MYSQL_HEADER_LEN;
|
|
MySQLProtocol* proto = (MySQLProtocol*)session->client_dcb->protocol;
|
|
std::vector<uint8_t> payload;
|
|
payload.resize(payloadlen);
|
|
gwbuf_copy_data(packetbuf, MYSQL_HEADER_LEN, payloadlen, &payload[0]);
|
|
|
|
// Will contains extra data but the username is null-terminated
|
|
char user[MYSQL_USER_MAXLEN + 1];
|
|
gwbuf_copy_data(proto->stored_query, MYSQL_HEADER_LEN + 1, sizeof(user), (uint8_t*)user);
|
|
|
|
char* end = user + sizeof(user);
|
|
|
|
if (std::find(user, end, '\0') == end)
|
|
{
|
|
mysql_send_auth_error(session->client_dcb, 3, 0, "Malformed AuthSwitchRequest packet");
|
|
return false;
|
|
}
|
|
|
|
// Copy the new username to the session data
|
|
MYSQL_session* data = (MYSQL_session*)session->client_dcb->data;
|
|
strcpy(data->user, user);
|
|
|
|
int rc = session->client_dcb->authfunc.reauthenticate(session->client_dcb,
|
|
data->user,
|
|
&payload[0],
|
|
payload.size(),
|
|
proto->scramble,
|
|
sizeof(proto->scramble),
|
|
data->client_sha1,
|
|
sizeof(data->client_sha1));
|
|
|
|
if (rc == MXS_AUTH_SUCCEEDED)
|
|
{
|
|
// Re-authentication successful, route the original COM_CHANGE_USER
|
|
rval = true;
|
|
}
|
|
else
|
|
{
|
|
/**
|
|
* Authentication failed. To prevent the COM_CHANGE_USER from reaching
|
|
* the backend servers (and possibly causing problems) the client
|
|
* connection will be closed.
|
|
*
|
|
* First packet is COM_CHANGE_USER, the second is AuthSwitchRequest,
|
|
* third is the response and the fourth is the following error.
|
|
*/
|
|
mysql_client_auth_error_handling(session->client_dcb, rc, 3);
|
|
}
|
|
}
|
|
|
|
return rval;
|
|
}
|
|
|
|
/**
|
|
* Detect if buffer includes partial mysql packet or multiple packets.
|
|
* Store partial packet to dcb_readqueue. Send complete packets one by one
|
|
* to router.
|
|
*
|
|
* It is assumed readbuf includes at least one complete packet.
|
|
* Return 1 in success. If the last packet is incomplete return success but
|
|
* leave incomplete packet to readbuf.
|
|
*
|
|
* @param session Session pointer
|
|
* @param capabilities The capabilities of the service.
|
|
* @param p_readbuf Pointer to the address of GWBUF including the query
|
|
*
|
|
* @return 1 if succeed,
|
|
*/
|
|
static int route_by_statement(MXS_SESSION* session, uint64_t capabilities, GWBUF** p_readbuf)
|
|
{
|
|
int rc = 1;
|
|
GWBUF* packetbuf;
|
|
do
|
|
{
|
|
// Process client request one packet at a time
|
|
packetbuf = modutil_get_next_MySQL_packet(p_readbuf);
|
|
|
|
if (packetbuf != NULL)
|
|
{
|
|
// TODO: Do this only when RCAP_TYPE_CONTIGUOUS_INPUT is requested
|
|
packetbuf = gwbuf_make_contiguous(packetbuf);
|
|
session_retain_statement(session, packetbuf);
|
|
|
|
MySQLProtocol* proto = (MySQLProtocol*)session->client_dcb->protocol;
|
|
|
|
/**
|
|
* Update the currently command being executed.
|
|
*/
|
|
if (!proto->changing_user && !session_is_load_active(session))
|
|
{
|
|
update_current_command(session->client_dcb, packetbuf);
|
|
}
|
|
|
|
if (rcap_type_required(capabilities, RCAP_TYPE_CONTIGUOUS_INPUT))
|
|
{
|
|
mxb_assert(GWBUF_IS_CONTIGUOUS(packetbuf));
|
|
SERVICE* service = session->client_dcb->service;
|
|
|
|
if (rcap_type_required(capabilities, RCAP_TYPE_TRANSACTION_TRACKING)
|
|
&& !service->session_track_trx_state
|
|
&& !session_is_load_active(session))
|
|
{
|
|
if (session_trx_is_ending(session))
|
|
{
|
|
session_set_trx_state(session, SESSION_TRX_INACTIVE);
|
|
}
|
|
|
|
if (mxs_mysql_get_command(packetbuf) == MXS_COM_QUERY)
|
|
{
|
|
uint32_t type = qc_get_trx_type_mask(packetbuf);
|
|
|
|
if (type & QUERY_TYPE_BEGIN_TRX)
|
|
{
|
|
if (type & QUERY_TYPE_DISABLE_AUTOCOMMIT)
|
|
{
|
|
session_set_autocommit(session, false);
|
|
session_set_trx_state(session, SESSION_TRX_INACTIVE);
|
|
}
|
|
else
|
|
{
|
|
mxs_session_trx_state_t trx_state;
|
|
if (type & QUERY_TYPE_WRITE)
|
|
{
|
|
trx_state = SESSION_TRX_READ_WRITE;
|
|
}
|
|
else if (type & QUERY_TYPE_READ)
|
|
{
|
|
trx_state = SESSION_TRX_READ_ONLY;
|
|
}
|
|
else
|
|
{
|
|
trx_state = SESSION_TRX_ACTIVE;
|
|
}
|
|
|
|
session_set_trx_state(session, trx_state);
|
|
}
|
|
}
|
|
else if ((type & QUERY_TYPE_COMMIT) || (type & QUERY_TYPE_ROLLBACK))
|
|
{
|
|
uint32_t trx_state = session_get_trx_state(session);
|
|
trx_state |= SESSION_TRX_ENDING_BIT;
|
|
session_set_trx_state(session, (mxs_session_trx_state_t)trx_state);
|
|
|
|
if (type & QUERY_TYPE_ENABLE_AUTOCOMMIT)
|
|
{
|
|
session_set_autocommit(session, true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
bool changed_user = false;
|
|
|
|
if (!proto->changing_user && proto->current_command == MXS_COM_CHANGE_USER)
|
|
{
|
|
changed_user = true;
|
|
send_auth_switch_request_packet(session->client_dcb);
|
|
|
|
// Store the original COM_CHANGE_USER for later
|
|
proto->stored_query = packetbuf;
|
|
packetbuf = NULL;
|
|
rc = 1;
|
|
}
|
|
else if (proto->changing_user)
|
|
{
|
|
mxb_assert(proto->current_command == MXS_COM_CHANGE_USER);
|
|
proto->changing_user = false;
|
|
bool ok = reauthenticate_client(session, packetbuf);
|
|
gwbuf_free(packetbuf);
|
|
packetbuf = proto->stored_query;
|
|
proto->stored_query = NULL;
|
|
|
|
if (ok)
|
|
{
|
|
// Authentication was successful, route the original COM_CHANGE_USER
|
|
rc = 1;
|
|
}
|
|
else
|
|
{
|
|
// Authentication failed, close the connection
|
|
rc = 0;
|
|
gwbuf_free(packetbuf);
|
|
packetbuf = NULL;
|
|
MXS_ERROR("User reauthentication failed for '%s'@'%s'",
|
|
session->client_dcb->user,
|
|
session->client_dcb->remote);
|
|
}
|
|
}
|
|
|
|
if (packetbuf)
|
|
{
|
|
/** Route query */
|
|
rc = MXS_SESSION_ROUTE_QUERY(session, packetbuf);
|
|
}
|
|
|
|
proto->changing_user = changed_user;
|
|
}
|
|
else
|
|
{
|
|
rc = 1;
|
|
goto return_rc;
|
|
}
|
|
}
|
|
while (rc == 1 && *p_readbuf != NULL);
|
|
|
|
return_rc:
|
|
return rc;
|
|
}
|
|
|
|
/**
|
|
* Some SQL commands/queries need to be detected and handled by the protocol
|
|
* and MaxScale instead of being routed forward as is.
|
|
*
|
|
* @param dcb Client dcb
|
|
* @param read_buffer the current read buffer
|
|
* @param nbytes_read How many bytes were read
|
|
* @return see @c spec_com_res_t
|
|
*/
|
|
static spec_com_res_t process_special_commands(DCB* dcb, GWBUF* read_buffer, int nbytes_read)
|
|
{
|
|
spec_com_res_t rval = RES_CONTINUE;
|
|
bool is_complete = false;
|
|
unsigned int packet_len =
|
|
MYSQL_GET_PAYLOAD_LEN((uint8_t*)GWBUF_DATA(read_buffer)) + MYSQL_HEADER_LEN;
|
|
if (gwbuf_length(read_buffer) == packet_len)
|
|
{
|
|
is_complete = true;
|
|
}
|
|
|
|
/**
|
|
* Handle COM_SET_OPTION. This seems to be only used by some versions of PHP.
|
|
*
|
|
* The option is stored as a two byte integer with the values 0 for enabling
|
|
* multi-statements and 1 for disabling it.
|
|
*/
|
|
MySQLProtocol* proto = (MySQLProtocol*)dcb->protocol;
|
|
uint8_t opt;
|
|
|
|
if (proto->current_command == MXS_COM_SET_OPTION
|
|
&& gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 2, 1, &opt))
|
|
{
|
|
if (opt)
|
|
{
|
|
proto->client_capabilities &= ~GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
|
|
}
|
|
else
|
|
{
|
|
proto->client_capabilities |= GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS;
|
|
}
|
|
}
|
|
/**
|
|
* Handle COM_PROCESS_KILL
|
|
*/
|
|
else if (proto->current_command == MXS_COM_PROCESS_KILL)
|
|
{
|
|
/* Make sure we have a complete SQL packet before trying to read the
|
|
* process id. If not, try again next time. */
|
|
if (!is_complete)
|
|
{
|
|
rval = RES_MORE_DATA;
|
|
}
|
|
else
|
|
{
|
|
uint8_t bytes[4];
|
|
if (gwbuf_copy_data(read_buffer, MYSQL_HEADER_LEN + 1, sizeof(bytes), bytes)
|
|
== sizeof(bytes))
|
|
{
|
|
uint64_t process_id = gw_mysql_get_byte4(bytes);
|
|
mxs_mysql_execute_kill(dcb->session, process_id, KT_CONNECTION);
|
|
rval = RES_END;
|
|
}
|
|
}
|
|
}
|
|
else if (proto->current_command == MXS_COM_QUERY)
|
|
{
|
|
/* Limits on the length of the queries in which "KILL" is searched for. Reducing
|
|
* LONGEST_KILL will reduce overhead but also limit the range of accepted queries. */
|
|
const int SHORTEST_KILL = sizeof("KILL 1") - 1;
|
|
const int LONGEST_KILL = sizeof("KILL CONNECTION 12345678901234567890 ;");
|
|
/* Is length within limits for a kill-type query? */
|
|
if (packet_len >= (MYSQL_HEADER_LEN + 1 + SHORTEST_KILL)
|
|
&& packet_len <= (MYSQL_HEADER_LEN + 1 + LONGEST_KILL))
|
|
{
|
|
rval = handle_query_kill(dcb, read_buffer, rval, is_complete, packet_len);
|
|
}
|
|
}
|
|
return rval;
|
|
}
|
|
|
|
/**
|
|
* Handle text version of KILL [CONNECTION | QUERY] <process_id>. Only detects
|
|
* commands in the beginning of the packet and with no comments.
|
|
* Increased parsing would slow down the handling of every single query.
|
|
*
|
|
* @param dcb Client dcb
|
|
* @param read_buffer Input buffer
|
|
* @param current Latest value of rval in calling function
|
|
* @param is_complete Is read_buffer a complete sql packet
|
|
* @param packet_len Read from sql header
|
|
* @return Updated (or old) value of rval
|
|
*/
|
|
spec_com_res_t handle_query_kill(DCB* dcb,
|
|
GWBUF* read_buffer,
|
|
spec_com_res_t current,
|
|
bool is_complete,
|
|
unsigned int packet_len)
|
|
{
|
|
spec_com_res_t rval = current;
|
|
/* First, we need to detect the text "KILL" (ignorecase) in the start
|
|
* of the packet. Copy just enough characters. */
|
|
const size_t KILL_BEGIN_LEN = sizeof(WORD_KILL) - 1;
|
|
char startbuf[KILL_BEGIN_LEN]; // Not 0-terminated, careful...
|
|
size_t copied_len = gwbuf_copy_data(read_buffer,
|
|
MYSQL_HEADER_LEN + 1,
|
|
KILL_BEGIN_LEN,
|
|
(uint8_t*)startbuf);
|
|
if (is_complete)
|
|
{
|
|
if (strncasecmp(WORD_KILL, startbuf, KILL_BEGIN_LEN) == 0)
|
|
{
|
|
/* Good chance that the query is a KILL-query. Copy the entire
|
|
* buffer and process. */
|
|
size_t buffer_len = packet_len - (MYSQL_HEADER_LEN + 1);
|
|
char querybuf[buffer_len + 1]; // 0-terminated
|
|
copied_len = gwbuf_copy_data(read_buffer,
|
|
MYSQL_HEADER_LEN + 1,
|
|
buffer_len,
|
|
(uint8_t*)querybuf);
|
|
querybuf[copied_len] = '\0';
|
|
kill_type_t kt = KT_CONNECTION;
|
|
uint64_t thread_id = 0;
|
|
rval = RES_END;
|
|
std::string user;
|
|
|
|
if (parse_kill_query(querybuf, &thread_id, &kt, &user))
|
|
{
|
|
if (thread_id > 0)
|
|
{
|
|
mxs_mysql_execute_kill(dcb->session, thread_id, kt);
|
|
}
|
|
else if (!user.empty())
|
|
{
|
|
mxs_mysql_execute_kill_user(dcb->session, user.c_str(), kt);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
else
|
|
{
|
|
/* Look at the start of the query and see if it might contain "KILL" */
|
|
if (strncasecmp(WORD_KILL, startbuf, copied_len) == 0)
|
|
{
|
|
rval = RES_MORE_DATA;
|
|
}
|
|
}
|
|
return rval;
|
|
}
|
|
|
|
/**
 * Copy a username token into @c user, leaving out everything from the first
 * semicolon onwards.
 *
 * @param token NUL-terminated token
 * @param user  Output string, overwritten
 */
static void extract_user(char* token, std::string* user)
{
    const char* terminator = strchr(token, ';');
    const size_t name_len = terminator ? (size_t)(terminator - token) : strlen(token);

    user->assign(token, name_len);
}
|
|
|
|
/**
|
|
* Parse a "KILL [CONNECTION | QUERY] [ <process_id> | USER <username> ]" query.
|
|
* Will modify the argument string even if unsuccessful.
|
|
*
|
|
* @param query Query string to parse
|
|
* @paran thread_id_out Thread id output
|
|
* @param kt_out Kill command type output
|
|
* @return true on success, false on error
|
|
*/
|
|
static bool parse_kill_query(char* query, uint64_t* thread_id_out, kill_type_t* kt_out, std::string* user)
|
|
{
|
|
const char WORD_CONNECTION[] = "CONNECTION";
|
|
const char WORD_QUERY[] = "QUERY";
|
|
const char WORD_HARD[] = "HARD";
|
|
const char WORD_SOFT[] = "SOFT";
|
|
const char WORD_USER[] = "USER";
|
|
const char DELIM[] = " \n\t";
|
|
|
|
int kill_type = KT_CONNECTION;
|
|
unsigned long long int thread_id = 0;
|
|
std::string tmpuser;
|
|
|
|
enum kill_parse_state_t
|
|
{
|
|
KILL,
|
|
CONN_QUERY,
|
|
ID,
|
|
USER,
|
|
SEMICOLON,
|
|
DONE
|
|
} state = KILL;
|
|
char* saveptr = NULL;
|
|
bool error = false;
|
|
|
|
char* token = strtok_r(query, DELIM, &saveptr);
|
|
|
|
while (token && !error)
|
|
{
|
|
bool get_next = false;
|
|
switch (state)
|
|
{
|
|
case KILL:
|
|
if (strncasecmp(token, WORD_KILL, sizeof(WORD_KILL) - 1) == 0)
|
|
{
|
|
state = CONN_QUERY;
|
|
get_next = true;
|
|
}
|
|
else
|
|
{
|
|
error = true;
|
|
}
|
|
break;
|
|
|
|
case CONN_QUERY:
|
|
if (strncasecmp(token, WORD_QUERY, sizeof(WORD_QUERY) - 1) == 0)
|
|
{
|
|
kill_type &= ~KT_CONNECTION;
|
|
kill_type |= KT_QUERY;
|
|
get_next = true;
|
|
}
|
|
else if (strncasecmp(token, WORD_CONNECTION, sizeof(WORD_CONNECTION) - 1) == 0)
|
|
{
|
|
get_next = true;
|
|
}
|
|
|
|
if (strncasecmp(token, WORD_HARD, sizeof(WORD_HARD) - 1) == 0)
|
|
{
|
|
kill_type |= KT_HARD;
|
|
get_next = true;
|
|
}
|
|
else if (strncasecmp(token, WORD_SOFT, sizeof(WORD_SOFT) - 1) == 0)
|
|
{
|
|
kill_type |= KT_SOFT;
|
|
get_next = true;
|
|
}
|
|
else
|
|
{
|
|
/* Move to next state regardless of comparison result. The current
|
|
* part is optional and the process id may already be in the token. */
|
|
state = ID;
|
|
}
|
|
break;
|
|
|
|
case ID:
|
|
if (strncasecmp(token, WORD_USER, sizeof(WORD_USER) - 1) == 0)
|
|
{
|
|
state = USER;
|
|
get_next = true;
|
|
break;
|
|
}
|
|
else
|
|
{
|
|
char* endptr_id = NULL;
|
|
|
|
long long int l = strtoll(token, &endptr_id, 0);
|
|
|
|
if ((l == LLONG_MAX && errno == ERANGE)
|
|
|| (*endptr_id != '\0' && *endptr_id != ';')
|
|
|| l <= 0 || endptr_id == token)
|
|
{
|
|
// Not a positive 32-bit integer
|
|
error = true;
|
|
}
|
|
else
|
|
{
|
|
mxb_assert(*endptr_id == '\0' || *endptr_id == ';');
|
|
state = SEMICOLON; // In case we have space before ;
|
|
get_next = true;
|
|
thread_id = l;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case USER:
|
|
extract_user(token, &tmpuser);
|
|
state = SEMICOLON;
|
|
get_next = true;
|
|
break;
|
|
|
|
case SEMICOLON:
|
|
if (strncmp(token, ";", 1) == 0)
|
|
{
|
|
state = DONE;
|
|
get_next = true;
|
|
}
|
|
else
|
|
{
|
|
error = true;
|
|
}
|
|
break;
|
|
|
|
default:
|
|
error = true;
|
|
break;
|
|
}
|
|
|
|
if (get_next)
|
|
{
|
|
token = strtok_r(NULL, DELIM, &saveptr);
|
|
}
|
|
}
|
|
|
|
if (error || (state != DONE && state != SEMICOLON))
|
|
{
|
|
return false;
|
|
}
|
|
else
|
|
{
|
|
*thread_id_out = thread_id;
|
|
*kt_out = (kill_type_t)kill_type;
|
|
*user = tmpuser;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
/*
 * Mapping of the information from the three session trackers to
 * mxs_session_trx_state_t
 * SESSION_TRACK_STATE_CHANGE:
 *   Get the latest autocommit value;
 *   https://dev.mysql.com/worklog/task/?id=6885
 * SESSION_TRACK_TRANSACTION_TYPE:
 *   Get transaction boundaries
 *   TX_EMPTY => SESSION_TRX_INACTIVE
 *   TX_EXPLICIT | TX_IMPLICIT => SESSION_TRX_ACTIVE
 *   https://dev.mysql.com/worklog/task/?id=6885
 * SESSION_TRACK_TRANSACTION_CHARACTERISTICS
 *   Get trx characteristics such as read only, read write, snapshot ...
 *
 */
|
|
/**
 * Update a session's autocommit flag and transaction state from the
 * "autocommit", "trx_state" and "trx_characteristics" properties of a buffer.
 *
 * A property for which gwbuf_get_property() returns NULL is skipped and the
 * corresponding session state is left as it was.
 *
 * @param ses   Session whose autocommit flag and transaction state are updated
 * @param data  Buffer whose properties are inspected
 */
static void parse_and_set_trx_state(MXS_SESSION* ses, GWBUF* data)
{
    // Statement prefixes recognised in the "trx_characteristics" property.
    static const char STMT_READ_ONLY[] = "START TRANSACTION READ ONLY;";
    static const char STMT_READ_WRITE[] = "START TRANSACTION READ WRITE;";

    char* autocommit = gwbuf_get_property(data, (char*)"autocommit");

    if (autocommit)
    {
        MXS_DEBUG("autocommit:%s", autocommit);

        // Only the leading characters are compared; a value that is neither
        // ON nor OFF leaves the autocommit flag unchanged.
        if (strncasecmp(autocommit, "ON", 2) == 0)
        {
            session_set_autocommit(ses, true);
        }
        else if (strncasecmp(autocommit, "OFF", 3) == 0)
        {
            session_set_autocommit(ses, false);
        }
    }

    char* trx_state = gwbuf_get_property(data, (char*)"trx_state");

    if (trx_state)
    {
        mysql_tx_state_t s = parse_trx_state(trx_state);

        if (s == TX_EMPTY)
        {
            session_set_trx_state(ses, SESSION_TRX_INACTIVE);
        }
        else if ((s & TX_EXPLICIT) || (s & TX_IMPLICIT))
        {
            session_set_trx_state(ses, SESSION_TRX_ACTIVE);
        }
    }

    // Handled after "trx_state", so a recognised characteristic replaces the
    // state that was set above.
    char* trx_characteristics = gwbuf_get_property(data, (char*)"trx_characteristics");

    if (trx_characteristics)
    {
        if (strncmp(trx_characteristics, STMT_READ_ONLY, sizeof(STMT_READ_ONLY) - 1) == 0)
        {
            session_set_trx_state(ses, SESSION_TRX_READ_ONLY);
        }
        else if (strncmp(trx_characteristics, STMT_READ_WRITE, sizeof(STMT_READ_WRITE) - 1) == 0)
        {
            session_set_trx_state(ses, SESSION_TRX_READ_WRITE);
        }
    }

    MXS_DEBUG("trx state:%s", session_trx_state_to_string(ses->trx_state));
    MXS_DEBUG("autocommit:%s", session_is_autocommit(ses) ? "ON" : "OFF");
}
|