MXS-852: Refactor gathering of routing information
The routing information is now gathered into a struct before the routing process is started. This allows the requirements of the query to be gathered before the actual target is selected.
This commit is contained in:
@ -896,13 +896,16 @@ static int routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Gather the information required to make routing decisions */
|
||||
RouteInfo info(rses, querybuf);
|
||||
|
||||
if (rses->query_queue == NULL &&
|
||||
(rses->expected_responses == 0 ||
|
||||
mxs_mysql_get_command(querybuf) == MYSQL_COM_STMT_FETCH ||
|
||||
info.command == MYSQL_COM_STMT_FETCH ||
|
||||
rses->load_data_state == LOAD_DATA_ACTIVE))
|
||||
{
|
||||
/** No active or pending queries */
|
||||
if (route_single_stmt(inst, rses, querybuf))
|
||||
if (route_single_stmt(inst, rses, querybuf, info))
|
||||
{
|
||||
rval = 1;
|
||||
}
|
||||
|
@ -52,7 +52,6 @@ bool execute_sescmd_in_backend(SRWBackend& backend_ref);
|
||||
bool handle_target_is_all(route_target_t route_target,
|
||||
RWSplit *inst, RWSplitSession *rses,
|
||||
GWBUF *querybuf, int packet_type, uint32_t qtype);
|
||||
uint8_t determine_packet_type(GWBUF *querybuf, bool *non_empty_packet);
|
||||
void log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype);
|
||||
bool is_packet_a_query(int packet_type);
|
||||
bool send_readonly_error(DCB *dcb);
|
||||
@ -70,7 +69,7 @@ int rses_get_max_replication_lag(RWSplitSession *rses);
|
||||
*/
|
||||
|
||||
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
|
||||
GWBUF *querybuf);
|
||||
GWBUF *querybuf, const RouteInfo& info);
|
||||
SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
||||
char *name, int max_rlag);
|
||||
route_target_t get_route_target(RWSplitSession *rses, uint8_t command,
|
||||
@ -119,6 +118,21 @@ bool is_read_tmp_table(RWSplitSession *router_cli_ses,
|
||||
void check_create_tmp_table(RWSplitSession *router_cli_ses,
|
||||
GWBUF *querybuf, uint32_t type);
|
||||
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
|
||||
uint32_t determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet);
|
||||
|
||||
void close_all_connections(RWSplitSession* rses);
|
||||
|
||||
uint32_t determine_query_type(GWBUF *querybuf, int command);
|
||||
|
||||
/**
|
||||
* @brief Get the routing requirements for a query
|
||||
*
|
||||
* @param rses Router client session
|
||||
* @param buffer Buffer containing the query
|
||||
* @param command Output parameter where the packet command is stored
|
||||
* @param type Output parameter where the query type is stored
|
||||
* @param stmt_id Output parameter where statement ID, if the query is a binary protocol command, is stored
|
||||
*
|
||||
* @return The target type where this query should be routed
|
||||
*/
|
||||
route_target_t get_target_type(RWSplitSession* rses, GWBUF* buffer, uint8_t* command,
|
||||
uint32_t* type, uint32_t* stmt_id);
|
||||
|
@ -51,37 +51,65 @@
|
||||
*/
|
||||
|
||||
/**
|
||||
* @brief Determine packet type
|
||||
* @brief Determine the type of a query
|
||||
*
|
||||
* Examine the packet in the buffer to extract the type, if possible. At the
|
||||
* same time set the second parameter to indicate whether the packet was
|
||||
* empty.
|
||||
* @param querybuf GWBUF containing the query
|
||||
* @param packet_type Integer denoting DB specific enum
|
||||
* @param non_empty_packet Boolean to be set by this function
|
||||
*
|
||||
* It is assumed that the packet length and type are contained within a single
|
||||
* buffer, the one indicated by the first parameter.
|
||||
*
|
||||
* @param querybuf Buffer containing the packet
|
||||
* @param non_empty_packet bool indicating whether the packet is non-empty
|
||||
* @return The packet type, or MYSQL_COM_UNDEFINED; also the second parameter is set
|
||||
* @return uint32_t the query type; also the non_empty_packet bool is set
|
||||
*/
|
||||
uint8_t
|
||||
determine_packet_type(GWBUF *querybuf, bool *non_empty_packet)
|
||||
uint32_t determine_query_type(GWBUF *querybuf, int command)
|
||||
{
|
||||
uint8_t packet_type;
|
||||
uint8_t *packet = GWBUF_DATA(querybuf);
|
||||
uint32_t type = QUERY_TYPE_UNKNOWN;
|
||||
|
||||
if (gw_mysql_get_byte3(packet) == 0)
|
||||
switch (command)
|
||||
{
|
||||
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
||||
*non_empty_packet = false;
|
||||
packet_type = (uint8_t)MYSQL_COM_UNDEFINED;
|
||||
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
||||
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
||||
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
||||
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
||||
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
||||
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
||||
case MYSQL_COM_SET_OPTION: /*< 1b send options to all servers */
|
||||
type = QUERY_TYPE_SESSION_WRITE;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
||||
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
|
||||
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
|
||||
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
||||
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
||||
type = QUERY_TYPE_WRITE;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
type = qc_get_type_mask(querybuf);
|
||||
break;
|
||||
|
||||
case MYSQL_COM_STMT_PREPARE:
|
||||
type = qc_get_type_mask(querybuf);
|
||||
type |= QUERY_TYPE_PREPARE_STMT;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_STMT_EXECUTE:
|
||||
/** Parsing is not needed for this type of packet */
|
||||
type = QUERY_TYPE_EXEC_STMT;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
||||
case MYSQL_COM_STATISTICS: /**< 9 ? */
|
||||
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
|
||||
case MYSQL_COM_CONNECT: /**< 0b ? */
|
||||
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
|
||||
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
|
||||
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
|
||||
case MYSQL_COM_DAEMON: /**< 1d ? */
|
||||
default:
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
*non_empty_packet = true;
|
||||
packet_type = packet[4];
|
||||
}
|
||||
return packet_type;
|
||||
|
||||
return type;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -91,37 +91,21 @@ void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses,
|
||||
ss_dassert(nserv < rses->rses_nbackends);
|
||||
}
|
||||
|
||||
/**
|
||||
* Routing function. Find out query type, backend type, and target DCB(s).
|
||||
* Then route query to found target(s).
|
||||
* @param inst router instance
|
||||
* @param rses router session
|
||||
* @param querybuf GWBUF including the query
|
||||
*
|
||||
* @return true if routing succeed or if it failed due to unsupported query.
|
||||
* false if backend failure was encountered.
|
||||
*/
|
||||
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
|
||||
GWBUF *querybuf)
|
||||
route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
||||
uint8_t* command, uint32_t* type, uint32_t* stmt_id)
|
||||
{
|
||||
route_target_t route_target;
|
||||
bool succp = false;
|
||||
bool non_empty_packet;
|
||||
uint32_t stmt_id = 0;
|
||||
route_target_t route_target = TARGET_MASTER;
|
||||
|
||||
ss_dassert(querybuf->next == NULL); // The buffer must be contiguous.
|
||||
|
||||
/* packet_type is a problem as it is MySQL specific */
|
||||
uint8_t command = determine_packet_type(querybuf, &non_empty_packet);
|
||||
uint32_t qtype = determine_query_type(querybuf, command, non_empty_packet);
|
||||
|
||||
if (non_empty_packet)
|
||||
if (gwbuf_length(buffer) > MYSQL_HEADER_LEN)
|
||||
{
|
||||
handle_multi_temp_and_load(rses, querybuf, command, &qtype);
|
||||
*command = mxs_mysql_get_command(buffer);
|
||||
*type = determine_query_type(buffer, *command);
|
||||
|
||||
handle_multi_temp_and_load(rses, buffer, *command, type);
|
||||
|
||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
||||
{
|
||||
log_transaction_status(rses, querybuf, qtype);
|
||||
log_transaction_status(rses, buffer, *type);
|
||||
}
|
||||
/**
|
||||
* Find out where to route the query. Result may not be clear; it is
|
||||
@ -141,28 +125,53 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
|
||||
* eventually to master
|
||||
*/
|
||||
|
||||
if (command == MYSQL_COM_QUERY &&
|
||||
qc_get_operation(querybuf) == QUERY_OP_EXECUTE)
|
||||
if (*command == MYSQL_COM_QUERY &&
|
||||
qc_get_operation(buffer) == QUERY_OP_EXECUTE)
|
||||
{
|
||||
std::string id = get_text_ps_id(querybuf);
|
||||
qtype = rses->ps_manager.get_type(id);
|
||||
std::string id = get_text_ps_id(buffer);
|
||||
*type = rses->ps_manager.get_type(id);
|
||||
}
|
||||
else if (is_ps_command(command))
|
||||
else if (is_ps_command(*command))
|
||||
{
|
||||
stmt_id = get_internal_ps_id(rses, querybuf);
|
||||
qtype = rses->ps_manager.get_type(stmt_id);
|
||||
replace_binary_ps_id(querybuf, stmt_id);
|
||||
*stmt_id = get_internal_ps_id(rses, buffer);
|
||||
*type = rses->ps_manager.get_type(*stmt_id);
|
||||
}
|
||||
|
||||
route_target = get_route_target(rses, command, qtype, querybuf->hint);
|
||||
route_target = get_route_target(rses, *command, *type, buffer->hint);
|
||||
}
|
||||
else
|
||||
{
|
||||
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
||||
route_target = TARGET_MASTER;
|
||||
rses->load_data_state = LOAD_DATA_END;
|
||||
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
||||
rses->rses_load_data_sent + gwbuf_length(querybuf));
|
||||
rses->rses_load_data_sent + gwbuf_length(buffer));
|
||||
}
|
||||
|
||||
return route_target;
|
||||
}
|
||||
|
||||
/**
|
||||
* Routing function. Find out query type, backend type, and target DCB(s).
|
||||
* Then route query to found target(s).
|
||||
* @param inst router instance
|
||||
* @param rses router session
|
||||
* @param querybuf GWBUF including the query
|
||||
*
|
||||
* @return true if routing succeed or if it failed due to unsupported query.
|
||||
* false if backend failure was encountered.
|
||||
*/
|
||||
bool route_single_stmt(RWSplit *inst, RWSplitSession *rses, GWBUF *querybuf, const RouteInfo& info)
|
||||
{
|
||||
bool succp = false;
|
||||
uint32_t stmt_id = info.stmt_id;
|
||||
uint8_t command = info.command;
|
||||
uint32_t qtype = info.type;
|
||||
route_target_t route_target = info.target;
|
||||
|
||||
if (is_ps_command(command))
|
||||
{
|
||||
/** Replace the client statement ID with our internal one */
|
||||
replace_binary_ps_id(querybuf, stmt_id);
|
||||
}
|
||||
|
||||
SRWBackend target;
|
||||
|
@ -197,69 +197,3 @@ bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type)
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Determine the type of a query
|
||||
*
|
||||
* @param querybuf GWBUF containing the query
|
||||
* @param packet_type Integer denoting DB specific enum
|
||||
* @param non_empty_packet Boolean to be set by this function
|
||||
*
|
||||
* @return uint32_t the query type; also the non_empty_packet bool is set
|
||||
*/
|
||||
uint32_t
|
||||
determine_query_type(GWBUF *querybuf, int packet_type, bool non_empty_packet)
|
||||
{
|
||||
uint32_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
|
||||
if (non_empty_packet)
|
||||
{
|
||||
uint8_t my_packet_type = (uint8_t)packet_type;
|
||||
switch (my_packet_type)
|
||||
{
|
||||
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
||||
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
||||
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
||||
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
||||
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
||||
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
||||
case MYSQL_COM_SET_OPTION: /*< 1b send options to all servers */
|
||||
qtype = QUERY_TYPE_SESSION_WRITE;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
||||
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
|
||||
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
|
||||
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
||||
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
||||
qtype = QUERY_TYPE_WRITE;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
qtype = qc_get_type_mask(querybuf);
|
||||
break;
|
||||
|
||||
case MYSQL_COM_STMT_PREPARE:
|
||||
qtype = qc_get_type_mask(querybuf);
|
||||
qtype |= QUERY_TYPE_PREPARE_STMT;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_STMT_EXECUTE:
|
||||
/** Parsing is not needed for this type of packet */
|
||||
qtype = QUERY_TYPE_EXEC_STMT;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
||||
case MYSQL_COM_STATISTICS: /**< 9 ? */
|
||||
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
|
||||
case MYSQL_COM_CONNECT: /**< 0b ? */
|
||||
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
|
||||
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
|
||||
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
|
||||
case MYSQL_COM_DAEMON: /**< 1d ? */
|
||||
default:
|
||||
break;
|
||||
} /**< switch by packet type */
|
||||
}
|
||||
return qtype;
|
||||
}
|
||||
|
@ -100,3 +100,12 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
|
||||
|
||||
return rval;
|
||||
}
|
||||
|
||||
RouteInfo::RouteInfo(RWSplitSession* rses, GWBUF* buffer):
|
||||
target(TARGET_UNDEFINED),
|
||||
command(0xff),
|
||||
type(QUERY_TYPE_UNKNOWN),
|
||||
stmt_id(0)
|
||||
{
|
||||
target = get_target_type(rses, buffer, &command, &type, &stmt_id);
|
||||
}
|
||||
|
@ -102,6 +102,17 @@ public:
|
||||
skygw_chk_t rses_chk_tail;
|
||||
};
|
||||
|
||||
/** Struct for holding routing related information */
|
||||
struct RouteInfo
|
||||
{
|
||||
RouteInfo(RWSplitSession* rses, GWBUF* buffer);
|
||||
|
||||
route_target_t target; /**< Route target type, TARGET_UNDEFINED for unknown */
|
||||
uint8_t command; /**< The command byte, 0xff for unknown commands */
|
||||
uint32_t type; /**< The query type, QUERY_TYPE_UNKNOWN for unknown types*/
|
||||
uint32_t stmt_id; /**< Prepared statement ID, 0 for unknown */
|
||||
};
|
||||
|
||||
/**
|
||||
* Helper function to convert reply_state_t to string
|
||||
*/
|
||||
|
Reference in New Issue
Block a user