From 94ac2d89d09833158398fc841b999d877009b614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 27 Mar 2017 23:39:00 +0300 Subject: [PATCH] MXS-1160: Add support for LOAD DATA LOCAL INFILE Schemarouter now supports the LOAD DATA LOCAL INFILE command. --- .../schemarouter/schemaroutersession.cc | 167 +++++++++++------- .../schemarouter/schemaroutersession.hh | 1 + 2 files changed, 102 insertions(+), 66 deletions(-) diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index ae1b86055..3714f36fd 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -128,7 +128,8 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter* rou m_shard(m_router->m_shard_manager.get_shard(m_client->user, m_config->refresh_min_interval)), m_state(0), m_sent_sescmd(0), - m_replied_sescmd(0) + m_replied_sescmd(0), + m_load_target(NULL) { char db[MYSQL_DATABASE_MAXLEN + 1] = ""; MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol; @@ -368,6 +369,21 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, return target; } +static bool is_empty_packet(GWBUF* pPacket) +{ + bool rval = false; + uint8_t len[3]; + + if (gwbuf_length(pPacket) == 4 && + gwbuf_copy_data(pPacket, 0, 3, len) == 3 && + gw_mysql_get_byte3(len) == 0) + { + rval = true; + } + + return rval; +} + int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) { ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket)); @@ -416,91 +432,105 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) qc_query_op_t op = QUERY_OP_UNDEFINED; enum route_target route_target = TARGET_UNDEFINED; - inspect_query(pPacket, &type, &op, &command); - - /** Create the response to the SHOW DATABASES from the mapped databases */ - if (qc_query_is_type(type, QUERY_TYPE_SHOW_DATABASES)) + if (m_load_target) { - if (send_database_list()) + /** A load data local infile is active */ + target = m_load_target->m_backend->server; + route_target = TARGET_NAMED_SERVER; + + if (is_empty_packet(pPacket)) { - ret = 1; + m_load_target = NULL; } - - gwbuf_free(pPacket); - return ret; } - else if (detect_show_shards(pPacket)) + else { - if (process_show_shards()) + inspect_query(pPacket, &type, &op, &command); + + /** Create the response to the SHOW DATABASES from the mapped databases */ + if (qc_query_is_type(type, QUERY_TYPE_SHOW_DATABASES)) { - ret = 1; - } - gwbuf_free(pPacket); - return ret; - } - - /** The default database changes must be routed to a specific server */ - if (command == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) - { - if (!change_current_db(m_current_db, m_shard, pPacket)) - { - char db[MYSQL_DATABASE_MAXLEN + 1]; - extract_database(pPacket, db); - gwbuf_free(pPacket); - - char errbuf[128 + MYSQL_DATABASE_MAXLEN]; - snprintf(errbuf, sizeof(errbuf), "Unknown database: %s", db); - - if (m_config->debug) + if (send_database_list()) { - sprintf(errbuf + strlen(errbuf), - " ([%lu]: DB change failed)", - m_client->session->ses_id); + ret = 1; } - write_error_to_client(m_client, - SCHEMA_ERR_DBNOTFOUND, - SCHEMA_ERRSTR_DBNOTFOUND, - errbuf); - return 1; + gwbuf_free(pPacket); + return ret; + } + else if (detect_show_shards(pPacket)) + { + if (process_show_shards()) + { + ret = 1; + } + gwbuf_free(pPacket); + return ret; } - route_target = TARGET_UNDEFINED; - target = m_shard.get_location(m_current_db); - - if (target) + /** The default database changes must be routed to a specific server */ + if (command == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) { - MXS_INFO("INIT_DB for database '%s' on server '%s'", - m_current_db.c_str(), target->unique_name); - route_target = TARGET_NAMED_SERVER; + if (!change_current_db(m_current_db, m_shard, pPacket)) + { + char db[MYSQL_DATABASE_MAXLEN + 1]; + extract_database(pPacket, db); + gwbuf_free(pPacket); + + char errbuf[128 + MYSQL_DATABASE_MAXLEN]; + snprintf(errbuf, sizeof (errbuf), "Unknown database: %s", db); + + if (m_config->debug) + { + sprintf(errbuf + strlen(errbuf), + " ([%lu]: DB change failed)", + m_client->session->ses_id); + } + + write_error_to_client(m_client, + SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, + errbuf); + return 1; + } + + route_target = TARGET_UNDEFINED; + target = m_shard.get_location(m_current_db); + + if (target) + { + MXS_INFO("INIT_DB for database '%s' on server '%s'", + m_current_db.c_str(), target->unique_name); + route_target = TARGET_NAMED_SERVER; + } + else + { + MXS_INFO("INIT_DB with unknown database"); + } } else { - MXS_INFO("INIT_DB with unknown database"); + route_target = get_shard_route_target(type); } - } - else - { - route_target = get_shard_route_target(type); - } - /** - * Find a suitable server that matches the requirements of @c route_target - */ + /** + * Find a suitable server that matches the requirements of @c route_target + */ - if (TARGET_IS_ALL(route_target)) - { - /** Session commands, route to all servers */ - if (route_session_write(pPacket, command)) + if (TARGET_IS_ALL(route_target)) { - atomic_add(&m_router->m_stats.n_sescmd, 1); - atomic_add(&m_router->m_stats.n_queries, 1); - ret = 1; + /** Session commands, route to all servers */ + if (route_session_write(pPacket, command)) + { + atomic_add(&m_router->m_stats.n_sescmd, 1); + atomic_add(&m_router->m_stats.n_queries, 1); + ret = 1; + } + } + else + { + target = resolve_query_target(pPacket, type, command, route_target); } - } - else - { - target = resolve_query_target(pPacket, type, command, route_target); } DCB* target_dcb = NULL; @@ -511,6 +541,11 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) /** We know where to route this query */ Backend *bref = get_bref_from_dcb(target_dcb); + if (op == QUERY_OP_LOAD) + { + m_load_target = bref; + } + MXS_INFO("Route query to \t%s:%d <", bref->m_backend->server->name, bref->m_backend->server->port); if (bref->m_session_commands.size() > 0) diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh index 676ce6c8b..2832d6096 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.hh +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -207,4 +207,5 @@ private: Stats m_stats; /**< Statistics for this router */ uint64_t m_sent_sescmd; /**< The latest session command being executed */ uint64_t m_replied_sescmd; /**< The last session command reply that was sent to the client */ + Backend* m_load_target; /**< Target for LOAD DATA LOCAL INFILE */ };