Introduce internal protocol command enum

The enums exposed by the connector are not intended to be used by the
users of the library. The fact that the protocol, and other, modules used
it was in violation of how the library is intended to be used.

Adding an internal mapping into MaxScale also removes some of the
dependencies that the core has on the connector.
This commit is contained in:
Markus Mäkelä
2017-09-12 03:40:42 +03:00
parent 914ebb046a
commit 45e0e8bb59
29 changed files with 272 additions and 256 deletions

View File

@ -684,7 +684,7 @@ blr_make_query(DCB *dcb, char *query)
// This is hack to get the result set processing in order for binlogrouter
MySQLProtocol *proto = (MySQLProtocol*)dcb->protocol;
proto->current_command = MYSQL_COM_QUERY;
proto->current_command = MXS_COM_QUERY;
return buf;
}
@ -753,7 +753,7 @@ blr_make_registration(ROUTER_INSTANCE *router)
// This is hack to get the result set processing in order for binlogrouter
MySQLProtocol *proto = (MySQLProtocol*)router->master->protocol;
proto->current_command = MYSQL_COM_REGISTER_SLAVE;
proto->current_command = MXS_COM_REGISTER_SLAVE;
return buf;
}
@ -805,7 +805,7 @@ blr_make_binlog_dump(ROUTER_INSTANCE *router)
// This is hack to get the result set processing in order for binlogrouter
MySQLProtocol *proto = (MySQLProtocol*)router->master->protocol;
proto->current_command = MYSQL_COM_BINLOG_DUMP;
proto->current_command = MXS_COM_BINLOG_DUMP;
return buf;
}

View File

