Fix to MXS-365: https://mariadb.atlassian.net/browse/MXS-365 Added tracking of LOAD DATA LOCAL INFILE

While a LOAD DATA LOCAL INFILE query is being executed, all queries will be sent to the master
and they will not be processed as normal packets.
This commit is contained in:
Markus Makela
2015-11-03 23:34:12 +02:00
parent 126b4c1d79
commit 0accf869de
4 changed files with 77 additions and 31 deletions

View File

@ -1647,6 +1647,11 @@ retblock:
skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf) skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf)
{ {
if (!query_is_parsed(querybuf))
{
parse_query(querybuf);
}
LEX* lex = get_lex(querybuf); LEX* lex = get_lex(querybuf);
skygw_query_op_t operation = QUERY_OP_UNDEFINED; skygw_query_op_t operation = QUERY_OP_UNDEFINED;
if(lex){ if(lex){
@ -1687,6 +1692,9 @@ skygw_query_op_t query_classifier_get_operation(GWBUF* querybuf)
case SQLCOM_CHANGE_DB: case SQLCOM_CHANGE_DB:
operation = QUERY_OP_CHANGE_DB; operation = QUERY_OP_CHANGE_DB;
break; break;
case SQLCOM_LOAD:
operation = QUERY_OP_LOAD;
break;
default: default:
operation = QUERY_OP_UNDEFINED; operation = QUERY_OP_UNDEFINED;

View File

@ -75,7 +75,8 @@ typedef enum {
QUERY_OP_CREATE_INDEX = (1 << 8), QUERY_OP_CREATE_INDEX = (1 << 8),
QUERY_OP_DROP_TABLE = (1 << 9), QUERY_OP_DROP_TABLE = (1 << 9),
QUERY_OP_DROP_INDEX = (1 << 10), QUERY_OP_DROP_INDEX = (1 << 10),
QUERY_OP_CHANGE_DB = (1 << 11) QUERY_OP_CHANGE_DB = (1 << 11),
QUERY_OP_LOAD = (1 << 12)
}skygw_query_op_t; }skygw_query_op_t;
typedef struct parsing_info_st { typedef struct parsing_info_st {

View File

@ -293,6 +293,9 @@ struct router_client_session {
int rses_capabilities; /*< input type, for example */ int rses_capabilities; /*< input type, for example */
bool rses_autocommit_enabled; bool rses_autocommit_enabled;
bool rses_transaction_active; bool rses_transaction_active;
bool rses_load_active; /*< If LOAD DATA LOCAL INFILE is
* being currently executed */
uint64_t rses_load_data_sent; /*< How much data has been sent */
DCB* client_dcb; DCB* client_dcb;
int pos_generator; int pos_generator;
#if defined(PREP_STMT_CACHING) #if defined(PREP_STMT_CACHING)

View File

@ -106,6 +106,7 @@ static DCB* rses_get_client_dcb(ROUTER_CLIENT_SES* rses);
static route_target_t get_route_target ( static route_target_t get_route_target (
skygw_query_type_t qtype, skygw_query_type_t qtype,
bool trx_active, bool trx_active,
bool load_active,
target_t use_sql_variables_in, target_t use_sql_variables_in,
HINT* hint); HINT* hint);
@ -1370,6 +1371,7 @@ static backend_ref_t* check_candidate_bref(
static route_target_t get_route_target ( static route_target_t get_route_target (
skygw_query_type_t qtype, skygw_query_type_t qtype,
bool trx_active, bool trx_active,
bool load_active,
target_t use_sql_variables_in, target_t use_sql_variables_in,
HINT* hint) HINT* hint)
{ {
@ -1377,7 +1379,7 @@ static route_target_t get_route_target (
/** /**
* These queries are not affected by hints * These queries are not affected by hints
*/ */
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) || if (!load_active && (QUERY_IS_TYPE(qtype, QUERY_TYPE_SESSION_WRITE) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) || QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_STMT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) || QUERY_IS_TYPE(qtype, QUERY_TYPE_PREPARE_NAMED_STMT) ||
/** Configured to allow writing variables to all nodes */ /** Configured to allow writing variables to all nodes */
@ -1385,7 +1387,7 @@ static route_target_t get_route_target (
QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_WRITE)) || QUERY_IS_TYPE(qtype, QUERY_TYPE_GSYSVAR_WRITE)) ||
/** enable or disable autocommit are always routed to all */ /** enable or disable autocommit are always routed to all */
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) || QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT) ||
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)) QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT)))
{ {
/** /**
* This is problematic query because it would be routed to all * This is problematic query because it would be routed to all
@ -1423,7 +1425,7 @@ static route_target_t get_route_target (
/** /**
* Hints may affect on routing of the following queries * Hints may affect on routing of the following queries
*/ */
else if (!trx_active && else if (!trx_active && !load_active &&
(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || /*< any SELECT */ (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) || /*< any SELECT */
QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) || /*< 'SHOW TABLES' */ QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES) || /*< 'SHOW TABLES' */
QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ)|| /*< read user var */ QUERY_IS_TYPE(qtype, QUERY_TYPE_USERVAR_READ)|| /*< read user var */
@ -2143,8 +2145,13 @@ static bool route_single_stmt(
if(packet_len == 0) if(packet_len == 0)
{ {
/** Empty packet signals end of LOAD DATA LOCAL INFILE, send it to master*/
route_target = TARGET_MASTER; route_target = TARGET_MASTER;
packet_type = MYSQL_COM_UNDEFINED; packet_type = MYSQL_COM_UNDEFINED;
rses->rses_load_active = false;
route_target = TARGET_MASTER;
skygw_log_write_flush(LT, "> LOAD DATA LOCAL INFILE finished: "
"%lu bytes sent.", rses->rses_load_data_sent + gwbuf_length(querybuf));
} }
else else
{ {
@ -2194,17 +2201,36 @@ static bool route_single_stmt(
break; break;
} /**< switch by packet type */ } /**< switch by packet type */
/**
* Check if the query has anything to do with temporary tables.
*/
if (!rses_begin_locked_router_action(rses)) if (!rses_begin_locked_router_action(rses))
{ {
succp = false; succp = false;
goto retblock; goto retblock;
} }
/**
* Check if the query has anything to do with temporary tables.
*/
qtype = is_read_tmp_table(rses, querybuf, qtype); qtype = is_read_tmp_table(rses, querybuf, qtype);
check_create_tmp_table(rses, querybuf, qtype); check_create_tmp_table(rses, querybuf, qtype);
check_drop_tmp_table(rses, querybuf,qtype); check_drop_tmp_table(rses, querybuf,qtype);
/**
* Check if this is a LOAD DATA LOCAL INFILE query. If so, send all queries
* to the master until the last, empty packet arrives.
*/
if (!rses->rses_load_active)
{
skygw_query_op_t queryop = query_classifier_get_operation(querybuf);
if (queryop == QUERY_OP_LOAD)
{
rses->rses_load_active = true;
rses->rses_load_data_sent = 0;
}
}
else
{
rses->rses_load_data_sent += gwbuf_length(querybuf);
}
rses_end_locked_router_action(rses); rses_end_locked_router_action(rses);
/** /**
* If autocommit is disabled or transaction is explicitly started * If autocommit is disabled or transaction is explicitly started
@ -2244,6 +2270,8 @@ static bool route_single_stmt(
} }
if (LOG_IS_ENABLED(LOGFILE_TRACE)) if (LOG_IS_ENABLED(LOGFILE_TRACE))
{
if (!rses->rses_load_active)
{ {
uint8_t* packet = GWBUF_DATA(querybuf); uint8_t* packet = GWBUF_DATA(querybuf);
unsigned char ptype = packet[4]; unsigned char ptype = packet[4];
@ -2253,8 +2281,7 @@ static bool route_single_stmt(
char* contentstr = strndup(data, MIN(len, RWSPLIT_TRACE_MSG_LEN)); char* contentstr = strndup(data, MIN(len, RWSPLIT_TRACE_MSG_LEN));
char* qtypestr = skygw_get_qtype_str(qtype); char* qtypestr = skygw_get_qtype_str(qtype);
skygw_log_write( skygw_log_write(LOGFILE_TRACE,
LOGFILE_TRACE,
"> Autocommit: %s, trx is %s, cmd: %s, type: %s, " "> Autocommit: %s, trx is %s, cmd: %s, type: %s, "
"stmt: %s%s %s", "stmt: %s%s %s",
(rses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"), (rses->rses_autocommit_enabled ? "[enabled]" : "[disabled]"),
@ -2268,6 +2295,12 @@ static bool route_single_stmt(
free(contentstr); free(contentstr);
free(qtypestr); free(qtypestr);
} }
else
{
skygw_log_write(LT, "> Processing LOAD DATA LOCAL INFILE: "
"%lu bytes sent.", rses->rses_load_data_sent);
}
}
/** /**
* Find out where to route the query. Result may not be clear; it is * Find out where to route the query. Result may not be clear; it is
* possible to have a hint for routing to a named server which can * possible to have a hint for routing to a named server which can
@ -2287,6 +2320,7 @@ static bool route_single_stmt(
*/ */
route_target = get_route_target(qtype, route_target = get_route_target(qtype,
rses->rses_transaction_active, rses->rses_transaction_active,
rses->rses_load_active,
rses->rses_config.rw_use_sql_variables_in, rses->rses_config.rw_use_sql_variables_in,
querybuf->hint); querybuf->hint);