Fix candidate for #645, http://bugs.skysql.com/show_bug.cgi?id=645 and #648, http://bugs.skysql.com/show_bug.cgi?id=648
If readwritesplit.c:routeQuery gets a GWBUF whose type is UNDEFINED, then each MySQL packet is extracted from input buffer and passed to new function, route_single_stmt. Each extracted packet is stored in separate GWBUF and added types GWBUF_TYPE_MYSQL and GWBUG_TYPE_SINGLE_STMT which makes it possible to execute session commands and process reply packets properly. Code nedes still cleaning but this is for testing atm.
This commit is contained in:
@ -390,3 +390,72 @@ int modutil_send_mysql_err_packet (
|
|||||||
return dcb->func.write(dcb, buf);
|
return dcb->func.write(dcb, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Buffer contains at least one of the following:
|
||||||
|
* complete [complete] [partial] mysql packet
|
||||||
|
*
|
||||||
|
* return pointer to gwbuf containing a complete packet or
|
||||||
|
* NULL if no complete packet was found.
|
||||||
|
*/
|
||||||
|
GWBUF* modutil_get_next_MySQL_packet(
|
||||||
|
GWBUF** p_readbuf)
|
||||||
|
{
|
||||||
|
GWBUF* packetbuf;
|
||||||
|
GWBUF* readbuf;
|
||||||
|
size_t buflen;
|
||||||
|
size_t packetlen;
|
||||||
|
size_t totalbuflen;
|
||||||
|
uint8_t* data;
|
||||||
|
size_t nbytes_copied = 0;
|
||||||
|
uint8_t* target;
|
||||||
|
|
||||||
|
readbuf = *p_readbuf;
|
||||||
|
|
||||||
|
if (readbuf == NULL)
|
||||||
|
{
|
||||||
|
packetbuf = NULL;
|
||||||
|
goto return_packetbuf;
|
||||||
|
}
|
||||||
|
CHK_GWBUF(readbuf);
|
||||||
|
|
||||||
|
if (GWBUF_EMPTY(readbuf))
|
||||||
|
{
|
||||||
|
packetbuf = NULL;
|
||||||
|
goto return_packetbuf;
|
||||||
|
}
|
||||||
|
totalbuflen = gwbuf_length(readbuf);
|
||||||
|
data = (uint8_t *)GWBUF_DATA((readbuf));
|
||||||
|
packetlen = MYSQL_GET_PACKET_LEN(data)+4;
|
||||||
|
|
||||||
|
/** packet is incomplete */
|
||||||
|
if (packetlen > totalbuflen)
|
||||||
|
{
|
||||||
|
packetbuf = NULL;
|
||||||
|
goto return_packetbuf;
|
||||||
|
}
|
||||||
|
|
||||||
|
packetbuf = gwbuf_alloc(packetlen);
|
||||||
|
target = GWBUF_DATA(packetbuf);
|
||||||
|
packetbuf->gwbuf_type = readbuf->gwbuf_type; /*< Copy the type too */
|
||||||
|
/**
|
||||||
|
* Copy first MySQL packet to packetbuf and leave posible other
|
||||||
|
* packets to read buffer.
|
||||||
|
*/
|
||||||
|
while (nbytes_copied < packetlen && totalbuflen > 0)
|
||||||
|
{
|
||||||
|
uint8_t* src = GWBUF_DATA((*p_readbuf));
|
||||||
|
size_t bytestocopy;
|
||||||
|
|
||||||
|
buflen = GWBUF_LENGTH((*p_readbuf));
|
||||||
|
bytestocopy = MIN(buflen,packetlen-nbytes_copied);
|
||||||
|
|
||||||
|
memcpy(target+nbytes_copied, src, bytestocopy);
|
||||||
|
*p_readbuf = gwbuf_consume((*p_readbuf), bytestocopy);
|
||||||
|
totalbuflen = gwbuf_length((*p_readbuf));
|
||||||
|
nbytes_copied += bytestocopy;
|
||||||
|
}
|
||||||
|
ss_dassert(buflen == 0 || nbytes_copied == packetlen);
|
||||||
|
|
||||||
|
return_packetbuf:
|
||||||
|
return packetbuf;
|
||||||
|
}
|
||||||
|
@ -41,6 +41,7 @@ extern char *modutil_get_SQL(GWBUF *);
|
|||||||
extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
|
extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
|
||||||
extern char *modutil_get_query(GWBUF* buf);
|
extern char *modutil_get_query(GWBUF* buf);
|
||||||
extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *);
|
extern int modutil_send_mysql_err_packet(DCB *, int, int, int, const char *, const char *);
|
||||||
|
GWBUF* modutil_get_next_MySQL_packet(GWBUF** p_readbuf);
|
||||||
|
|
||||||
GWBUF *modutil_create_mysql_err_msg(
|
GWBUF *modutil_create_mysql_err_msg(
|
||||||
int packet_number,
|
int packet_number,
|
||||||
|
@ -111,6 +111,21 @@ static backend_ref_t* check_candidate_bref(
|
|||||||
backend_ref_t* new_bref,
|
backend_ref_t* new_bref,
|
||||||
select_criteria_t sc);
|
select_criteria_t sc);
|
||||||
|
|
||||||
|
static skygw_query_type_t is_read_tmp_table(
|
||||||
|
ROUTER_CLIENT_SES* router_cli_ses,
|
||||||
|
GWBUF* querybuf,
|
||||||
|
skygw_query_type_t type);
|
||||||
|
|
||||||
|
static void check_create_tmp_table(
|
||||||
|
ROUTER_CLIENT_SES* router_cli_ses,
|
||||||
|
GWBUF* querybuf,
|
||||||
|
skygw_query_type_t type);
|
||||||
|
|
||||||
|
static bool route_single_stmt(
|
||||||
|
ROUTER_INSTANCE* inst,
|
||||||
|
ROUTER_CLIENT_SES* rses,
|
||||||
|
GWBUF* querybuf);
|
||||||
|
|
||||||
|
|
||||||
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
|
static uint8_t getCapabilities (ROUTER* inst, void* router_session);
|
||||||
|
|
||||||
@ -1505,14 +1520,12 @@ static route_target_t get_route_target (
|
|||||||
/**
|
/**
|
||||||
* Check if the query is a DROP TABLE... query and
|
* Check if the query is a DROP TABLE... query and
|
||||||
* if it targets a temporary table, remove it from the hashtable.
|
* if it targets a temporary table, remove it from the hashtable.
|
||||||
* @param instance Router instance
|
* @param router_cli_ses Router client session
|
||||||
* @param router_session Router client session
|
|
||||||
* @param querybuf GWBUF containing the query
|
* @param querybuf GWBUF containing the query
|
||||||
* @param type The type of the query resolved so far
|
* @param type The type of the query resolved so far
|
||||||
*/
|
*/
|
||||||
void check_drop_tmp_table(
|
void check_drop_tmp_table(
|
||||||
ROUTER* instance,
|
ROUTER_CLIENT_SES* router_cli_ses,
|
||||||
void* router_session,
|
|
||||||
GWBUF* querybuf,
|
GWBUF* querybuf,
|
||||||
skygw_query_type_t type)
|
skygw_query_type_t type)
|
||||||
{
|
{
|
||||||
@ -1522,7 +1535,6 @@ void check_drop_tmp_table(
|
|||||||
char *hkey,*dbname;
|
char *hkey,*dbname;
|
||||||
MYSQL_session* data;
|
MYSQL_session* data;
|
||||||
|
|
||||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
|
||||||
DCB* master_dcb = NULL;
|
DCB* master_dcb = NULL;
|
||||||
rses_property_t* rses_prop_tmp;
|
rses_property_t* rses_prop_tmp;
|
||||||
|
|
||||||
@ -1567,15 +1579,13 @@ void check_drop_tmp_table(
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the query targets a temporary table.
|
* Check if the query targets a temporary table.
|
||||||
* @param instance Router instance
|
* @param router_cli_ses Router client session
|
||||||
* @param router_session Router client session
|
|
||||||
* @param querybuf GWBUF containing the query
|
* @param querybuf GWBUF containing the query
|
||||||
* @param type The type of the query resolved so far
|
* @param type The type of the query resolved so far
|
||||||
* @return The type of the query
|
* @return The type of the query
|
||||||
*/
|
*/
|
||||||
skygw_query_type_t is_read_tmp_table(
|
static skygw_query_type_t is_read_tmp_table(
|
||||||
ROUTER* instance,
|
ROUTER_CLIENT_SES* router_cli_ses,
|
||||||
void* router_session,
|
|
||||||
GWBUF* querybuf,
|
GWBUF* querybuf,
|
||||||
skygw_query_type_t type)
|
skygw_query_type_t type)
|
||||||
{
|
{
|
||||||
@ -1586,7 +1596,6 @@ skygw_query_type_t is_read_tmp_table(
|
|||||||
char *hkey,*dbname;
|
char *hkey,*dbname;
|
||||||
MYSQL_session* data;
|
MYSQL_session* data;
|
||||||
|
|
||||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
|
||||||
DCB* master_dcb = NULL;
|
DCB* master_dcb = NULL;
|
||||||
skygw_query_type_t qtype = type;
|
skygw_query_type_t qtype = type;
|
||||||
rses_property_t* rses_prop_tmp;
|
rses_property_t* rses_prop_tmp;
|
||||||
@ -1656,14 +1665,12 @@ skygw_query_type_t is_read_tmp_table(
|
|||||||
* the database and table name, create a hashvalue and
|
* the database and table name, create a hashvalue and
|
||||||
* add it to the router client session's property. If property
|
* add it to the router client session's property. If property
|
||||||
* doesn't exist then create it first.
|
* doesn't exist then create it first.
|
||||||
* @param instance Router instance
|
* @param router_cli_ses Router client session
|
||||||
* @param router_session Router client session
|
|
||||||
* @param querybuf GWBUF containing the query
|
* @param querybuf GWBUF containing the query
|
||||||
* @param type The type of the query resolved so far
|
* @param type The type of the query resolved so far
|
||||||
*/
|
*/
|
||||||
void check_create_tmp_table(
|
static void check_create_tmp_table(
|
||||||
ROUTER* instance,
|
ROUTER_CLIENT_SES* router_cli_ses,
|
||||||
void* router_session,
|
|
||||||
GWBUF* querybuf,
|
GWBUF* querybuf,
|
||||||
skygw_query_type_t type)
|
skygw_query_type_t type)
|
||||||
{
|
{
|
||||||
@ -1673,7 +1680,6 @@ void check_create_tmp_table(
|
|||||||
char *hkey,*dbname;
|
char *hkey,*dbname;
|
||||||
MYSQL_session* data;
|
MYSQL_session* data;
|
||||||
|
|
||||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
|
||||||
DCB* master_dcb = NULL;
|
DCB* master_dcb = NULL;
|
||||||
rses_property_t* rses_prop_tmp;
|
rses_property_t* rses_prop_tmp;
|
||||||
HASHTABLE* h;
|
HASHTABLE* h;
|
||||||
@ -1822,6 +1828,35 @@ static int routeQuery(
|
|||||||
{
|
{
|
||||||
rses_is_closed = true;
|
rses_is_closed = true;
|
||||||
}
|
}
|
||||||
|
#if 1
|
||||||
|
if (GWBUF_IS_TYPE_UNDEFINED(querybuf))
|
||||||
|
{
|
||||||
|
GWBUF* tmpbuf = querybuf;
|
||||||
|
do
|
||||||
|
{
|
||||||
|
if ((querybuf = modutil_get_next_MySQL_packet(&tmpbuf)) == NULL)
|
||||||
|
{
|
||||||
|
ret = 1;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
/** Mark buffer to as MySQL type */
|
||||||
|
gwbuf_set_type(querybuf, GWBUF_TYPE_MYSQL);
|
||||||
|
gwbuf_set_type(querybuf, GWBUF_TYPE_SINGLE_STMT);
|
||||||
|
succp = route_single_stmt(inst, router_cli_ses, querybuf);
|
||||||
|
}
|
||||||
|
while (tmpbuf != NULL && succp);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
succp = route_single_stmt(inst, router_cli_ses, querybuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (succp)
|
||||||
|
{
|
||||||
|
ret = 1;
|
||||||
|
}
|
||||||
|
goto retblock;
|
||||||
|
#else
|
||||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||||
|
|
||||||
packet = GWBUF_DATA(querybuf);
|
packet = GWBUF_DATA(querybuf);
|
||||||
@ -1925,9 +1960,9 @@ static int routeQuery(
|
|||||||
/**
|
/**
|
||||||
* Check if the query has anything to do with temporary tables.
|
* Check if the query has anything to do with temporary tables.
|
||||||
*/
|
*/
|
||||||
qtype = is_read_tmp_table(instance,router_session,querybuf,qtype);
|
qtype = is_read_tmp_table(inst,router_cli_ses,querybuf,qtype);
|
||||||
check_create_tmp_table(instance,router_session,querybuf,qtype);
|
check_create_tmp_table(inst,router_cli_ses,querybuf,qtype);
|
||||||
check_drop_tmp_table(instance,router_session,querybuf,qtype);
|
check_drop_tmp_table(inst,router_cli_ses,querybuf,qtype);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If autocommit is disabled or transaction is explicitly started
|
* If autocommit is disabled or transaction is explicitly started
|
||||||
@ -2263,6 +2298,7 @@ static int routeQuery(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
rses_end_locked_router_action(router_cli_ses);
|
rses_end_locked_router_action(router_cli_ses);
|
||||||
|
#endif
|
||||||
retblock:
|
retblock:
|
||||||
#if defined(SS_DEBUG2)
|
#if defined(SS_DEBUG2)
|
||||||
{
|
{
|
||||||
@ -2284,6 +2320,477 @@ retblock:
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool route_single_stmt(
|
||||||
|
ROUTER_INSTANCE* inst,
|
||||||
|
ROUTER_CLIENT_SES* rses,
|
||||||
|
GWBUF* querybuf)
|
||||||
|
{
|
||||||
|
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||||
|
mysql_server_cmd_t packet_type;
|
||||||
|
uint8_t* packet;
|
||||||
|
int ret = 0;
|
||||||
|
DCB* master_dcb = NULL;
|
||||||
|
DCB* target_dcb = NULL;
|
||||||
|
bool rses_is_closed = false;
|
||||||
|
route_target_t route_target;
|
||||||
|
bool succp;
|
||||||
|
int rlag_max = MAX_RLAG_UNDEFINED;
|
||||||
|
backend_type_t btype; /*< target backend type */
|
||||||
|
|
||||||
|
|
||||||
|
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||||
|
packet = GWBUF_DATA(querybuf);
|
||||||
|
packet_type = packet[4];
|
||||||
|
|
||||||
|
if (rses_is_closed)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* MYSQL_COM_QUIT may have sent by client and as a part of backend
|
||||||
|
* closing procedure.
|
||||||
|
*/
|
||||||
|
if (packet_type != MYSQL_COM_QUIT)
|
||||||
|
{
|
||||||
|
char* query_str = modutil_get_query(querybuf);
|
||||||
|
|
||||||
|
LOGIF(LE,
|
||||||
|
(skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error: Can't route %s:%s:\"%s\" to "
|
||||||
|
"backend server. Router is closed.",
|
||||||
|
STRPACKETTYPE(packet_type),
|
||||||
|
STRQTYPE(qtype),
|
||||||
|
(query_str == NULL ? "(empty)" : query_str))));
|
||||||
|
free(query_str);
|
||||||
|
}
|
||||||
|
succp = false;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read stored master DCB pointer. If master is not set, routing must
|
||||||
|
* be aborted
|
||||||
|
*/
|
||||||
|
if ((master_dcb = rses->rses_master_ref->bref_dcb) == NULL)
|
||||||
|
{
|
||||||
|
char* query_str = modutil_get_query(querybuf);
|
||||||
|
CHK_DCB(master_dcb);
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error: Can't route %s:%s:\"%s\" to "
|
||||||
|
"backend server. Session doesn't have a Master "
|
||||||
|
"node",
|
||||||
|
STRPACKETTYPE(packet_type),
|
||||||
|
STRQTYPE(qtype),
|
||||||
|
(query_str == NULL ? "(empty)" : query_str))));
|
||||||
|
free(query_str);
|
||||||
|
succp = false;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** If buffer is not contiguous, make it such */
|
||||||
|
if (querybuf->next != NULL)
|
||||||
|
{
|
||||||
|
querybuf = gwbuf_make_contiguous(querybuf);
|
||||||
|
}
|
||||||
|
|
||||||
|
switch(packet_type) {
|
||||||
|
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
||||||
|
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
||||||
|
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
||||||
|
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
||||||
|
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
||||||
|
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
||||||
|
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
|
||||||
|
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
||||||
|
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
||||||
|
qtype = QUERY_TYPE_SESSION_WRITE;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
||||||
|
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
|
||||||
|
qtype = QUERY_TYPE_WRITE;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_QUERY:
|
||||||
|
qtype = query_classifier_get_type(querybuf);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_STMT_PREPARE:
|
||||||
|
qtype = query_classifier_get_type(querybuf);
|
||||||
|
qtype |= QUERY_TYPE_PREPARE_STMT;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_STMT_EXECUTE:
|
||||||
|
/** Parsing is not needed for this type of packet */
|
||||||
|
qtype = QUERY_TYPE_EXEC_STMT;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
||||||
|
case MYSQL_COM_STATISTICS: /**< 9 ? */
|
||||||
|
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
|
||||||
|
case MYSQL_COM_CONNECT: /**< 0b ? */
|
||||||
|
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
|
||||||
|
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
|
||||||
|
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
|
||||||
|
case MYSQL_COM_DAEMON: /**< 1d ? */
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
} /**< switch by packet type */
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the query has anything to do with temporary tables.
|
||||||
|
*/
|
||||||
|
qtype = is_read_tmp_table(rses, querybuf, qtype);
|
||||||
|
check_create_tmp_table(rses, querybuf, qtype);
|
||||||
|
check_drop_tmp_table(rses, querybuf,qtype);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If autocommit is disabled or transaction is explicitly started
|
||||||
|
* transaction becomes active and master gets all statements until
|
||||||
|
* transaction is committed and autocommit is enabled again.
|
||||||
|
*/
|
||||||
|
if (rses->rses_autocommit_enabled &&
|
||||||
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
|
||||||
|
{
|
||||||
|
rses->rses_autocommit_enabled = false;
|
||||||
|
|
||||||
|
if (!rses->rses_transaction_active)
|
||||||
|
{
|
||||||
|
rses->rses_transaction_active = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (!rses->rses_transaction_active &&
|
||||||
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
|
||||||
|
{
|
||||||
|
rses->rses_transaction_active = true;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
|
||||||
|
*/
|
||||||
|
if (rses->rses_autocommit_enabled &&
|
||||||
|
rses->rses_transaction_active &&
|
||||||
|
(QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) ||
|
||||||
|
QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK)))
|
||||||
|
{
|
||||||
|
rses->rses_transaction_active = false;
|
||||||
|
}
|
||||||
|
else if (!rses->rses_autocommit_enabled &&
|
||||||
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
|
||||||
|
{
|
||||||
|
rses->rses_autocommit_enabled = true;
|
||||||
|
rses->rses_transaction_active = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG_IS_ENABLED(LOGFILE_TRACE))
|
||||||
|
{
|
||||||
|
uint8_t* packet = GWBUF_DATA(querybuf);
|
||||||
|
unsigned char ptype = packet[4];
|
||||||
|
size_t len = MIN(GWBUF_LENGTH(querybuf),
|
||||||
|
MYSQL_GET_PACKET_LEN((unsigned char *)querybuf->start)-1);
|
||||||
|
char* data = (char*)&packet[5];
|
||||||
|
char* contentstr = strndup(data, len);
|
||||||
|
char* qtypestr = skygw_get_qtype_str(qtype);
|
||||||
|
|
||||||
|
skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"> Autocommit: %s, trx is %s, cmd: %s, type: %s, "
|
||||||
|
"stmt: %s%s %s",
|
||||||
|
(rses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"),
|
||||||
|
(rses->rses_transaction_active ? "[open]" : "[not open]"),
|
||||||
|
STRPACKETTYPE(ptype),
|
||||||
|
(qtypestr==NULL ? "N/A" : qtypestr),
|
||||||
|
contentstr,
|
||||||
|
(querybuf->hint == NULL ? "" : ", Hint:"),
|
||||||
|
(querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type)));
|
||||||
|
|
||||||
|
free(contentstr);
|
||||||
|
free(qtypestr);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Find out where to route the query. Result may not be clear; it is
|
||||||
|
* possible to have a hint for routing to a named server which can
|
||||||
|
* be either slave or master.
|
||||||
|
* If query would otherwise be routed to slave then the hint determines
|
||||||
|
* actual target server if it exists.
|
||||||
|
*
|
||||||
|
* route_target is a bitfield and may include :
|
||||||
|
* TARGET_ALL
|
||||||
|
* - route to all connected backend servers
|
||||||
|
* TARGET_SLAVE[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
||||||
|
* - route primarily according to hints, then to slave and if those
|
||||||
|
* failed, eventually to master
|
||||||
|
* TARGET_MASTER[|TARGET_NAMED_SERVER|TARGET_RLAG_MAX]
|
||||||
|
* - route primarily according to the hints and if they failed,
|
||||||
|
* eventually to master
|
||||||
|
*/
|
||||||
|
route_target = get_route_target(qtype,
|
||||||
|
rses->rses_transaction_active,
|
||||||
|
rses->rses_config.rw_use_sql_variables_in,
|
||||||
|
querybuf->hint);
|
||||||
|
|
||||||
|
if (TARGET_IS_ALL(route_target))
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* It is not sure if the session command in question requires
|
||||||
|
* response. Statement is examined in route_session_write.
|
||||||
|
* Router locking is done inside the function.
|
||||||
|
*/
|
||||||
|
succp = route_session_write(
|
||||||
|
rses,
|
||||||
|
gwbuf_clone(querybuf),
|
||||||
|
inst,
|
||||||
|
packet_type,
|
||||||
|
qtype);
|
||||||
|
|
||||||
|
if (succp)
|
||||||
|
{
|
||||||
|
atomic_add(&inst->stats.n_all, 1);
|
||||||
|
}
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Lock router session */
|
||||||
|
if (!rses_begin_locked_router_action(rses))
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Route query aborted! Routing session is closed <")));
|
||||||
|
succp = false;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* There is a hint which either names the target backend or
|
||||||
|
* hint which sets maximum allowed replication lag for the
|
||||||
|
* backend.
|
||||||
|
*/
|
||||||
|
if (TARGET_IS_NAMED_SERVER(route_target) ||
|
||||||
|
TARGET_IS_RLAG_MAX(route_target))
|
||||||
|
{
|
||||||
|
HINT* hint;
|
||||||
|
char* named_server = NULL;
|
||||||
|
|
||||||
|
hint = querybuf->hint;
|
||||||
|
|
||||||
|
while (hint != NULL)
|
||||||
|
{
|
||||||
|
if (hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Set the name of searched
|
||||||
|
* backend server.
|
||||||
|
*/
|
||||||
|
named_server = hint->data;
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Hint: route to server "
|
||||||
|
"'%s'",
|
||||||
|
named_server)));
|
||||||
|
}
|
||||||
|
else if (hint->type == HINT_PARAMETER &&
|
||||||
|
(strncasecmp((char *)hint->data,
|
||||||
|
"max_slave_replication_lag",
|
||||||
|
strlen("max_slave_replication_lag")) == 0))
|
||||||
|
{
|
||||||
|
int val = (int) strtol((char *)hint->value,
|
||||||
|
(char **)NULL, 10);
|
||||||
|
|
||||||
|
if (val != 0 || errno == 0)
|
||||||
|
{
|
||||||
|
/**
|
||||||
|
* Set max. acceptable
|
||||||
|
* replication lag
|
||||||
|
* value for backend srv
|
||||||
|
*/
|
||||||
|
rlag_max = val;
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Hint: "
|
||||||
|
"max_slave_replication_lag=%d",
|
||||||
|
rlag_max)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hint = hint->next;
|
||||||
|
} /*< while */
|
||||||
|
|
||||||
|
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
|
||||||
|
{
|
||||||
|
rlag_max = rses_get_max_replication_lag(rses);
|
||||||
|
}
|
||||||
|
btype = BE_UNDEFINED; /*< target may be master or slave */
|
||||||
|
/**
|
||||||
|
* Search backend server by name or replication lag.
|
||||||
|
* If it fails, then try to find valid slave or master.
|
||||||
|
*/
|
||||||
|
succp = get_dcb(&target_dcb, rses, btype, named_server,rlag_max);
|
||||||
|
|
||||||
|
if (!succp)
|
||||||
|
{
|
||||||
|
if (TARGET_IS_NAMED_SERVER(route_target))
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Was supposed to route to named server "
|
||||||
|
"%s but couldn't find the server in a "
|
||||||
|
"suitable state.",
|
||||||
|
named_server)));
|
||||||
|
}
|
||||||
|
else if (TARGET_IS_RLAG_MAX(route_target))
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Was supposed to route to server with "
|
||||||
|
"replication lag at most %d but couldn't "
|
||||||
|
"find such a slave.",
|
||||||
|
rlag_max)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (TARGET_IS_SLAVE(route_target))
|
||||||
|
{
|
||||||
|
btype = BE_SLAVE;
|
||||||
|
|
||||||
|
if (rlag_max == MAX_RLAG_UNDEFINED) /*< no rlag max hint, use config */
|
||||||
|
{
|
||||||
|
rlag_max = rses_get_max_replication_lag(rses);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Search suitable backend server, get DCB in target_dcb
|
||||||
|
*/
|
||||||
|
succp = get_dcb(&target_dcb, rses, BE_SLAVE, NULL,rlag_max);
|
||||||
|
|
||||||
|
if (succp)
|
||||||
|
{
|
||||||
|
#if defined(SS_EXTRA_DEBUG)
|
||||||
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
||||||
|
"Found DCB for slave.")));
|
||||||
|
#endif
|
||||||
|
ss_dassert(get_bref_from_dcb(rses, target_dcb) !=
|
||||||
|
rses->rses_master_ref);
|
||||||
|
ss_dassert(get_root_master_bref(rses) ==
|
||||||
|
rses->rses_master_ref);
|
||||||
|
atomic_add(&inst->stats.n_slave, 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
||||||
|
"Was supposed to route to slave"
|
||||||
|
"but finding suitable one "
|
||||||
|
"failed.")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (TARGET_IS_MASTER(route_target))
|
||||||
|
{
|
||||||
|
DCB* curr_master_dcb = NULL;
|
||||||
|
|
||||||
|
succp = get_dcb(&curr_master_dcb,
|
||||||
|
rses,
|
||||||
|
BE_MASTER,
|
||||||
|
NULL,
|
||||||
|
MAX_RLAG_UNDEFINED);
|
||||||
|
|
||||||
|
if (succp && master_dcb == curr_master_dcb)
|
||||||
|
{
|
||||||
|
atomic_add(&inst->stats.n_master, 1);
|
||||||
|
target_dcb = master_dcb;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (succp && master_dcb != curr_master_dcb)
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
||||||
|
"Was supposed to "
|
||||||
|
"route to master "
|
||||||
|
"but master has "
|
||||||
|
"changed.")));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
||||||
|
"Was supposed to "
|
||||||
|
"route to master "
|
||||||
|
"but couldn't find "
|
||||||
|
"master in a "
|
||||||
|
"suitable state.")));
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* Master has changed. Return with error indicator.
|
||||||
|
*/
|
||||||
|
rses_end_locked_router_action(rses);
|
||||||
|
succp = false;
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (succp) /*< Have DCB of the target backend */
|
||||||
|
{
|
||||||
|
backend_ref_t* bref;
|
||||||
|
sescmd_cursor_t* scur;
|
||||||
|
|
||||||
|
bref = get_bref_from_dcb(rses, target_dcb);
|
||||||
|
scur = &bref->bref_sescmd_cur;
|
||||||
|
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Route query to %s \t%s:%d <",
|
||||||
|
(SERVER_IS_MASTER(bref->bref_backend->backend_server) ?
|
||||||
|
"master" : "slave"),
|
||||||
|
bref->bref_backend->backend_server->name,
|
||||||
|
bref->bref_backend->backend_server->port)));
|
||||||
|
/**
|
||||||
|
* Store current stmt if execution of previous session command
|
||||||
|
* haven't completed yet. Note that according to MySQL protocol
|
||||||
|
* there can only be one such non-sescmd stmt at the time.
|
||||||
|
*/
|
||||||
|
if (sescmd_cursor_is_active(scur))
|
||||||
|
{
|
||||||
|
ss_dassert(bref->bref_pending_cmd == NULL);
|
||||||
|
bref->bref_pending_cmd = gwbuf_clone(querybuf);
|
||||||
|
|
||||||
|
rses_end_locked_router_action(rses);
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1)
|
||||||
|
{
|
||||||
|
backend_ref_t* bref;
|
||||||
|
|
||||||
|
atomic_add(&inst->stats.n_queries, 1);
|
||||||
|
/**
|
||||||
|
* Add one query response waiter to backend reference
|
||||||
|
*/
|
||||||
|
bref = get_bref_from_dcb(rses, target_dcb);
|
||||||
|
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||||
|
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error : Routing query failed.")));
|
||||||
|
succp = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
rses_end_locked_router_action(rses);
|
||||||
|
|
||||||
|
retblock:
|
||||||
|
#if defined(SS_DEBUG2)
|
||||||
|
{
|
||||||
|
char* canonical_query_str;
|
||||||
|
|
||||||
|
canonical_query_str = skygw_get_canonical(querybuf);
|
||||||
|
|
||||||
|
if (canonical_query_str != NULL)
|
||||||
|
{
|
||||||
|
LOGIF(LT, (skygw_log_write(
|
||||||
|
LOGFILE_TRACE,
|
||||||
|
"Canonical version: %s",
|
||||||
|
canonical_query_str)));
|
||||||
|
free(canonical_query_str);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
return succp;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user