@ -32,6 +32,7 @@
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <maxscale/alloc.h>
#include <maxscale/service.h>
#include <maxscale/server.h>
@ -49,6 +50,7 @@
#include <maxscale/resultset.h>
#include <maxscale/secrets.h>
#include <maxscale/users.h>
#include <maxscale/protocol/mysql.h>
#include "../../../core/maxscale/modules.h"
#include "../../../core/maxscale/monitor.h"
@ -345,13 +347,13 @@ execute(MXS_ROUTER *rinstance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
{
switch (MYSQL_COMMAND(queue))
{
case COM_PING:
case MXS_COM_PING:
rc = maxinfo_ping(instance, session, queue);
break;
case COM_STATISTICS:
case MXS_COM_STATISTICS:
rc = maxinfo_statistics(instance, session, queue);
break;
case COM_QUIT:
case MXS_COM_QUIT:
break;
default:
MXS_ERROR("Unexpected MySQL command 0x%x",

View File

@ -100,13 +100,6 @@ typedef struct maxinfo_tree
#define MYSQL_COMMAND(buf) (*((uint8_t *)GWBUF_DATA(buf) + 4))
/**
* MySQL protocol OpCodes needed for replication
*/
#define COM_QUIT 0x01
#define COM_QUERY 0x03
#define COM_STATISTICS 0x09
#define COM_PING 0x0e
/**
* Token values for the tokeniser used by the parser for maxinfo

View File

@ -506,7 +506,7 @@ closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
}
/** Log routing failure due to closed session */
static void log_closed_session(mysql_server_cmd_t mysql_command, bool is_closed,
static void log_closed_session(mxs_mysql_cmd_t mysql_command, bool is_closed,
SERVER_REF *ref)
{
char msg[MAX_SERVER_ADDRESS_LEN + 200] = ""; // Extra space for message
@ -550,7 +550,7 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
int rc = 0;
DCB* backend_dcb;
MySQLProtocol *proto = (MySQLProtocol*)router_cli_ses->client_dcb->protocol;
mysql_server_cmd_t mysql_command = proto->current_command;
mxs_mysql_cmd_t mysql_command = proto->current_command;
bool rses_is_closed;
inst->stats.n_queries++;
@ -589,11 +589,11 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
switch (mysql_command)
{
case MYSQL_COM_CHANGE_USER:
case MXS_COM_CHANGE_USER:
rc = backend_dcb->func.auth(backend_dcb, NULL, backend_dcb->session,
queue);
break;
case MYSQL_COM_QUERY:
case MXS_COM_QUERY:
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
{
trc = modutil_get_SQL(queue);

View File

@ -509,11 +509,11 @@ static bool route_stored_query(RWSplitSession *rses)
*/
bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
{
mysql_server_cmd_t cmd = mxs_mysql_current_command(backend->dcb()->session);
mxs_mysql_cmd_t cmd = mxs_mysql_current_command(backend->dcb()->session);
if (backend->get_reply_state() == REPLY_STATE_START && !mxs_mysql_is_result_set(buffer))
{
if (cmd == MYSQL_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer))
if (cmd == MXS_COM_STMT_PREPARE || !mxs_mysql_more_results_after_ok(buffer))
{
/** Not a result set, we have the complete response */
LOG_RS(backend, REPLY_STATE_DONE);
@ -534,7 +534,7 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
LOG_RS(backend, REPLY_STATE_RSET_COLDEF);
backend->set_reply_state(REPLY_STATE_RSET_COLDEF);
}
else if (n_eof == 1 && cmd != MYSQL_COM_FIELD_LIST)
else if (n_eof == 1 && cmd != MXS_COM_FIELD_LIST)
{
/** Waiting for the EOF packet after the rows */
LOG_RS(backend, REPLY_STATE_RSET_ROWS);
@ -544,7 +544,7 @@ bool reply_is_complete(SRWBackend backend, GWBUF *buffer)
{
/** We either have a complete result set or a response to
* a COM_FIELD_LIST command */
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MYSQL_COM_FIELD_LIST));
ss_dassert(n_eof == 2 || (n_eof == 1 && cmd == MXS_COM_FIELD_LIST));
LOG_RS(backend, REPLY_STATE_DONE);
backend->set_reply_state(REPLY_STATE_DONE);
@ -894,7 +894,7 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
if (rses->query_queue == NULL &&
(rses->expected_responses == 0 ||
info.command == MYSQL_COM_STMT_FETCH ||
info.command == MXS_COM_STMT_FETCH ||
rses->load_data_state == LOAD_DATA_ACTIVE ||
rses->large_query))
{

View File

@ -33,11 +33,11 @@ do{ \
static inline bool is_ps_command(uint8_t cmd)
{
return cmd == MYSQL_COM_STMT_EXECUTE ||
cmd == MYSQL_COM_STMT_SEND_LONG_DATA ||
cmd == MYSQL_COM_STMT_CLOSE ||
cmd == MYSQL_COM_STMT_FETCH ||
cmd == MYSQL_COM_STMT_RESET;
return cmd == MXS_COM_STMT_EXECUTE ||
cmd == MXS_COM_STMT_SEND_LONG_DATA ||
cmd == MXS_COM_STMT_CLOSE ||
cmd == MXS_COM_STMT_FETCH ||
cmd == MXS_COM_STMT_RESET;
}
/*

View File

@ -65,46 +65,46 @@ uint32_t determine_query_type(GWBUF *querybuf, int command)
switch (command)
{
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MYSQL_COM_SET_OPTION: /*< 1b send options to all servers */
case MXS_COM_QUIT: /*< 1 QUIT will close all sessions */
case MXS_COM_INIT_DB: /*< 2 DDL must go to the master */
case MXS_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MXS_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MXS_COM_PING: /*< 0e all servers are pinged */
case MXS_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MXS_COM_SET_OPTION: /*< 1b send options to all servers */
type = QUERY_TYPE_SESSION_WRITE;
break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
case MXS_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MXS_COM_DROP_DB: /**< 6 DDL must go to the master */
case MXS_COM_STMT_CLOSE: /*< free prepared statement */
case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */
type = QUERY_TYPE_WRITE;
break;
case MYSQL_COM_QUERY:
case MXS_COM_QUERY:
type = qc_get_type_mask(querybuf);
break;
case MYSQL_COM_STMT_PREPARE:
case MXS_COM_STMT_PREPARE:
type = qc_get_type_mask(querybuf);
type |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
case MXS_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
type = QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
case MXS_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MXS_COM_STATISTICS: /**< 9 ? */
case MXS_COM_PROCESS_INFO: /**< 0a ? */
case MXS_COM_CONNECT: /**< 0b ? */
case MXS_COM_PROCESS_KILL: /**< 0c ? */
case MXS_COM_TIME: /**< 0f should this be run in gateway ? */
case MXS_COM_DELAYED_INSERT: /**< 10 ? */
case MXS_COM_DAEMON: /**< 1d ? */
default:
break;
}
@ -129,7 +129,7 @@ uint32_t determine_query_type(GWBUF *querybuf, int command)
bool
is_packet_a_query(int packet_type)
{
return (packet_type == MYSQL_COM_QUERY);
return (packet_type == MXS_COM_QUERY);
}
/*

View File

@ -23,7 +23,7 @@ uint32_t get_prepare_type(GWBUF* buffer)
{
uint32_t type;
if (mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE)
if (mxs_mysql_get_command(buffer) == MXS_COM_STMT_PREPARE)
{
// TODO: This could be done inside the query classifier
size_t packet_len = gwbuf_length(buffer);
@ -38,7 +38,7 @@ uint32_t get_prepare_type(GWBUF* buffer)
// Sequence id
*ptr++ = 0x00;
// Command
*ptr++ = MYSQL_COM_QUERY;
*ptr++ = MXS_COM_QUERY;
gwbuf_copy_data(buffer, MYSQL_HEADER_LEN + 1, payload_len - 1, ptr);
type = qc_get_type_mask(stmt);
@ -139,17 +139,17 @@ uint32_t PSManager::get_type(uint32_t id) const
void PSManager::store(GWBUF* buffer, uint32_t id)
{
ss_dassert(mxs_mysql_get_command(buffer) == MYSQL_COM_STMT_PREPARE ||
ss_dassert(mxs_mysql_get_command(buffer) == MXS_COM_STMT_PREPARE ||
qc_query_is_type(qc_get_type_mask(buffer),
QUERY_TYPE_PREPARE_NAMED_STMT));
switch (mxs_mysql_get_command(buffer))
{
case MYSQL_COM_QUERY:
case MXS_COM_QUERY:
m_text_ps[get_text_ps_id(buffer)] = get_prepare_type(buffer);
break;
case MYSQL_COM_STMT_PREPARE:
case MXS_COM_STMT_PREPARE:
m_binary_ps[id] = get_prepare_type(buffer);
break;

View File

@ -139,7 +139,7 @@ route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
}
else
{
if (*command == MYSQL_COM_QUERY &&
if (*command == MXS_COM_QUERY &&
qc_get_operation(buffer) == QUERY_OP_EXECUTE)
{
std::string id = get_text_ps_id(buffer);
@ -248,7 +248,7 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, con
ss_dassert(!store_stmt || TARGET_IS_SLAVE(route_target));
succp = handle_got_target(inst, rses, querybuf, target, store_stmt);
if (succp && command == MYSQL_COM_STMT_EXECUTE && not_locked_to_master)
if (succp && command == MXS_COM_STMT_EXECUTE && not_locked_to_master)
{
/** Track the targets of the COM_STMT_EXECUTE statements. This
* information is used to route all COM_STMT_FETCH commands
@ -584,8 +584,8 @@ route_target_t get_route_target(RWSplitSession *rses, uint8_t command,
*/
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
command == MYSQL_COM_STMT_CLOSE ||
command == MYSQL_COM_STMT_RESET)
command == MXS_COM_STMT_CLOSE ||
command == MXS_COM_STMT_RESET)
{
target = TARGET_ALL;
}
@ -944,7 +944,7 @@ SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
int rlag_max = rses_get_max_replication_lag(rses);
SRWBackend target;
if (cmd == MYSQL_COM_STMT_FETCH)
if (cmd == MXS_COM_STMT_FETCH)
{
/** The COM_STMT_FETCH must be executed on the same server as the
* COM_STMT_EXECUTE was executed on */
@ -1082,10 +1082,10 @@ bool handle_master_is_target(RWSplit *inst, RWSplitSession *rses,
static inline bool query_creates_reply(uint8_t cmd)
{
return cmd != MYSQL_COM_QUIT &&
cmd != MYSQL_COM_STMT_SEND_LONG_DATA &&
cmd != MYSQL_COM_STMT_CLOSE &&
cmd != MYSQL_COM_STMT_FETCH; // Fetch is done mid-result
return cmd != MXS_COM_QUIT &&
cmd != MXS_COM_STMT_SEND_LONG_DATA &&
cmd != MXS_COM_STMT_CLOSE &&
cmd != MXS_COM_STMT_FETCH; // Fetch is done mid-result
}
static inline bool is_large_query(GWBUF* buf)

View File

@ -40,7 +40,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
uint64_t id = backend->complete_session_command();
MXS_PS_RESPONSE resp = {};
if (command == MYSQL_COM_STMT_PREPARE)
if (command == MXS_COM_STMT_PREPARE)
{
// This should never fail or the backend protocol is broken
ss_debug(bool b = )mxs_mysql_extract_ps_response(*ppPacket, &resp);
@ -60,7 +60,7 @@ void process_sescmd_response(RWSplitSession* rses, SRWBackend& backend,
* be compared to it */
rses->sescmd_responses[id] = cmd;
if (command == MYSQL_COM_STMT_PREPARE)
if (command == MXS_COM_STMT_PREPARE)
{
/** Map the returned response to the internal ID */
rses->ps_handles[resp.id] = id;

View File

@ -197,7 +197,7 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type)
bool rval = false;
if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS &&
packet_type == MYSQL_COM_QUERY)
packet_type == MXS_COM_QUERY)
{
char *ptr, *data = (char*)GWBUF_DATA(buf) + 5;
/** Payload size without command byte */

View File

@ -137,46 +137,46 @@ static void inspect_query(GWBUF* pPacket, uint32_t* type, qc_query_op_t* op, uin
switch (*command)
{
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MYSQL_COM_PING: /*< 0e all servers are pinged */
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
case MXS_COM_QUIT: /*< 1 QUIT will close all sessions */
case MXS_COM_INIT_DB: /*< 2 DDL must go to the master */
case MXS_COM_REFRESH: /*< 7 - I guess this is session but not sure */
case MXS_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
case MXS_COM_PING: /*< 0e all servers are pinged */
case MXS_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
case MXS_COM_STMT_CLOSE: /*< free prepared statement */
case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */
case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */
*type = QUERY_TYPE_SESSION_WRITE;
break;
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
case MXS_COM_CREATE_DB: /**< 5 DDL must go to the master */
case MXS_COM_DROP_DB: /**< 6 DDL must go to the master */
*type = QUERY_TYPE_WRITE;
break;
case MYSQL_COM_QUERY:
case MXS_COM_QUERY:
*type = qc_get_type_mask(pPacket);
*op = qc_get_operation(pPacket);
break;
case MYSQL_COM_STMT_PREPARE:
case MXS_COM_STMT_PREPARE:
*type = qc_get_type_mask(pPacket);
*type |= QUERY_TYPE_PREPARE_STMT;
break;
case MYSQL_COM_STMT_EXECUTE:
case MXS_COM_STMT_EXECUTE:
/** Parsing is not needed for this type of packet */
*type = QUERY_TYPE_EXEC_STMT;
break;
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MYSQL_COM_STATISTICS: /**< 9 ? */
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
case MYSQL_COM_CONNECT: /**< 0b ? */
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
case MYSQL_COM_DAEMON: /**< 1d ? */
case MXS_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
case MXS_COM_STATISTICS: /**< 9 ? */
case MXS_COM_PROCESS_INFO: /**< 0a ? */
case MXS_COM_CONNECT: /**< 0b ? */
case MXS_COM_PROCESS_KILL: /**< 0c ? */
case MXS_COM_TIME: /**< 0f should this be run in gateway ? */
case MXS_COM_DELAYED_INSERT: /**< 10 ? */
case MXS_COM_DAEMON: /**< 1d ? */
default:
break;
}
@ -221,8 +221,8 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
* the current default database or to the first available server. */
target = get_shard_target(pPacket, type);
if ((target == NULL && command != MYSQL_COM_INIT_DB && m_current_db.length() == 0) ||
command == MYSQL_COM_FIELD_LIST ||
if ((target == NULL && command != MXS_COM_INIT_DB && m_current_db.length() == 0) ||
command == MXS_COM_FIELD_LIST ||
m_current_db.length() == 0)
{
/** No current database and no databases in query or the database is
@ -352,7 +352,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
}
/** The default database changes must be routed to a specific server */
if (command == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB)
if (command == MXS_COM_INIT_DB || op == QUERY_OP_CHANGE_DB)
{
if (!change_current_db(m_current_db, m_shard, pPacket))
{
@ -664,7 +664,7 @@ bool extract_database(GWBUF* buf, char* str)
plen = gw_mysql_get_byte3(packet) - 1;
/** Copy database name from MySQL packet to session */
if (mxs_mysql_get_command(buf) == MYSQL_COM_QUERY &&
if (mxs_mysql_get_command(buf) == MXS_COM_QUERY &&
qc_get_operation(buf) == QUERY_OP_CHANGE_DB)
{
const char *delim = "` \n\t;";
@ -1109,7 +1109,7 @@ void create_error_reply(char* fail_str, DCB* dcb)
}
/**
* Read new database name from MYSQL_COM_INIT_DB packet or a literal USE ... COM_QUERY
* Read new database name from COM_INIT_DB packet or a literal USE ... COM_QUERY
* packet, check that it exists in the hashtable and copy its name to MYSQL_session.
*
* @param dest Destination where the database name will be written
@ -1419,7 +1419,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
SERVER *rval = NULL;
bool has_dbs = false; /**If the query targets any database other than the current one*/
if (mxs_mysql_get_command(buffer) == MYSQL_COM_QUERY)
if (mxs_mysql_get_command(buffer) == MXS_COM_QUERY)
{
bool uses_current_database = false;
int n_tables = 0;