MXS-1160: Add support for LOAD DATA LOCAL INFILE

Schemarouter now supports the LOAD DATA LOCAL INFILE command.
This commit is contained in:
Markus Mäkelä 2017-03-27 23:39:00 +03:00
parent f6470c580a
commit 94ac2d89d0
2 changed files with 102 additions and 66 deletions

View File

@ -128,7 +128,8 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou
m_shard(m_router->m_shard_manager.get_shard(m_client->user, m_config->refresh_min_interval)),
m_state(0),
m_sent_sescmd(0),
m_replied_sescmd(0)
m_replied_sescmd(0),
m_load_target(NULL)
{
char db[MYSQL_DATABASE_MAXLEN + 1] = "";
MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol;
@ -368,6 +369,21 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
return target;
}
static bool is_empty_packet(GWBUF* pPacket)
{
bool rval = false;
uint8_t len[3];
if (gwbuf_length(pPacket) == 4 &&
gwbuf_copy_data(pPacket, 0, 3, len) == 3 &&
gw_mysql_get_byte3(len) == 0)
{
rval = true;
}
return rval;
}
int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
{
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket));
@ -416,91 +432,105 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
qc_query_op_t op = QUERY_OP_UNDEFINED;
enum route_target route_target = TARGET_UNDEFINED;
inspect_query(pPacket, &type, &op, &command);
/** Create the response to the SHOW DATABASES from the mapped databases */
if (qc_query_is_type(type, QUERY_TYPE_SHOW_DATABASES))
if (m_load_target)
{
if (send_database_list())
/** A load data local infile is active */
target = m_load_target->m_backend->server;
route_target = TARGET_NAMED_SERVER;
if (is_empty_packet(pPacket))
{
ret = 1;
m_load_target = NULL;
}
gwbuf_free(pPacket);
return ret;
}
else if (detect_show_shards(pPacket))
else
{
if (process_show_shards())
inspect_query(pPacket, &type, &op, &command);
/** Create the response to the SHOW DATABASES from the mapped databases */
if (qc_query_is_type(type, QUERY_TYPE_SHOW_DATABASES))
{
ret = 1;
}
gwbuf_free(pPacket);
return ret;
}
/** The default database changes must be routed to a specific server */
if (command == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB)
{
if (!change_current_db(m_current_db, m_shard, pPacket))
{
char db[MYSQL_DATABASE_MAXLEN + 1];
extract_database(pPacket, db);
gwbuf_free(pPacket);
char errbuf[128 + MYSQL_DATABASE_MAXLEN];
snprintf(errbuf, sizeof(errbuf), "Unknown database: %s", db);
if (m_config->debug)
if (send_database_list())
{
sprintf(errbuf + strlen(errbuf),
" ([%lu]: DB change failed)",
m_client->session->ses_id);
ret = 1;
}
write_error_to_client(m_client,
SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND,
errbuf);
return 1;
gwbuf_free(pPacket);
return ret;
}
else if (detect_show_shards(pPacket))
{
if (process_show_shards())
{
ret = 1;
}
gwbuf_free(pPacket);
return ret;
}
route_target = TARGET_UNDEFINED;
target = m_shard.get_location(m_current_db);
if (target)
/** The default database changes must be routed to a specific server */
if (command == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB)
{
MXS_INFO("INIT_DB for database '%s' on server '%s'",
m_current_db.c_str(), target->unique_name);
route_target = TARGET_NAMED_SERVER;
if (!change_current_db(m_current_db, m_shard, pPacket))
{
char db[MYSQL_DATABASE_MAXLEN + 1];
extract_database(pPacket, db);
gwbuf_free(pPacket);
char errbuf[128 + MYSQL_DATABASE_MAXLEN];
snprintf(errbuf, sizeof (errbuf), "Unknown database: %s", db);
if (m_config->debug)
{
sprintf(errbuf + strlen(errbuf),
" ([%lu]: DB change failed)",
m_client->session->ses_id);
}
write_error_to_client(m_client,
SCHEMA_ERR_DBNOTFOUND,
SCHEMA_ERRSTR_DBNOTFOUND,
errbuf);
return 1;
}
route_target = TARGET_UNDEFINED;
target = m_shard.get_location(m_current_db);
if (target)
{
MXS_INFO("INIT_DB for database '%s' on server '%s'",
m_current_db.c_str(), target->unique_name);
route_target = TARGET_NAMED_SERVER;
}
else
{
MXS_INFO("INIT_DB with unknown database");
}
}
else
{
MXS_INFO("INIT_DB with unknown database");
route_target = get_shard_route_target(type);
}
}
else
{
route_target = get_shard_route_target(type);
}
/**
* Find a suitable server that matches the requirements of @c route_target
*/
/**
* Find a suitable server that matches the requirements of @c route_target
*/
if (TARGET_IS_ALL(route_target))
{
/** Session commands, route to all servers */
if (route_session_write(pPacket, command))
if (TARGET_IS_ALL(route_target))
{
atomic_add(&m_router->m_stats.n_sescmd, 1);
atomic_add(&m_router->m_stats.n_queries, 1);
ret = 1;
/** Session commands, route to all servers */
if (route_session_write(pPacket, command))
{
atomic_add(&m_router->m_stats.n_sescmd, 1);
atomic_add(&m_router->m_stats.n_queries, 1);
ret = 1;
}
}
else
{
target = resolve_query_target(pPacket, type, command, route_target);
}
}
else
{
target = resolve_query_target(pPacket, type, command, route_target);
}
DCB* target_dcb = NULL;
@ -511,6 +541,11 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
/** We know where to route this query */
Backend *bref = get_bref_from_dcb(target_dcb);
if (op == QUERY_OP_LOAD)
{
m_load_target = bref;
}
MXS_INFO("Route query to \t%s:%d <", bref->m_backend->server->name, bref->m_backend->server->port);
if (bref->m_session_commands.size() > 0)

View File

@ -207,4 +207,5 @@ private:
Stats m_stats; /**< Statistics for this router */
uint64_t m_sent_sescmd; /**< The latest session command being executed */
uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */
Backend* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */
};