MXS-1625 All RouteInfo functionality moved to routeinfo.cc
Provides a clearer separation between what deals with query classification and what deals with query routing. Functions have only been moved. No other cleanup has been done.
This commit is contained in:
@ -1,5 +1,6 @@
|
|||||||
add_library(readwritesplit SHARED
|
add_library(readwritesplit SHARED
|
||||||
readwritesplit.cc
|
readwritesplit.cc
|
||||||
|
routeinfo.cc
|
||||||
rwsplitsession.cc
|
rwsplitsession.cc
|
||||||
rwsplit_mysql.cc
|
rwsplit_mysql.cc
|
||||||
rwsplit_route_stmt.cc
|
rwsplit_route_stmt.cc
|
||||||
|
|||||||
@ -34,6 +34,7 @@
|
|||||||
|
|
||||||
#include "rwsplit_internal.hh"
|
#include "rwsplit_internal.hh"
|
||||||
#include "rwsplitsession.hh"
|
#include "rwsplitsession.hh"
|
||||||
|
#include "routeinfo.hh"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The entry points for the read/write query splitting router module.
|
* The entry points for the read/write query splitting router module.
|
||||||
|
|||||||
764
server/modules/routing/readwritesplit/routeinfo.cc
Normal file
764
server/modules/routing/readwritesplit/routeinfo.cc
Normal file
@ -0,0 +1,764 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "routeinfo.hh"
|
||||||
|
#include <maxscale/alloc.h>
|
||||||
|
#include "rwsplitsession.hh"
|
||||||
|
// Only for is_ps_command
|
||||||
|
#include "rwsplit_internal.hh"
|
||||||
|
|
||||||
|
#define RWSPLIT_TRACE_MSG_LEN 1000
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Examine the query type, transaction state and routing hints. Find out the
|
||||||
|
* target for query routing.
|
||||||
|
*
|
||||||
|
* @param qtype Type of query
|
||||||
|
* @param trx_active Is transacation active or not
|
||||||
|
* @param hint Pointer to list of hints attached to the query buffer
|
||||||
|
*
|
||||||
|
* @return bitfield including the routing target, or the target server name
|
||||||
|
* if the query would otherwise be routed to slave.
|
||||||
|
*/
|
||||||
|
route_target_t get_route_target(RWSplitSession *rses, uint8_t command,
|
||||||
|
uint32_t qtype, HINT *query_hints)
|
||||||
|
{
|
||||||
|
bool trx_active = session_trx_is_active(rses->client_dcb->session);
|
||||||
|
bool load_active = rses->load_data_state != LOAD_DATA_INACTIVE;
|
||||||
|
mxs_target_t use_sql_variables_in = rses->rses_config.use_sql_variables_in;
|
||||||
|
int target = TARGET_UNDEFINED;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Prepared statements preparations should go to all servers
|
||||||
|
*/
|
||||||
|
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
|
||||||
|
command == MXS_COM_STMT_CLOSE ||
|
||||||
|
command == MXS_COM_STMT_RESET)
|
||||||
|
{
|
||||||
|
target = TARGET_ALL;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* These queries should be routed to all servers
|
||||||
|
*/
|
||||||
|
else if (!load_active &&
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
||||||
|
/** Configured to allow writing user variables to all nodes */
|
||||||
|
(use_sql_variables_in == TYPE_ALL &&
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_WRITE)) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) ||
|
||||||
|
/** enable or disable autocommit are always routed to all */
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)))
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* This is problematic query because it would be routed to all
|
||||||
|
* backends but since this is SELECT that is not possible:
|
||||||
|
* 1. response set is not handled correctly in clientReply and
|
||||||
|
* 2. multiple results can degrade performance.
|
||||||
|
*
|
||||||
|
* Prepared statements are an exception to this since they do not
|
||||||
|
* actually do anything but only prepare the statement to be used.
|
||||||
|
* They can be safely routed to all backends since the execution
|
||||||
|
* is done later.
|
||||||
|
*
|
||||||
|
* With prepared statement caching the task of routing
|
||||||
|
* the execution of the prepared statements to the right server would be
|
||||||
|
* an easy one. Currently this is not supported.
|
||||||
|
*/
|
||||||
|
if (qc_query_is_type(qtype, QUERY_TYPE_READ))
|
||||||
|
{
|
||||||
|
MXS_WARNING("The query can't be routed to all "
|
||||||
|
"backend servers because it includes SELECT and "
|
||||||
|
"SQL variable modifications which is not supported. "
|
||||||
|
"Set use_sql_variables_in=master or split the "
|
||||||
|
"query to two, where SQL variable modifications "
|
||||||
|
"are done in the first and the SELECT in the "
|
||||||
|
"second one.");
|
||||||
|
|
||||||
|
target = TARGET_MASTER;
|
||||||
|
}
|
||||||
|
target |= TARGET_ALL;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Hints may affect on routing of the following queries
|
||||||
|
*/
|
||||||
|
else if (!trx_active && !load_active &&
|
||||||
|
!qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) &&
|
||||||
|
!qc_query_is_type(qtype, QUERY_TYPE_WRITE) &&
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_SHOW_TABLES) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ)))
|
||||||
|
{
|
||||||
|
if (qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ))
|
||||||
|
{
|
||||||
|
if (use_sql_variables_in == TYPE_ALL)
|
||||||
|
{
|
||||||
|
target = TARGET_SLAVE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (qc_query_is_type(qtype, QUERY_TYPE_READ) || // Normal read
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_SHOW_TABLES) || // SHOW TABLES
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) || // System variable
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ)) // Global system variable
|
||||||
|
{
|
||||||
|
target = TARGET_SLAVE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** If nothing matches then choose the master */
|
||||||
|
if ((target & (TARGET_ALL | TARGET_SLAVE | TARGET_MASTER)) == 0)
|
||||||
|
{
|
||||||
|
target = TARGET_MASTER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (session_trx_is_read_only(rses->client_dcb->session))
|
||||||
|
{
|
||||||
|
/* Force TARGET_SLAVE for READ ONLY transaction (active or ending) */
|
||||||
|
target = TARGET_SLAVE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ss_dassert(trx_active || load_active ||
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_WRITE) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) &&
|
||||||
|
use_sql_variables_in == TYPE_MASTER) ||
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) &&
|
||||||
|
use_sql_variables_in == TYPE_MASTER) ||
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ) &&
|
||||||
|
use_sql_variables_in == TYPE_MASTER) ||
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) &&
|
||||||
|
use_sql_variables_in == TYPE_MASTER) ||
|
||||||
|
(qc_query_is_type(qtype, QUERY_TYPE_USERVAR_WRITE) &&
|
||||||
|
use_sql_variables_in == TYPE_MASTER) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_BEGIN_TRX) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_ROLLBACK) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_COMMIT) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_EXEC_STMT) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_CREATE_TMP_TABLE) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_READ_TMP_TABLE) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_UNKNOWN)) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_EXEC_STMT));
|
||||||
|
|
||||||
|
target = TARGET_MASTER;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Process routing hints */
|
||||||
|
for (HINT* hint = query_hints; hint; hint = hint->next)
|
||||||
|
{
|
||||||
|
if (hint->type == HINT_ROUTE_TO_MASTER)
|
||||||
|
{
|
||||||
|
target = TARGET_MASTER; /*< override */
|
||||||
|
MXS_DEBUG("Hint: route to master");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Searching for a named server. If it can't be
|
||||||
|
* found, the oroginal target is chosen.
|
||||||
|
*/
|
||||||
|
target |= TARGET_NAMED_SERVER;
|
||||||
|
MXS_DEBUG("Hint: route to named server: %s", (char*)hint->data);
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_ROUTE_TO_UPTODATE_SERVER)
|
||||||
|
{
|
||||||
|
/** not implemented */
|
||||||
|
ss_dassert(false);
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_ROUTE_TO_ALL)
|
||||||
|
{
|
||||||
|
/** not implemented */
|
||||||
|
ss_dassert(false);
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_PARAMETER)
|
||||||
|
{
|
||||||
|
if (strncasecmp((char*)hint->data, "max_slave_replication_lag",
|
||||||
|
strlen("max_slave_replication_lag")) == 0)
|
||||||
|
{
|
||||||
|
target |= TARGET_RLAG_MAX;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_ERROR("Unknown hint parameter '%s' when "
|
||||||
|
"'max_slave_replication_lag' was expected.",
|
||||||
|
(char*)hint->data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_ROUTE_TO_SLAVE)
|
||||||
|
{
|
||||||
|
target = TARGET_SLAVE;
|
||||||
|
MXS_DEBUG("Hint: route to slave.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return (route_target_t)target;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Log the transaction status
|
||||||
|
*
|
||||||
|
* The router session and the query buffer are used to log the transaction
|
||||||
|
* status, along with the query type (which is a generic description that
|
||||||
|
* should be usable across all DB types).
|
||||||
|
*
|
||||||
|
* @param rses Router session
|
||||||
|
* @param querybuf Query buffer
|
||||||
|
* @param qtype Query type
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
|
||||||
|
{
|
||||||
|
if (rses->large_query)
|
||||||
|
{
|
||||||
|
MXS_INFO("> Processing large request with more than 2^24 bytes of data");
|
||||||
|
}
|
||||||
|
else if (rses->load_data_state == LOAD_DATA_INACTIVE)
|
||||||
|
{
|
||||||
|
uint8_t *packet = GWBUF_DATA(querybuf);
|
||||||
|
unsigned char command = packet[4];
|
||||||
|
int len = 0;
|
||||||
|
char* sql;
|
||||||
|
char *qtypestr = qc_typemask_to_string(qtype);
|
||||||
|
if (!modutil_extract_SQL(querybuf, &sql, &len))
|
||||||
|
{
|
||||||
|
sql = (char*)"<non-SQL>";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len > RWSPLIT_TRACE_MSG_LEN)
|
||||||
|
{
|
||||||
|
len = RWSPLIT_TRACE_MSG_LEN;
|
||||||
|
}
|
||||||
|
|
||||||
|
MXS_SESSION *ses = rses->client_dcb->session;
|
||||||
|
const char *autocommit = session_is_autocommit(ses) ? "[enabled]" : "[disabled]";
|
||||||
|
const char *transaction = session_trx_is_active(ses) ? "[open]" : "[not open]";
|
||||||
|
uint32_t plen = MYSQL_GET_PACKET_LEN(querybuf);
|
||||||
|
const char *querytype = qtypestr == NULL ? "N/A" : qtypestr;
|
||||||
|
const char *hint = querybuf->hint == NULL ? "" : ", Hint:";
|
||||||
|
const char *hint_type = querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type);
|
||||||
|
|
||||||
|
MXS_INFO("> Autocommit: %s, trx is %s, cmd: (0x%02x) %s, plen: %u, type: %s, stmt: %.*s%s %s",
|
||||||
|
autocommit, transaction, command, STRPACKETTYPE(command), plen,
|
||||||
|
querytype, len, sql, hint, hint_type);
|
||||||
|
|
||||||
|
MXS_FREE(qtypestr);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
MXS_INFO("> Processing LOAD DATA LOCAL INFILE: %lu bytes sent.",
|
||||||
|
rses->rses_load_data_sent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Determine the type of a query
|
||||||
|
*
|
||||||
|
* @param querybuf GWBUF containing the query
|
||||||
|
* @param packet_type Integer denoting DB specific enum
|
||||||
|
* @param non_empty_packet Boolean to be set by this function
|
||||||
|
*
|
||||||
|
* @return uint32_t the query type; also the non_empty_packet bool is set
|
||||||
|
*/
|
||||||
|
uint32_t determine_query_type(GWBUF *querybuf, int command)
|
||||||
|
{
|
||||||
|
uint32_t type = QUERY_TYPE_UNKNOWN;
|
||||||
|
|
||||||
|
switch (command)
|
||||||
|
{
|
||||||
|
case MXS_COM_QUIT: /*< 1 QUIT will close all sessions */
|
||||||
|
case MXS_COM_INIT_DB: /*< 2 DDL must go to the master */
|
||||||
|
case MXS_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
||||||
|
case MXS_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
||||||
|
case MXS_COM_PING: /*< 0e all servers are pinged */
|
||||||
|
case MXS_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
||||||
|
case MXS_COM_SET_OPTION: /*< 1b send options to all servers */
|
||||||
|
type = QUERY_TYPE_SESSION_WRITE;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MXS_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
||||||
|
case MXS_COM_DROP_DB: /**< 6 DDL must go to the master */
|
||||||
|
case MXS_COM_STMT_CLOSE: /*< free prepared statement */
|
||||||
|
case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
||||||
|
case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
||||||
|
type = QUERY_TYPE_WRITE;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MXS_COM_QUERY:
|
||||||
|
type = qc_get_type_mask(querybuf);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MXS_COM_STMT_PREPARE:
|
||||||
|
type = qc_get_type_mask(querybuf);
|
||||||
|
type |= QUERY_TYPE_PREPARE_STMT;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MXS_COM_STMT_EXECUTE:
|
||||||
|
/** Parsing is not needed for this type of packet */
|
||||||
|
type = QUERY_TYPE_EXEC_STMT;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MXS_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
||||||
|
case MXS_COM_STATISTICS: /**< 9 ? */
|
||||||
|
case MXS_COM_PROCESS_INFO: /**< 0a ? */
|
||||||
|
case MXS_COM_CONNECT: /**< 0b ? */
|
||||||
|
case MXS_COM_PROCESS_KILL: /**< 0c ? */
|
||||||
|
case MXS_COM_TIME: /**< 0f should this be run in gateway ? */
|
||||||
|
case MXS_COM_DELAYED_INSERT: /**< 10 ? */
|
||||||
|
case MXS_COM_DAEMON: /**< 1d ? */
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return type;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
|
||||||
|
* the database and table name, create a hashvalue and
|
||||||
|
* add it to the router client session's property. If property
|
||||||
|
* doesn't exist then create it first.
|
||||||
|
* @param router_cli_ses Router client session
|
||||||
|
* @param querybuf GWBUF containing the query
|
||||||
|
* @param type The type of the query resolved so far
|
||||||
|
*/
|
||||||
|
void check_create_tmp_table(RWSplitSession *router_cli_ses,
|
||||||
|
GWBUF *querybuf, uint32_t type)
|
||||||
|
{
|
||||||
|
if (qc_query_is_type(type, QUERY_TYPE_CREATE_TMP_TABLE))
|
||||||
|
{
|
||||||
|
ss_dassert(router_cli_ses && querybuf && router_cli_ses->client_dcb &&
|
||||||
|
router_cli_ses->client_dcb->data);
|
||||||
|
|
||||||
|
router_cli_ses->have_tmp_tables = true;
|
||||||
|
char* tblname = qc_get_created_table_name(querybuf);
|
||||||
|
std::string table;
|
||||||
|
|
||||||
|
if (tblname && *tblname && strchr(tblname, '.') == NULL)
|
||||||
|
{
|
||||||
|
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
|
||||||
|
table += db;
|
||||||
|
table += ".";
|
||||||
|
table += tblname;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Add the table to the set of temporary tables */
|
||||||
|
router_cli_ses->temp_tables.insert(table);
|
||||||
|
|
||||||
|
MXS_FREE(tblname);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find callback for foreach_table
|
||||||
|
*/
|
||||||
|
bool find_table(RWSplitSession* rses, const std::string& table)
|
||||||
|
{
|
||||||
|
if (rses->temp_tables.find(table) != rses->temp_tables.end())
|
||||||
|
{
|
||||||
|
MXS_INFO("Query targets a temporary table: %s", table.c_str());
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Map a function over the list of tables in the query
|
||||||
|
*
|
||||||
|
* @param rses Router client session
|
||||||
|
* @param querybuf The query to inspect
|
||||||
|
* @param func Callback that is called for each table
|
||||||
|
*
|
||||||
|
* @return True if all tables were iterated, false if the iteration was stopped early
|
||||||
|
*/
|
||||||
|
static bool foreach_table(RWSplitSession* rses, GWBUF* querybuf, bool (*func)(RWSplitSession*,
|
||||||
|
const std::string&))
|
||||||
|
{
|
||||||
|
bool rval = true;
|
||||||
|
int n_tables;
|
||||||
|
char** tables = qc_get_table_names(querybuf, &n_tables, true);
|
||||||
|
|
||||||
|
for (int i = 0; i < n_tables; i++)
|
||||||
|
{
|
||||||
|
const char* db = mxs_mysql_get_current_db(rses->client_dcb->session);
|
||||||
|
std::string table;
|
||||||
|
|
||||||
|
if (strchr(tables[i], '.') == NULL)
|
||||||
|
{
|
||||||
|
table += db;
|
||||||
|
table += ".";
|
||||||
|
}
|
||||||
|
|
||||||
|
table += tables[i];
|
||||||
|
|
||||||
|
if (!func(rses, table))
|
||||||
|
{
|
||||||
|
rval = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the query targets a temporary table.
|
||||||
|
* @param router_cli_ses Router client session
|
||||||
|
* @param querybuf GWBUF containing the query
|
||||||
|
* @param type The type of the query resolved so far
|
||||||
|
* @return The type of the query
|
||||||
|
*/
|
||||||
|
bool is_read_tmp_table(RWSplitSession *rses,
|
||||||
|
GWBUF *querybuf,
|
||||||
|
uint32_t qtype)
|
||||||
|
{
|
||||||
|
ss_dassert(rses && querybuf && rses->client_dcb);
|
||||||
|
bool rval = false;
|
||||||
|
|
||||||
|
if (qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_LOCAL_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) ||
|
||||||
|
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ))
|
||||||
|
{
|
||||||
|
if (!foreach_table(rses, querybuf, find_table))
|
||||||
|
{
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Delete callback for foreach_table
|
||||||
|
*/
|
||||||
|
bool delete_table(RWSplitSession *rses, const std::string& table)
|
||||||
|
{
|
||||||
|
rses->temp_tables.erase(table);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Check for dropping of temporary tables
|
||||||
|
*
|
||||||
|
* Check if the query is a DROP TABLE... query and
|
||||||
|
* if it targets a temporary table, remove it from the hashtable.
|
||||||
|
* @param router_cli_ses Router client session
|
||||||
|
* @param querybuf GWBUF containing the query
|
||||||
|
* @param type The type of the query resolved so far
|
||||||
|
*/
|
||||||
|
void check_drop_tmp_table(RWSplitSession *rses, GWBUF *querybuf)
|
||||||
|
{
|
||||||
|
if (qc_is_drop_table_query(querybuf))
|
||||||
|
{
|
||||||
|
foreach_table(rses, querybuf, delete_table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Determine if a packet contains a SQL query
|
||||||
|
*
|
||||||
|
* Packet type tells us this, but in a DB specific way. This function is
|
||||||
|
* provided so that code that is not DB specific can find out whether a packet
|
||||||
|
* contains a SQL query. Clearly, to be effective different functions must be
|
||||||
|
* called for different DB types.
|
||||||
|
*
|
||||||
|
* @param packet_type Type of packet (integer)
|
||||||
|
* @return bool indicating whether packet contains a SQL query
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
is_packet_a_query(int packet_type)
|
||||||
|
{
|
||||||
|
return (packet_type == MXS_COM_QUERY);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool check_for_sp_call(GWBUF *buf, uint8_t packet_type)
|
||||||
|
{
|
||||||
|
return packet_type == MXS_COM_QUERY && qc_get_operation(buf) == QUERY_OP_CALL;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline bool have_semicolon(const char* ptr, int len)
|
||||||
|
{
|
||||||
|
for (int i = 0; i < len; i++)
|
||||||
|
{
|
||||||
|
if (ptr[i] == ';')
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Detect multi-statement queries
|
||||||
|
*
|
||||||
|
* It is possible that the session state is modified inside a multi-statement
|
||||||
|
* query which would leave any slave sessions in an inconsistent state. Due to
|
||||||
|
* this, for the duration of this session, all queries will be sent to the
|
||||||
|
* master
|
||||||
|
* if the current query contains a multi-statement query.
|
||||||
|
* @param rses Router client session
|
||||||
|
* @param buf Buffer containing the full query
|
||||||
|
* @return True if the query contains multiple statements
|
||||||
|
*/
|
||||||
|
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type)
|
||||||
|
{
|
||||||
|
MySQLProtocol *proto = (MySQLProtocol *)protocol;
|
||||||
|
bool rval = false;
|
||||||
|
|
||||||
|
if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS &&
|
||||||
|
packet_type == MXS_COM_QUERY)
|
||||||
|
{
|
||||||
|
char *ptr, *data = (char*)GWBUF_DATA(buf) + 5;
|
||||||
|
/** Payload size without command byte */
|
||||||
|
int buflen = gw_mysql_get_byte3((uint8_t *)GWBUF_DATA(buf)) - 1;
|
||||||
|
|
||||||
|
if (have_semicolon(data, buflen) && (ptr = strnchr_esc_mysql(data, ';', buflen)))
|
||||||
|
{
|
||||||
|
/** Skip stored procedures etc. */
|
||||||
|
while (ptr && is_mysql_sp_end(ptr, buflen - (ptr - data)))
|
||||||
|
{
|
||||||
|
ptr = strnchr_esc_mysql(ptr + 1, ';', buflen - (ptr - data) - 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ptr)
|
||||||
|
{
|
||||||
|
if (ptr < data + buflen &&
|
||||||
|
!is_mysql_statement_end(ptr, buflen - (ptr - data)))
|
||||||
|
{
|
||||||
|
rval = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return rval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Handle multi statement queries and load statements
|
||||||
|
*
|
||||||
|
* One of the possible types of handling required when a request is routed
|
||||||
|
*
|
||||||
|
* @param ses Router session
|
||||||
|
* @param querybuf Buffer containing query to be routed
|
||||||
|
* @param packet_type Type of packet (database specific)
|
||||||
|
* @param qtype Query type
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
||||||
|
uint8_t packet_type, uint32_t *qtype)
|
||||||
|
{
|
||||||
|
/** Check for multi-statement queries. If no master server is available
|
||||||
|
* and a multi-statement is issued, an error is returned to the client
|
||||||
|
* when the query is routed.
|
||||||
|
*
|
||||||
|
* If we do not have a master node, assigning the forced node is not
|
||||||
|
* effective since we don't have a node to force queries to. In this
|
||||||
|
* situation, assigning QUERY_TYPE_WRITE for the query will trigger
|
||||||
|
* the error processing. */
|
||||||
|
if ((rses->target_node == NULL || rses->target_node != rses->current_master) &&
|
||||||
|
(check_for_multi_stmt(querybuf, rses->client_dcb->protocol, packet_type) ||
|
||||||
|
check_for_sp_call(querybuf, packet_type)))
|
||||||
|
{
|
||||||
|
if (rses->current_master && rses->current_master->in_use())
|
||||||
|
{
|
||||||
|
rses->target_node = rses->current_master;
|
||||||
|
MXS_INFO("Multi-statement query or stored procedure call, routing "
|
||||||
|
"all future queries to master.");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
*qtype |= QUERY_TYPE_WRITE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Make checks prior to calling temp tables functions
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (rses == NULL || querybuf == NULL ||
|
||||||
|
rses->client_dcb == NULL || rses->client_dcb->data == NULL)
|
||||||
|
{
|
||||||
|
if (rses == NULL || querybuf == NULL)
|
||||||
|
{
|
||||||
|
MXS_ERROR("[%s] Error: NULL variables for temp table checks: %p %p", __FUNCTION__,
|
||||||
|
rses, querybuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rses->client_dcb == NULL)
|
||||||
|
{
|
||||||
|
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (rses->client_dcb->data == NULL)
|
||||||
|
{
|
||||||
|
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",
|
||||||
|
__FUNCTION__);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Check if the query has anything to do with temporary tables.
|
||||||
|
*/
|
||||||
|
if (rses->have_tmp_tables && is_packet_a_query(packet_type))
|
||||||
|
{
|
||||||
|
check_drop_tmp_table(rses, querybuf);
|
||||||
|
if (is_read_tmp_table(rses, querybuf, *qtype))
|
||||||
|
{
|
||||||
|
*qtype |= QUERY_TYPE_MASTER_READ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
check_create_tmp_table(rses, querybuf, *qtype);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
|
||||||
|
* to the master until the last, empty packet arrives.
|
||||||
|
*/
|
||||||
|
if (rses->load_data_state == LOAD_DATA_ACTIVE)
|
||||||
|
{
|
||||||
|
rses->rses_load_data_sent += gwbuf_length(querybuf);
|
||||||
|
}
|
||||||
|
else if (is_packet_a_query(packet_type))
|
||||||
|
{
|
||||||
|
qc_query_op_t queryop = qc_get_operation(querybuf);
|
||||||
|
if (queryop == QUERY_OP_LOAD)
|
||||||
|
{
|
||||||
|
rses->load_data_state = LOAD_DATA_START;
|
||||||
|
rses->rses_load_data_sent = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Get the routing requirements for a query
|
||||||
|
*
|
||||||
|
* @param rses Router client session
|
||||||
|
* @param buffer Buffer containing the query
|
||||||
|
* @param command Output parameter where the packet command is stored
|
||||||
|
* @param type Output parameter where the query type is stored
|
||||||
|
* @param stmt_id Output parameter where statement ID, if the query is a binary protocol command, is stored
|
||||||
|
*
|
||||||
|
* @return The target type where this query should be routed
|
||||||
|
*/
|
||||||
|
route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
||||||
|
uint8_t* command, uint32_t* type, uint32_t* stmt_id)
|
||||||
|
{
|
||||||
|
route_target_t route_target = TARGET_MASTER;
|
||||||
|
bool in_read_only_trx = rses->target_node && session_trx_is_read_only(rses->client_dcb->session);
|
||||||
|
|
||||||
|
if (gwbuf_length(buffer) > MYSQL_HEADER_LEN)
|
||||||
|
{
|
||||||
|
*command = mxs_mysql_get_command(buffer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the session is inside a read-only transaction, we trust that the
|
||||||
|
* server acts properly even when non-read-only queries are executed.
|
||||||
|
* For this reason, we can skip the parsing of the statement completely.
|
||||||
|
*/
|
||||||
|
if (in_read_only_trx)
|
||||||
|
{
|
||||||
|
*type = QUERY_TYPE_READ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
*type = determine_query_type(buffer, *command);
|
||||||
|
handle_multi_temp_and_load(rses, buffer, *command, type);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
||||||
|
{
|
||||||
|
log_transaction_status(rses, buffer, *type);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Find out where to route the query. Result may not be clear; it is
|
||||||
|
* possible to have a hint for routing to a named server which can
|
||||||
|
* be either slave or master.
|
||||||
|
* If query would otherwise be routed to slave then the hint determines
|
||||||
|
* actual target server if it exists.
|
||||||
|
*
|
||||||
|
* route_target is a bitfield and may include :
|
||||||
|
* TARGET_ALL
|
||||||
|
* - route to all connected backend servers
|
||||||
|
* TARGET_SLAVE[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
||||||
|
* - route primarily according to hints, then to slave and if those
|
||||||
|
* failed, eventually to master
|
||||||
|
* TARGET_MASTER[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
||||||
|
* - route primarily according to the hints and if they failed,
|
||||||
|
* eventually to master
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (rses->target_node && rses->target_node == rses->current_master)
|
||||||
|
{
|
||||||
|
/** The session is locked to the master */
|
||||||
|
route_target = TARGET_MASTER;
|
||||||
|
|
||||||
|
if (qc_query_is_type(*type, QUERY_TYPE_PREPARE_NAMED_STMT) ||
|
||||||
|
qc_query_is_type(*type, QUERY_TYPE_PREPARE_STMT))
|
||||||
|
{
|
||||||
|
gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!in_read_only_trx &&
|
||||||
|
*command == MXS_COM_QUERY &&
|
||||||
|
qc_get_operation(buffer) == QUERY_OP_EXECUTE)
|
||||||
|
{
|
||||||
|
std::string id = get_text_ps_id(buffer);
|
||||||
|
*type = rses->ps_manager.get_type(id);
|
||||||
|
}
|
||||||
|
else if (is_ps_command(*command))
|
||||||
|
{
|
||||||
|
*stmt_id = get_internal_ps_id(rses, buffer);
|
||||||
|
*type = rses->ps_manager.get_type(*stmt_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
route_target = get_route_target(rses, *command, *type, buffer->hint);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
||||||
|
rses->load_data_state = LOAD_DATA_END;
|
||||||
|
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
||||||
|
rses->rses_load_data_sent + gwbuf_length(buffer));
|
||||||
|
}
|
||||||
|
|
||||||
|
return route_target;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
RouteInfo::RouteInfo(RWSplitSession* rses, GWBUF* buffer)
|
||||||
|
: target(TARGET_UNDEFINED)
|
||||||
|
, command(0xff)
|
||||||
|
, type(QUERY_TYPE_UNKNOWN)
|
||||||
|
, stmt_id(0)
|
||||||
|
{
|
||||||
|
target = get_target_type(rses, buffer, &command, &type, &stmt_id);
|
||||||
|
}
|
||||||
27
server/modules/routing/readwritesplit/routeinfo.hh
Normal file
27
server/modules/routing/readwritesplit/routeinfo.hh
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
#pragma once
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2016 MariaDB Corporation Ab
|
||||||
|
*
|
||||||
|
* Use of this software is governed by the Business Source License included
|
||||||
|
* in the LICENSE.TXT file and at www.mariadb.com/bsl11.
|
||||||
|
*
|
||||||
|
* Change Date: 2020-01-01
|
||||||
|
*
|
||||||
|
* On the date above, in accordance with the Business Source License, use
|
||||||
|
* of this software will be governed by version 2 or later of the General
|
||||||
|
* Public License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "readwritesplit.hh"
|
||||||
|
|
||||||
|
class RWSplitSession;
|
||||||
|
|
||||||
|
struct RouteInfo
|
||||||
|
{
|
||||||
|
RouteInfo(RWSplitSession* rses, GWBUF* buffer);
|
||||||
|
|
||||||
|
route_target_t target; /**< Route target type, TARGET_UNDEFINED for unknown */
|
||||||
|
uint8_t command; /**< The command byte, 0xff for unknown commands */
|
||||||
|
uint32_t type; /**< The query type, QUERY_TYPE_UNKNOWN for unknown types*/
|
||||||
|
uint32_t stmt_id; /**< Prepared statement ID, 0 for unknown */
|
||||||
|
};
|
||||||
@ -21,6 +21,8 @@
|
|||||||
|
|
||||||
#include "rwsplitsession.hh"
|
#include "rwsplitsession.hh"
|
||||||
|
|
||||||
|
class RouteInfo;
|
||||||
|
|
||||||
#define RW_CHK_DCB(b, d) \
|
#define RW_CHK_DCB(b, d) \
|
||||||
do{ \
|
do{ \
|
||||||
if(d->state == DCB_STATE_DISCONNECTED){ \
|
if(d->state == DCB_STATE_DISCONNECTED){ \
|
||||||
@ -52,8 +54,6 @@ bool execute_sescmd_in_backend(SRWBackend& backend_ref);
|
|||||||
bool handle_target_is_all(route_target_t route_target,
|
bool handle_target_is_all(route_target_t route_target,
|
||||||
RWSplit *inst, RWSplitSession *rses,
|
RWSplit *inst, RWSplitSession *rses,
|
||||||
GWBUF *querybuf, int packet_type, uint32_t qtype);
|
GWBUF *querybuf, int packet_type, uint32_t qtype);
|
||||||
void log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype);
|
|
||||||
bool is_packet_a_query(int packet_type);
|
|
||||||
bool send_readonly_error(DCB *dcb);
|
bool send_readonly_error(DCB *dcb);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -70,10 +70,6 @@ bool route_single_stmt(RWSplit *inst, RWSplitSession *rses,
|
|||||||
GWBUF *querybuf, const RouteInfo& info);
|
GWBUF *querybuf, const RouteInfo& info);
|
||||||
SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
||||||
char *name, int max_rlag);
|
char *name, int max_rlag);
|
||||||
route_target_t get_route_target(RWSplitSession *rses, uint8_t command,
|
|
||||||
uint32_t qtype, HINT *hint);
|
|
||||||
void handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|
||||||
uint8_t packet_type, uint32_t *qtype);
|
|
||||||
SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
|
SRWBackend handle_hinted_target(RWSplitSession *rses, GWBUF *querybuf,
|
||||||
route_target_t route_target);
|
route_target_t route_target);
|
||||||
SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
|
SRWBackend handle_slave_is_target(RWSplit *inst, RWSplitSession *rses,
|
||||||
@ -110,29 +106,4 @@ bool select_connect_backend_servers(int router_nservers,
|
|||||||
/*
|
/*
|
||||||
* The following are implemented in rwsplit_tmp_table_multi.c
|
* The following are implemented in rwsplit_tmp_table_multi.c
|
||||||
*/
|
*/
|
||||||
void check_drop_tmp_table(RWSplitSession *router_cli_ses, GWBUF *querybuf);
|
|
||||||
bool is_read_tmp_table(RWSplitSession *router_cli_ses,
|
|
||||||
GWBUF *querybuf,
|
|
||||||
uint32_t type);
|
|
||||||
void check_create_tmp_table(RWSplitSession *router_cli_ses,
|
|
||||||
GWBUF *querybuf, uint32_t type);
|
|
||||||
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type);
|
|
||||||
bool check_for_sp_call(GWBUF *buf, uint8_t packet_type);
|
|
||||||
|
|
||||||
void close_all_connections(SRWBackendList& backends);
|
void close_all_connections(SRWBackendList& backends);
|
||||||
|
|
||||||
uint32_t determine_query_type(GWBUF *querybuf, int command);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Get the routing requirements for a query
|
|
||||||
*
|
|
||||||
* @param rses Router client session
|
|
||||||
* @param buffer Buffer containing the query
|
|
||||||
* @param command Output parameter where the packet command is stored
|
|
||||||
* @param type Output parameter where the query type is stored
|
|
||||||
* @param stmt_id Output parameter where statement ID, if the query is a binary protocol command, is stored
|
|
||||||
*
|
|
||||||
* @return The target type where this query should be routed
|
|
||||||
*/
|
|
||||||
route_target_t get_target_type(RWSplitSession* rses, GWBUF* buffer, uint8_t* command,
|
|
||||||
uint32_t* type, uint32_t* stmt_id);
|
|
||||||
|
|||||||
@ -29,8 +29,6 @@
|
|||||||
#include <maxscale/protocol/mysql.h>
|
#include <maxscale/protocol/mysql.h>
|
||||||
#include <maxscale/alloc.h>
|
#include <maxscale/alloc.h>
|
||||||
|
|
||||||
#define RWSPLIT_TRACE_MSG_LEN 1000
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Functions within the read-write split router that are specific to
|
* Functions within the read-write split router that are specific to
|
||||||
* MySQL. The aim is to either remove these into a separate module or to
|
* MySQL. The aim is to either remove these into a separate module or to
|
||||||
@ -50,148 +48,13 @@
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Determine the type of a query
|
|
||||||
*
|
|
||||||
* @param querybuf GWBUF containing the query
|
|
||||||
* @param packet_type Integer denoting DB specific enum
|
|
||||||
* @param non_empty_packet Boolean to be set by this function
|
|
||||||
*
|
|
||||||
* @return uint32_t the query type; also the non_empty_packet bool is set
|
|
||||||
*/
|
|
||||||
uint32_t determine_query_type(GWBUF *querybuf, int command)
|
|
||||||
{
|
|
||||||
uint32_t type = QUERY_TYPE_UNKNOWN;
|
|
||||||
|
|
||||||
switch (command)
|
|
||||||
{
|
|
||||||
case MXS_COM_QUIT: /*< 1 QUIT will close all sessions */
|
|
||||||
case MXS_COM_INIT_DB: /*< 2 DDL must go to the master */
|
|
||||||
case MXS_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
|
||||||
case MXS_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
|
||||||
case MXS_COM_PING: /*< 0e all servers are pinged */
|
|
||||||
case MXS_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
|
||||||
case MXS_COM_SET_OPTION: /*< 1b send options to all servers */
|
|
||||||
type = QUERY_TYPE_SESSION_WRITE;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MXS_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
|
||||||
case MXS_COM_DROP_DB: /**< 6 DDL must go to the master */
|
|
||||||
case MXS_COM_STMT_CLOSE: /*< free prepared statement */
|
|
||||||
case MXS_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
|
||||||
case MXS_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
|
||||||
type = QUERY_TYPE_WRITE;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MXS_COM_QUERY:
|
|
||||||
type = qc_get_type_mask(querybuf);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MXS_COM_STMT_PREPARE:
|
|
||||||
type = qc_get_type_mask(querybuf);
|
|
||||||
type |= QUERY_TYPE_PREPARE_STMT;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MXS_COM_STMT_EXECUTE:
|
|
||||||
/** Parsing is not needed for this type of packet */
|
|
||||||
type = QUERY_TYPE_EXEC_STMT;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MXS_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
|
||||||
case MXS_COM_STATISTICS: /**< 9 ? */
|
|
||||||
case MXS_COM_PROCESS_INFO: /**< 0a ? */
|
|
||||||
case MXS_COM_CONNECT: /**< 0b ? */
|
|
||||||
case MXS_COM_PROCESS_KILL: /**< 0c ? */
|
|
||||||
case MXS_COM_TIME: /**< 0f should this be run in gateway ? */
|
|
||||||
case MXS_COM_DELAYED_INSERT: /**< 10 ? */
|
|
||||||
case MXS_COM_DAEMON: /**< 1d ? */
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This appears to be MySQL specific
|
* This appears to be MySQL specific
|
||||||
*/
|
*/
|
||||||
/**
|
|
||||||
* @brief Determine if a packet contains a SQL query
|
|
||||||
*
|
|
||||||
* Packet type tells us this, but in a DB specific way. This function is
|
|
||||||
* provided so that code that is not DB specific can find out whether a packet
|
|
||||||
* contains a SQL query. Clearly, to be effective different functions must be
|
|
||||||
* called for different DB types.
|
|
||||||
*
|
|
||||||
* @param packet_type Type of packet (integer)
|
|
||||||
* @return bool indicating whether packet contains a SQL query
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
is_packet_a_query(int packet_type)
|
|
||||||
{
|
|
||||||
return (packet_type == MXS_COM_QUERY);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This one is problematic because it is MySQL specific, but also router
|
* This one is problematic because it is MySQL specific, but also router
|
||||||
* specific.
|
* specific.
|
||||||
*/
|
*/
|
||||||
/**
|
|
||||||
* @brief Log the transaction status
|
|
||||||
*
|
|
||||||
* The router session and the query buffer are used to log the transaction
|
|
||||||
* status, along with the query type (which is a generic description that
|
|
||||||
* should be usable across all DB types).
|
|
||||||
*
|
|
||||||
* @param rses Router session
|
|
||||||
* @param querybuf Query buffer
|
|
||||||
* @param qtype Query type
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
log_transaction_status(RWSplitSession *rses, GWBUF *querybuf, uint32_t qtype)
|
|
||||||
{
|
|
||||||
if (rses->large_query)
|
|
||||||
{
|
|
||||||
MXS_INFO("> Processing large request with more than 2^24 bytes of data");
|
|
||||||
}
|
|
||||||
else if (rses->load_data_state == LOAD_DATA_INACTIVE)
|
|
||||||
{
|
|
||||||
uint8_t *packet = GWBUF_DATA(querybuf);
|
|
||||||
unsigned char command = packet[4];
|
|
||||||
int len = 0;
|
|
||||||
char* sql;
|
|
||||||
char *qtypestr = qc_typemask_to_string(qtype);
|
|
||||||
if (!modutil_extract_SQL(querybuf, &sql, &len))
|
|
||||||
{
|
|
||||||
sql = (char*)"<non-SQL>";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (len > RWSPLIT_TRACE_MSG_LEN)
|
|
||||||
{
|
|
||||||
len = RWSPLIT_TRACE_MSG_LEN;
|
|
||||||
}
|
|
||||||
|
|
||||||
MXS_SESSION *ses = rses->client_dcb->session;
|
|
||||||
const char *autocommit = session_is_autocommit(ses) ? "[enabled]" : "[disabled]";
|
|
||||||
const char *transaction = session_trx_is_active(ses) ? "[open]" : "[not open]";
|
|
||||||
uint32_t plen = MYSQL_GET_PACKET_LEN(querybuf);
|
|
||||||
const char *querytype = qtypestr == NULL ? "N/A" : qtypestr;
|
|
||||||
const char *hint = querybuf->hint == NULL ? "" : ", Hint:";
|
|
||||||
const char *hint_type = querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type);
|
|
||||||
|
|
||||||
MXS_INFO("> Autocommit: %s, trx is %s, cmd: (0x%02x) %s, plen: %u, type: %s, stmt: %.*s%s %s",
|
|
||||||
autocommit, transaction, command, STRPACKETTYPE(command), plen,
|
|
||||||
querytype, len, sql, hint, hint_type);
|
|
||||||
|
|
||||||
MXS_FREE(qtypestr);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_INFO("> Processing LOAD DATA LOCAL INFILE: %lu bytes sent.",
|
|
||||||
rses->rses_load_data_sent);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This is mostly router code, but it contains MySQL specific operations that
|
* This is mostly router code, but it contains MySQL specific operations that
|
||||||
* maybe could be moved to the protocol module. The modutil functions are mostly
|
* maybe could be moved to the protocol module. The modutil functions are mostly
|
||||||
|
|||||||
@ -12,7 +12,6 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include "readwritesplit.hh"
|
#include "readwritesplit.hh"
|
||||||
#include "rwsplit_internal.hh"
|
|
||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
@ -25,6 +24,9 @@
|
|||||||
#include <maxscale/router.h>
|
#include <maxscale/router.h>
|
||||||
#include <maxscale/server.h>
|
#include <maxscale/server.h>
|
||||||
|
|
||||||
|
#include "routeinfo.hh"
|
||||||
|
#include "rwsplit_internal.hh"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The functions that support the routing of queries to back end
|
* The functions that support the routing of queries to back end
|
||||||
* servers. All the functions in this module are internal to the read
|
* servers. All the functions in this module are internal to the read
|
||||||
@ -93,93 +95,6 @@ void handle_connection_keepalive(RWSplit *inst, RWSplitSession *rses,
|
|||||||
ss_dassert(nserv < rses->rses_nbackends);
|
ss_dassert(nserv < rses->rses_nbackends);
|
||||||
}
|
}
|
||||||
|
|
||||||
route_target_t get_target_type(RWSplitSession *rses, GWBUF *buffer,
|
|
||||||
uint8_t* command, uint32_t* type, uint32_t* stmt_id)
|
|
||||||
{
|
|
||||||
route_target_t route_target = TARGET_MASTER;
|
|
||||||
bool in_read_only_trx = rses->target_node && session_trx_is_read_only(rses->client_dcb->session);
|
|
||||||
|
|
||||||
if (gwbuf_length(buffer) > MYSQL_HEADER_LEN)
|
|
||||||
{
|
|
||||||
*command = mxs_mysql_get_command(buffer);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If the session is inside a read-only transaction, we trust that the
|
|
||||||
* server acts properly even when non-read-only queries are executed.
|
|
||||||
* For this reason, we can skip the parsing of the statement completely.
|
|
||||||
*/
|
|
||||||
if (in_read_only_trx)
|
|
||||||
{
|
|
||||||
*type = QUERY_TYPE_READ;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
*type = determine_query_type(buffer, *command);
|
|
||||||
handle_multi_temp_and_load(rses, buffer, *command, type);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
|
||||||
{
|
|
||||||
log_transaction_status(rses, buffer, *type);
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Find out where to route the query. Result may not be clear; it is
|
|
||||||
* possible to have a hint for routing to a named server which can
|
|
||||||
* be either slave or master.
|
|
||||||
* If query would otherwise be routed to slave then the hint determines
|
|
||||||
* actual target server if it exists.
|
|
||||||
*
|
|
||||||
* route_target is a bitfield and may include :
|
|
||||||
* TARGET_ALL
|
|
||||||
* - route to all connected backend servers
|
|
||||||
* TARGET_SLAVE[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
|
||||||
* - route primarily according to hints, then to slave and if those
|
|
||||||
* failed, eventually to master
|
|
||||||
* TARGET_MASTER[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
|
||||||
* - route primarily according to the hints and if they failed,
|
|
||||||
* eventually to master
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (rses->target_node && rses->target_node == rses->current_master)
|
|
||||||
{
|
|
||||||
/** The session is locked to the master */
|
|
||||||
route_target = TARGET_MASTER;
|
|
||||||
|
|
||||||
if (qc_query_is_type(*type, QUERY_TYPE_PREPARE_NAMED_STMT) ||
|
|
||||||
qc_query_is_type(*type, QUERY_TYPE_PREPARE_STMT))
|
|
||||||
{
|
|
||||||
gwbuf_set_type(buffer, GWBUF_TYPE_COLLECT_RESULT);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (!in_read_only_trx &&
|
|
||||||
*command == MXS_COM_QUERY &&
|
|
||||||
qc_get_operation(buffer) == QUERY_OP_EXECUTE)
|
|
||||||
{
|
|
||||||
std::string id = get_text_ps_id(buffer);
|
|
||||||
*type = rses->ps_manager.get_type(id);
|
|
||||||
}
|
|
||||||
else if (is_ps_command(*command))
|
|
||||||
{
|
|
||||||
*stmt_id = get_internal_ps_id(rses, buffer);
|
|
||||||
*type = rses->ps_manager.get_type(*stmt_id);
|
|
||||||
}
|
|
||||||
|
|
||||||
route_target = get_route_target(rses, *command, *type, buffer->hint);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
|
|
||||||
rses->load_data_state = LOAD_DATA_END;
|
|
||||||
MXS_INFO("> LOAD DATA LOCAL INFILE finished: %lu bytes sent.",
|
|
||||||
rses->rses_load_data_sent + gwbuf_length(buffer));
|
|
||||||
}
|
|
||||||
|
|
||||||
return route_target;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline bool locked_to_master(RWSplitSession *rses)
|
static inline bool locked_to_master(RWSplitSession *rses)
|
||||||
{
|
{
|
||||||
return rses->large_query || (rses->current_master && rses->target_node == rses->current_master);
|
return rses->large_query || (rses->current_master && rses->target_node == rses->current_master);
|
||||||
@ -580,295 +495,6 @@ SRWBackend get_target_backend(RWSplitSession *rses, backend_type_t btype,
|
|||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Examine the query type, transaction state and routing hints. Find out the
|
|
||||||
* target for query routing.
|
|
||||||
*
|
|
||||||
* @param qtype Type of query
|
|
||||||
* @param trx_active Is transacation active or not
|
|
||||||
* @param hint Pointer to list of hints attached to the query buffer
|
|
||||||
*
|
|
||||||
* @return bitfield including the routing target, or the target server name
|
|
||||||
* if the query would otherwise be routed to slave.
|
|
||||||
*/
|
|
||||||
route_target_t get_route_target(RWSplitSession *rses, uint8_t command,
|
|
||||||
uint32_t qtype, HINT *query_hints)
|
|
||||||
{
|
|
||||||
bool trx_active = session_trx_is_active(rses->client_dcb->session);
|
|
||||||
bool load_active = rses->load_data_state != LOAD_DATA_INACTIVE;
|
|
||||||
mxs_target_t use_sql_variables_in = rses->rses_config.use_sql_variables_in;
|
|
||||||
int target = TARGET_UNDEFINED;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Prepared statements preparations should go to all servers
|
|
||||||
*/
|
|
||||||
if (qc_query_is_type(qtype, QUERY_TYPE_PREPARE_STMT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
|
|
||||||
command == MXS_COM_STMT_CLOSE ||
|
|
||||||
command == MXS_COM_STMT_RESET)
|
|
||||||
{
|
|
||||||
target = TARGET_ALL;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* These queries should be routed to all servers
|
|
||||||
*/
|
|
||||||
else if (!load_active &&
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
|
||||||
/** Configured to allow writing user variables to all nodes */
|
|
||||||
(use_sql_variables_in == TYPE_ALL &&
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_WRITE)) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) ||
|
|
||||||
/** enable or disable autocommit are always routed to all */
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)))
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* This is problematic query because it would be routed to all
|
|
||||||
* backends but since this is SELECT that is not possible:
|
|
||||||
* 1. response set is not handled correctly in clientReply and
|
|
||||||
* 2. multiple results can degrade performance.
|
|
||||||
*
|
|
||||||
* Prepared statements are an exception to this since they do not
|
|
||||||
* actually do anything but only prepare the statement to be used.
|
|
||||||
* They can be safely routed to all backends since the execution
|
|
||||||
* is done later.
|
|
||||||
*
|
|
||||||
* With prepared statement caching the task of routing
|
|
||||||
* the execution of the prepared statements to the right server would be
|
|
||||||
* an easy one. Currently this is not supported.
|
|
||||||
*/
|
|
||||||
if (qc_query_is_type(qtype, QUERY_TYPE_READ))
|
|
||||||
{
|
|
||||||
MXS_WARNING("The query can't be routed to all "
|
|
||||||
"backend servers because it includes SELECT and "
|
|
||||||
"SQL variable modifications which is not supported. "
|
|
||||||
"Set use_sql_variables_in=master or split the "
|
|
||||||
"query to two, where SQL variable modifications "
|
|
||||||
"are done in the first and the SELECT in the "
|
|
||||||
"second one.");
|
|
||||||
|
|
||||||
target = TARGET_MASTER;
|
|
||||||
}
|
|
||||||
target |= TARGET_ALL;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Hints may affect on routing of the following queries
|
|
||||||
*/
|
|
||||||
else if (!trx_active && !load_active &&
|
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) &&
|
|
||||||
!qc_query_is_type(qtype, QUERY_TYPE_WRITE) &&
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SHOW_TABLES) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ)))
|
|
||||||
{
|
|
||||||
if (qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ))
|
|
||||||
{
|
|
||||||
if (use_sql_variables_in == TYPE_ALL)
|
|
||||||
{
|
|
||||||
target = TARGET_SLAVE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (qc_query_is_type(qtype, QUERY_TYPE_READ) || // Normal read
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SHOW_TABLES) || // SHOW TABLES
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) || // System variable
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ)) // Global system variable
|
|
||||||
{
|
|
||||||
target = TARGET_SLAVE;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** If nothing matches then choose the master */
|
|
||||||
if ((target & (TARGET_ALL | TARGET_SLAVE | TARGET_MASTER)) == 0)
|
|
||||||
{
|
|
||||||
target = TARGET_MASTER;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (session_trx_is_read_only(rses->client_dcb->session))
|
|
||||||
{
|
|
||||||
/* Force TARGET_SLAVE for READ ONLY transaction (active or ending) */
|
|
||||||
target = TARGET_SLAVE;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ss_dassert(trx_active || load_active ||
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_WRITE) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_MASTER_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SESSION_WRITE) ||
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) &&
|
|
||||||
use_sql_variables_in == TYPE_MASTER) ||
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) &&
|
|
||||||
use_sql_variables_in == TYPE_MASTER) ||
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ) &&
|
|
||||||
use_sql_variables_in == TYPE_MASTER) ||
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_WRITE) &&
|
|
||||||
use_sql_variables_in == TYPE_MASTER) ||
|
|
||||||
(qc_query_is_type(qtype, QUERY_TYPE_USERVAR_WRITE) &&
|
|
||||||
use_sql_variables_in == TYPE_MASTER) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_BEGIN_TRX) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_ROLLBACK) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_COMMIT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_EXEC_STMT) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_CREATE_TMP_TABLE) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_READ_TMP_TABLE) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_UNKNOWN)) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_EXEC_STMT));
|
|
||||||
|
|
||||||
target = TARGET_MASTER;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Process routing hints */
|
|
||||||
for (HINT* hint = query_hints; hint; hint = hint->next)
|
|
||||||
{
|
|
||||||
if (hint->type == HINT_ROUTE_TO_MASTER)
|
|
||||||
{
|
|
||||||
target = TARGET_MASTER; /*< override */
|
|
||||||
MXS_DEBUG("Hint: route to master");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
else if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Searching for a named server. If it can't be
|
|
||||||
* found, the oroginal target is chosen.
|
|
||||||
*/
|
|
||||||
target |= TARGET_NAMED_SERVER;
|
|
||||||
MXS_DEBUG("Hint: route to named server: %s", (char*)hint->data);
|
|
||||||
}
|
|
||||||
else if (hint->type == HINT_ROUTE_TO_UPTODATE_SERVER)
|
|
||||||
{
|
|
||||||
/** not implemented */
|
|
||||||
ss_dassert(false);
|
|
||||||
}
|
|
||||||
else if (hint->type == HINT_ROUTE_TO_ALL)
|
|
||||||
{
|
|
||||||
/** not implemented */
|
|
||||||
ss_dassert(false);
|
|
||||||
}
|
|
||||||
else if (hint->type == HINT_PARAMETER)
|
|
||||||
{
|
|
||||||
if (strncasecmp((char*)hint->data, "max_slave_replication_lag",
|
|
||||||
strlen("max_slave_replication_lag")) == 0)
|
|
||||||
{
|
|
||||||
target |= TARGET_RLAG_MAX;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
MXS_ERROR("Unknown hint parameter '%s' when "
|
|
||||||
"'max_slave_replication_lag' was expected.",
|
|
||||||
(char*)hint->data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (hint->type == HINT_ROUTE_TO_SLAVE)
|
|
||||||
{
|
|
||||||
target = TARGET_SLAVE;
|
|
||||||
MXS_DEBUG("Hint: route to slave.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return (route_target_t)target;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Handle multi statement queries and load statements
|
|
||||||
*
|
|
||||||
* One of the possible types of handling required when a request is routed
|
|
||||||
*
|
|
||||||
* @param ses Router session
|
|
||||||
* @param querybuf Buffer containing query to be routed
|
|
||||||
* @param packet_type Type of packet (database specific)
|
|
||||||
* @param qtype Query type
|
|
||||||
*/
|
|
||||||
void
|
|
||||||
handle_multi_temp_and_load(RWSplitSession *rses, GWBUF *querybuf,
|
|
||||||
uint8_t packet_type, uint32_t *qtype)
|
|
||||||
{
|
|
||||||
/** Check for multi-statement queries. If no master server is available
|
|
||||||
* and a multi-statement is issued, an error is returned to the client
|
|
||||||
* when the query is routed.
|
|
||||||
*
|
|
||||||
* If we do not have a master node, assigning the forced node is not
|
|
||||||
* effective since we don't have a node to force queries to. In this
|
|
||||||
* situation, assigning QUERY_TYPE_WRITE for the query will trigger
|
|
||||||
* the error processing. */
|
|
||||||
if ((rses->target_node == NULL || rses->target_node != rses->current_master) &&
|
|
||||||
(check_for_multi_stmt(querybuf, rses->client_dcb->protocol, packet_type) ||
|
|
||||||
check_for_sp_call(querybuf, packet_type)))
|
|
||||||
{
|
|
||||||
if (rses->current_master && rses->current_master->in_use())
|
|
||||||
{
|
|
||||||
rses->target_node = rses->current_master;
|
|
||||||
MXS_INFO("Multi-statement query or stored procedure call, routing "
|
|
||||||
"all future queries to master.");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
*qtype |= QUERY_TYPE_WRITE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Make checks prior to calling temp tables functions
|
|
||||||
*/
|
|
||||||
|
|
||||||
if (rses == NULL || querybuf == NULL ||
|
|
||||||
rses->client_dcb == NULL || rses->client_dcb->data == NULL)
|
|
||||||
{
|
|
||||||
if (rses == NULL || querybuf == NULL)
|
|
||||||
{
|
|
||||||
MXS_ERROR("[%s] Error: NULL variables for temp table checks: %p %p", __FUNCTION__,
|
|
||||||
rses, querybuf);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rses->client_dcb == NULL)
|
|
||||||
{
|
|
||||||
MXS_ERROR("[%s] Error: Client DCB is NULL.", __FUNCTION__);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rses->client_dcb->data == NULL)
|
|
||||||
{
|
|
||||||
MXS_ERROR("[%s] Error: User data in master server DBC is NULL.",
|
|
||||||
__FUNCTION__);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Check if the query has anything to do with temporary tables.
|
|
||||||
*/
|
|
||||||
if (rses->have_tmp_tables && is_packet_a_query(packet_type))
|
|
||||||
{
|
|
||||||
check_drop_tmp_table(rses, querybuf);
|
|
||||||
if (is_read_tmp_table(rses, querybuf, *qtype))
|
|
||||||
{
|
|
||||||
*qtype |= QUERY_TYPE_MASTER_READ;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
check_create_tmp_table(rses, querybuf, *qtype);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
|
|
||||||
* to the master until the last, empty packet arrives.
|
|
||||||
*/
|
|
||||||
if (rses->load_data_state == LOAD_DATA_ACTIVE)
|
|
||||||
{
|
|
||||||
rses->rses_load_data_sent += gwbuf_length(querybuf);
|
|
||||||
}
|
|
||||||
else if (is_packet_a_query(packet_type))
|
|
||||||
{
|
|
||||||
qc_query_op_t queryop = qc_get_operation(querybuf);
|
|
||||||
if (queryop == QUERY_OP_LOAD)
|
|
||||||
{
|
|
||||||
rses->load_data_state = LOAD_DATA_START;
|
|
||||||
rses->rses_load_data_sent = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Handle hinted target query
|
* @brief Handle hinted target query
|
||||||
*
|
*
|
||||||
|
|||||||
@ -35,210 +35,3 @@
|
|||||||
* somewhere else, outside this router. Perhaps in the query classifier?
|
* somewhere else, outside this router. Perhaps in the query classifier?
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Map a function over the list of tables in the query
|
|
||||||
*
|
|
||||||
* @param rses Router client session
|
|
||||||
* @param querybuf The query to inspect
|
|
||||||
* @param func Callback that is called for each table
|
|
||||||
*
|
|
||||||
* @return True if all tables were iterated, false if the iteration was stopped early
|
|
||||||
*/
|
|
||||||
static bool foreach_table(RWSplitSession* rses, GWBUF* querybuf, bool (*func)(RWSplitSession*,
|
|
||||||
const std::string&))
|
|
||||||
{
|
|
||||||
bool rval = true;
|
|
||||||
int n_tables;
|
|
||||||
char** tables = qc_get_table_names(querybuf, &n_tables, true);
|
|
||||||
|
|
||||||
for (int i = 0; i < n_tables; i++)
|
|
||||||
{
|
|
||||||
const char* db = mxs_mysql_get_current_db(rses->client_dcb->session);
|
|
||||||
std::string table;
|
|
||||||
|
|
||||||
if (strchr(tables[i], '.') == NULL)
|
|
||||||
{
|
|
||||||
table += db;
|
|
||||||
table += ".";
|
|
||||||
}
|
|
||||||
|
|
||||||
table += tables[i];
|
|
||||||
|
|
||||||
if (!func(rses, table))
|
|
||||||
{
|
|
||||||
rval = false;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Delete callback for foreach_table
|
|
||||||
*/
|
|
||||||
bool delete_table(RWSplitSession *rses, const std::string& table)
|
|
||||||
{
|
|
||||||
rses->temp_tables.erase(table);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Find callback for foreach_table
|
|
||||||
*/
|
|
||||||
bool find_table(RWSplitSession* rses, const std::string& table)
|
|
||||||
{
|
|
||||||
if (rses->temp_tables.find(table) != rses->temp_tables.end())
|
|
||||||
{
|
|
||||||
MXS_INFO("Query targets a temporary table: %s", table.c_str());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Check for dropping of temporary tables
|
|
||||||
*
|
|
||||||
* Check if the query is a DROP TABLE... query and
|
|
||||||
* if it targets a temporary table, remove it from the hashtable.
|
|
||||||
* @param router_cli_ses Router client session
|
|
||||||
* @param querybuf GWBUF containing the query
|
|
||||||
* @param type The type of the query resolved so far
|
|
||||||
*/
|
|
||||||
void check_drop_tmp_table(RWSplitSession *rses, GWBUF *querybuf)
|
|
||||||
{
|
|
||||||
if (qc_is_drop_table_query(querybuf))
|
|
||||||
{
|
|
||||||
foreach_table(rses, querybuf, delete_table);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if the query targets a temporary table.
|
|
||||||
* @param router_cli_ses Router client session
|
|
||||||
* @param querybuf GWBUF containing the query
|
|
||||||
* @param type The type of the query resolved so far
|
|
||||||
* @return The type of the query
|
|
||||||
*/
|
|
||||||
bool is_read_tmp_table(RWSplitSession *rses,
|
|
||||||
GWBUF *querybuf,
|
|
||||||
uint32_t qtype)
|
|
||||||
{
|
|
||||||
ss_dassert(rses && querybuf && rses->client_dcb);
|
|
||||||
bool rval = false;
|
|
||||||
|
|
||||||
if (qc_query_is_type(qtype, QUERY_TYPE_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_LOCAL_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_USERVAR_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_SYSVAR_READ) ||
|
|
||||||
qc_query_is_type(qtype, QUERY_TYPE_GSYSVAR_READ))
|
|
||||||
{
|
|
||||||
if (!foreach_table(rses, querybuf, find_table))
|
|
||||||
{
|
|
||||||
rval = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
|
|
||||||
* the database and table name, create a hashvalue and
|
|
||||||
* add it to the router client session's property. If property
|
|
||||||
* doesn't exist then create it first.
|
|
||||||
* @param router_cli_ses Router client session
|
|
||||||
* @param querybuf GWBUF containing the query
|
|
||||||
* @param type The type of the query resolved so far
|
|
||||||
*/
|
|
||||||
void check_create_tmp_table(RWSplitSession *router_cli_ses,
|
|
||||||
GWBUF *querybuf, uint32_t type)
|
|
||||||
{
|
|
||||||
if (qc_query_is_type(type, QUERY_TYPE_CREATE_TMP_TABLE))
|
|
||||||
{
|
|
||||||
ss_dassert(router_cli_ses && querybuf && router_cli_ses->client_dcb &&
|
|
||||||
router_cli_ses->client_dcb->data);
|
|
||||||
|
|
||||||
router_cli_ses->have_tmp_tables = true;
|
|
||||||
char* tblname = qc_get_created_table_name(querybuf);
|
|
||||||
std::string table;
|
|
||||||
|
|
||||||
if (tblname && *tblname && strchr(tblname, '.') == NULL)
|
|
||||||
{
|
|
||||||
const char* db = mxs_mysql_get_current_db(router_cli_ses->client_dcb->session);
|
|
||||||
table += db;
|
|
||||||
table += ".";
|
|
||||||
table += tblname;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Add the table to the set of temporary tables */
|
|
||||||
router_cli_ses->temp_tables.insert(table);
|
|
||||||
|
|
||||||
MXS_FREE(tblname);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
inline bool have_semicolon(const char* ptr, int len)
|
|
||||||
{
|
|
||||||
for (int i = 0; i < len; i++)
|
|
||||||
{
|
|
||||||
if (ptr[i] == ';')
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Detect multi-statement queries
|
|
||||||
*
|
|
||||||
* It is possible that the session state is modified inside a multi-statement
|
|
||||||
* query which would leave any slave sessions in an inconsistent state. Due to
|
|
||||||
* this, for the duration of this session, all queries will be sent to the
|
|
||||||
* master
|
|
||||||
* if the current query contains a multi-statement query.
|
|
||||||
* @param rses Router client session
|
|
||||||
* @param buf Buffer containing the full query
|
|
||||||
* @return True if the query contains multiple statements
|
|
||||||
*/
|
|
||||||
bool check_for_multi_stmt(GWBUF *buf, void *protocol, uint8_t packet_type)
|
|
||||||
{
|
|
||||||
MySQLProtocol *proto = (MySQLProtocol *)protocol;
|
|
||||||
bool rval = false;
|
|
||||||
|
|
||||||
if (proto->client_capabilities & GW_MYSQL_CAPABILITIES_MULTI_STATEMENTS &&
|
|
||||||
packet_type == MXS_COM_QUERY)
|
|
||||||
{
|
|
||||||
char *ptr, *data = (char*)GWBUF_DATA(buf) + 5;
|
|
||||||
/** Payload size without command byte */
|
|
||||||
int buflen = gw_mysql_get_byte3((uint8_t *)GWBUF_DATA(buf)) - 1;
|
|
||||||
|
|
||||||
if (have_semicolon(data, buflen) && (ptr = strnchr_esc_mysql(data, ';', buflen)))
|
|
||||||
{
|
|
||||||
/** Skip stored procedures etc. */
|
|
||||||
while (ptr && is_mysql_sp_end(ptr, buflen - (ptr - data)))
|
|
||||||
{
|
|
||||||
ptr = strnchr_esc_mysql(ptr + 1, ';', buflen - (ptr - data) - 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ptr)
|
|
||||||
{
|
|
||||||
if (ptr < data + buflen &&
|
|
||||||
!is_mysql_statement_end(ptr, buflen - (ptr - data)))
|
|
||||||
{
|
|
||||||
rval = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return rval;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool check_for_sp_call(GWBUF *buf, uint8_t packet_type)
|
|
||||||
{
|
|
||||||
return packet_type == MXS_COM_QUERY && qc_get_operation(buf) == QUERY_OP_CALL;
|
|
||||||
}
|
|
||||||
|
|||||||
@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
#include "rwsplitsession.hh"
|
#include "rwsplitsession.hh"
|
||||||
#include "rwsplit_internal.hh"
|
#include "rwsplit_internal.hh"
|
||||||
|
#include "routeinfo.hh"
|
||||||
|
|
||||||
RWBackend::RWBackend(SERVER_REF* ref):
|
RWBackend::RWBackend(SERVER_REF* ref):
|
||||||
mxs::Backend(ref),
|
mxs::Backend(ref),
|
||||||
@ -98,12 +99,3 @@ uint32_t get_internal_ps_id(RWSplitSession* rses, GWBUF* buffer)
|
|||||||
|
|
||||||
return rval;
|
return rval;
|
||||||
}
|
}
|
||||||
|
|
||||||
RouteInfo::RouteInfo(RWSplitSession* rses, GWBUF* buffer):
|
|
||||||
target(TARGET_UNDEFINED),
|
|
||||||
command(0xff),
|
|
||||||
type(QUERY_TYPE_UNKNOWN),
|
|
||||||
stmt_id(0)
|
|
||||||
{
|
|
||||||
target = get_target_type(rses, buffer, &command, &type, &stmt_id);
|
|
||||||
}
|
|
||||||
|
|||||||
@ -160,17 +160,6 @@ private:
|
|||||||
const SRWBackendList& backends, const SRWBackend& master);
|
const SRWBackendList& backends, const SRWBackend& master);
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Struct for holding routing related information */
|
|
||||||
struct RouteInfo
|
|
||||||
{
|
|
||||||
RouteInfo(RWSplitSession* rses, GWBUF* buffer);
|
|
||||||
|
|
||||||
route_target_t target; /**< Route target type, TARGET_UNDEFINED for unknown */
|
|
||||||
uint8_t command; /**< The command byte, 0xff for unknown commands */
|
|
||||||
uint32_t type; /**< The query type, QUERY_TYPE_UNKNOWN for unknown types*/
|
|
||||||
uint32_t stmt_id; /**< Prepared statement ID, 0 for unknown */
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper function to convert reply_state_t to string
|
* Helper function to convert reply_state_t to string
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user