From ac641e0f22ee1bbf231abf25d1ea95108be2c3ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20M=C3=A4kel=C3=A4?= Date: Mon, 27 Mar 2017 09:22:59 +0300 Subject: [PATCH] Use router template in schemarouter The schemarouter now uses the router template. --- .../routing/schemarouter/CMakeLists.txt | 2 +- .../routing/schemarouter/schemarouter.hh | 146 +- .../schemarouter/schemarouterinstance.cc | 268 ++ .../schemarouter/schemarouterinstance.hh | 61 + ...schemarouter.cc => schemaroutersession.cc} | 3597 +++++++---------- .../schemarouter/schemaroutersession.hh | 198 + 6 files changed, 2072 insertions(+), 2200 deletions(-) create mode 100644 server/modules/routing/schemarouter/schemarouterinstance.cc create mode 100644 server/modules/routing/schemarouter/schemarouterinstance.hh rename server/modules/routing/schemarouter/{schemarouter.cc => schemaroutersession.cc} (61%) create mode 100644 server/modules/routing/schemarouter/schemaroutersession.hh diff --git a/server/modules/routing/schemarouter/CMakeLists.txt b/server/modules/routing/schemarouter/CMakeLists.txt index 4be7c2221..6bc3fd575 100644 --- a/server/modules/routing/schemarouter/CMakeLists.txt +++ b/server/modules/routing/schemarouter/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(schemarouter SHARED schemarouter.cc shard_map.cc session_command.cc) +add_library(schemarouter SHARED schemarouterinstance.cc schemaroutersession.cc shard_map.cc session_command.cc) target_link_libraries(schemarouter maxscale-common) add_dependencies(schemarouter pcre2) set_target_properties(schemarouter PROPERTIES VERSION "1.0.0") diff --git a/server/modules/routing/schemarouter/schemarouter.hh b/server/modules/routing/schemarouter/schemarouter.hh index e5b608ae2..3422214ac 100644 --- a/server/modules/routing/schemarouter/schemarouter.hh +++ b/server/modules/routing/schemarouter/schemarouter.hh @@ -14,112 +14,13 @@ #pragma once /** - * @file schemarouter.hh - The schemarouter router module header file + * @file schemarouter.hh - Common schemarouter definitions */ #define MXS_MODULE_NAME "schemarouter" #include -#include -#include - -#include -#include -#include -#include - -#include "shard_map.hh" -#include "session_command.hh" - -using std::string; -using std::set; - -/** - * Bitmask values for the router session's initialization. These values are used - * to prevent responses from internal commands being forwarded to the client. - */ -typedef enum init_mask -{ - INIT_READY = 0x00, - INIT_MAPPING = 0x01, - INIT_USE_DB = 0x02, - INIT_UNINT = 0x04, - INIT_FAILED = 0x08 -} init_mask_t; - -typedef enum showdb_response -{ - SHOWDB_FULL_RESPONSE, - SHOWDB_PARTIAL_RESPONSE, - SHOWDB_DUPLICATE_DATABASES, - SHOWDB_FATAL_ERROR -} showdb_response_t; - -/** - * The state of the backend server reference - */ -typedef enum bref_state -{ - BREF_IN_USE = 0x01, - BREF_WAITING_RESULT = 0x02, /**< for session commands only */ - BREF_QUERY_ACTIVE = 0x04, /**< for other queries */ - BREF_CLOSED = 0x08, - BREF_DB_MAPPED = 0x10 -} bref_state_t; - -#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE) -#define BREF_IS_IN_USE(s) ((s)->bref_state & BREF_IN_USE) -#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0) -#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE) -#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED) -#define BREF_IS_MAPPED(s) ((s)->bref_mapped) - -#define SCHEMA_ERR_DUPLICATEDB 5000 -#define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB" -#define SCHEMA_ERR_DBNOTFOUND 1049 -#define SCHEMA_ERRSTR_DBNOTFOUND "42000" - -struct schemarouter_instance; - -/** - * Route target types - */ -typedef enum -{ - TARGET_UNDEFINED = (1 << 0), - TARGET_NAMED_SERVER = (1 << 1), - TARGET_ALL = (1 << 2), - TARGET_ANY = (1 << 3) -} route_target_t; - -/** Helper macros for route target type */ -#define TARGET_IS_UNDEFINED(t) (t == TARGET_UNDEFINED) -#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER) -#define TARGET_IS_ALL(t) (t & TARGET_ALL) -#define TARGET_IS_ANY(t) (t & TARGET_ANY) - -/** - * Reference to BACKEND. - * - * Owned by router client session. - */ -typedef struct backend_ref_st -{ - int n_mapping_eof; - GWBUF* map_queue; - SERVER_REF* bref_backend; /*< Backend server */ - DCB* bref_dcb; /*< Backend DCB */ - int bref_state; /*< State of the backend */ - bool bref_mapped; /*< Whether the backend has been mapped */ - bool last_sescmd_replied; - int bref_num_result_wait; /*< Number of not yet received results */ - GWBUF* bref_pending_cmd; /*< Pending commands */ - - SessionCommandList session_commands; /**< List of session commands that are - * to be executed on this backend server */ -} backend_ref_t; - /** * Configuration values */ @@ -147,48 +48,3 @@ typedef struct int shmap_cache_hit; /*< Shard map was found from the cache */ int shmap_cache_miss; /*< No shard map found from the cache */ } ROUTER_STATS; - -/** - * The per instance data for the router. - */ -typedef struct schemarouter_instance -{ - ShardManager shard_manager; /*< Shard maps hashed by user name */ - SERVICE* service; /*< Pointer to service */ - SPINLOCK lock; /*< Lock for the instance data */ - schemarouter_config_t schemarouter_config; /*< expanded config info from SERVICE */ - int schemarouter_version; /*< version number for router's config */ - ROUTER_STATS stats; /*< Statistics for this router */ - set ignored_dbs; /*< List of databases to ignore when the - * database mapping finds multiple servers - * with the same database */ - pcre2_code* ignore_regex; /*< Databases matching this regex will - * not cause the session to be terminated - * if they are found on more than one server. */ - pcre2_match_data* ignore_match_data; - -} SCHEMAROUTER; - -/** - * The client session structure used within this router. - */ -typedef struct schemarouter_session -{ - bool closed; /*< true when closeSession is called */ - DCB* rses_client_dcb; - MYSQL_session* rses_mysql_session; /*< Session client data (username, password, SHA1). */ - backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */ - schemarouter_config_t rses_config; /*< Copied config info from router instance */ - int rses_nbackends; /*< Number of backends */ - SCHEMAROUTER *router; /*< The router instance */ - Shard shardmap; /**< Database hash containing names of the databases - * mapped to the servers that contain them */ - string connect_db; /*< Database the user was trying to connect to */ - string current_db; /*< Current active database */ - int state; /*< Initialization state bitmask */ - GWBUF* queue; /*< Query that was received before the session was ready */ - ROUTER_STATS stats; /*< Statistics for this router */ - - uint64_t sent_sescmd; /**< The latest session command being executed */ - uint64_t replied_sescmd; /**< The last session command reply that was sent to the client */ -} SCHEMAROUTER_SESSION; diff --git a/server/modules/routing/schemarouter/schemarouterinstance.cc b/server/modules/routing/schemarouter/schemarouterinstance.cc new file mode 100644 index 000000000..9b48ad84c --- /dev/null +++ b/server/modules/routing/schemarouter/schemarouterinstance.cc @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#include "schemarouter.hh" +#include "schemarouterinstance.hh" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using std::string; +using std::map; + +#define DEFAULT_REFRESH_INTERVAL "300" + +/** + * @file schemarouter.c The entry points for the simple sharding router module. + */ + +SchemaRouter::SchemaRouter(SERVICE *service, char **options): + mxs::Router(service) +{ + MXS_CONFIG_PARAMETER* conf; + MXS_CONFIG_PARAMETER* param; + + /** Add default system databases to ignore */ + this->ignored_dbs.insert("mysql"); + this->ignored_dbs.insert("information_schema"); + this->ignored_dbs.insert("performance_schema"); + this->service = service; + this->stats.longest_sescmd = 0; + this->stats.n_hist_exceeded = 0; + this->stats.n_queries = 0; + this->stats.n_sescmd = 0; + this->stats.ses_longest = 0; + this->stats.ses_shortest = (double)((unsigned long)(~0)); + spinlock_init(&this->lock); + + conf = service->svc_config_param; + + this->schemarouter_config.refresh_databases = config_get_bool(conf, "refresh_databases"); + this->schemarouter_config.refresh_min_interval = config_get_integer(conf, "refresh_interval"); + this->schemarouter_config.debug = config_get_bool(conf, "debug"); + + if ((config_get_param(conf, "auth_all_servers")) == NULL) + { + MXS_NOTICE("Authentication data is fetched from all servers. To disable this " + "add 'auth_all_servers=0' to the service."); + service->users_from_all = true; + } + + if ((param = config_get_param(conf, "ignore_databases_regex"))) + { + int errcode; + PCRE2_SIZE erroffset; + pcre2_code* re = pcre2_compile((PCRE2_SPTR)param->value, PCRE2_ZERO_TERMINATED, 0, + &errcode, &erroffset, NULL); + + if (re == NULL) + { + PCRE2_UCHAR errbuf[512]; + pcre2_get_error_message(errcode, errbuf, sizeof(errbuf)); + MXS_ERROR("Regex compilation failed at %d for regex '%s': %s", + (int)erroffset, param->value, errbuf); + throw std::runtime_error("Regex compilation failed"); + } + + pcre2_match_data* match_data = pcre2_match_data_create_from_pattern(re, NULL); + + if (match_data == NULL) + { + pcre2_code_free(re); + throw std::bad_alloc(); + } + + this->ignore_regex = re; + this->ignore_match_data = match_data; + } + + if ((param = config_get_param(conf, "ignore_databases"))) + { + char val[strlen(param->value) + 1]; + strcpy(val, param->value); + + const char *sep = ", \t"; + char *sptr; + char *tok = strtok_r(val, sep, &sptr); + + while (tok) + { + this->ignored_dbs.insert(tok); + tok = strtok_r(NULL, sep, &sptr); + } + } + + bool failure = false; + + for (int i = 0; options && options[i]; i++) + { + char* value = strchr(options[i], '='); + + if (value == NULL) + { + MXS_ERROR("Unknown router options for %s", options[i]); + failure = true; + break; + } + + *value = '\0'; + value++; + + if (strcmp(options[i], "max_sescmd_history") == 0) + { + MXS_WARNING("Use of 'max_sescmd_history' is deprecated"); + } + else if (strcmp(options[i], "disable_sescmd_history") == 0) + { + MXS_WARNING("Use of 'disable_sescmd_history' is deprecated"); + } + else if (strcmp(options[i], "refresh_databases") == 0) + { + this->schemarouter_config.refresh_databases = config_truth_value(value); + } + else if (strcmp(options[i], "refresh_interval") == 0) + { + this->schemarouter_config.refresh_min_interval = atof(value); + } + else if (strcmp(options[i], "debug") == 0) + { + this->schemarouter_config.debug = config_truth_value(value); + } + else + { + MXS_ERROR("Unknown router options for %s", options[i]); + failure = true; + break; + } + } + + if (failure) + { + throw std::runtime_error("Failed to create schemarouter instance."); + } +} + +SchemaRouter::~SchemaRouter() +{ + if (this->ignore_regex) + { + pcre2_code_free(this->ignore_regex); + } + + if (this->ignore_match_data) + { + pcre2_match_data_free(this->ignore_match_data); + } +} + +SchemaRouter* SchemaRouter::create(SERVICE* pService, char** pzOptions) +{ + return new SchemaRouter(pService, pzOptions); +} + +SchemaRouterSession* SchemaRouter::newSession(MXS_SESSION* pSession) +{ + return new SchemaRouterSession(pSession, *this); +} + +void SchemaRouter::diagnostics(DCB* dcb) +{ + double sescmd_pct = this->stats.n_sescmd != 0 ? + 100.0 * ((double)this->stats.n_sescmd / (double)this->stats.n_queries) : + 0.0; + + /** Session command statistics */ + dcb_printf(dcb, "\n\33[1;4mSession Commands\33[0m\n"); + dcb_printf(dcb, "Total number of queries: %d\n", + this->stats.n_queries); + dcb_printf(dcb, "Percentage of session commands: %.2f\n", + sescmd_pct); + dcb_printf(dcb, "Longest chain of stored session commands: %d\n", + this->stats.longest_sescmd); + dcb_printf(dcb, "Session command history limit exceeded: %d times\n", + this->stats.n_hist_exceeded); + + /** Session time statistics */ + + if (this->stats.sessions > 0) + { + dcb_printf(dcb, "\n\33[1;4mSession Time Statistics\33[0m\n"); + dcb_printf(dcb, "Longest session: %.2lf seconds\n", this->stats.ses_longest); + dcb_printf(dcb, "Shortest session: %.2lf seconds\n", this->stats.ses_shortest); + dcb_printf(dcb, "Average session length: %.2lf seconds\n", this->stats.ses_average); + } + dcb_printf(dcb, "Shard map cache hits: %d\n", this->stats.shmap_cache_hit); + dcb_printf(dcb, "Shard map cache misses: %d\n", this->stats.shmap_cache_miss); + dcb_printf(dcb, "\n"); +} + +uint64_t SchemaRouter::getCapabilities() +{ + return RCAP_TYPE_NONE; +} + +MXS_BEGIN_DECLS + +/** + * The module entry point routine. It is this routine that + * must populate the structure that is referred to as the + * "module object", this is a structure with the set of + * external entry points for this module. + * + * @return The module object + */ +MXS_MODULE* MXS_CREATE_MODULE() +{ + static MXS_MODULE info = + { + MXS_MODULE_API_ROUTER, + MXS_MODULE_BETA_RELEASE, + MXS_ROUTER_VERSION, + "A database sharding router for simple sharding", + "V1.0.0", + RCAP_TYPE_CONTIGUOUS_INPUT, + &SchemaRouter::s_object, + NULL, /* Process init. */ + NULL, /* Process finish. */ + NULL, /* Thread init. */ + NULL, /* Thread finish. */ + { + {"ignore_databases", MXS_MODULE_PARAM_STRING}, + {"ignore_databases_regex", MXS_MODULE_PARAM_STRING}, + {"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"}, + {"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "false"}, + {"refresh_databases", MXS_MODULE_PARAM_BOOL, "true"}, + {"refresh_interval", MXS_MODULE_PARAM_COUNT, DEFAULT_REFRESH_INTERVAL}, + {"debug", MXS_MODULE_PARAM_BOOL, "false"}, + {MXS_END_MODULE_PARAMS} + } + }; + + return &info; +} + +MXS_END_DECLS diff --git a/server/modules/routing/schemarouter/schemarouterinstance.hh b/server/modules/routing/schemarouter/schemarouterinstance.hh new file mode 100644 index 000000000..a723ae9fd --- /dev/null +++ b/server/modules/routing/schemarouter/schemarouterinstance.hh @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#pragma once + +#include "schemarouter.hh" + +#include +#include + +#include +#include + +#include "schemaroutersession.hh" + +using std::string; +using std::set; + +class SchemaRouterSession; + +/** + * The per instance data for the router. + */ +class SchemaRouter: public mxs::Router +{ +public: + ~SchemaRouter(); + static SchemaRouter* create(SERVICE* pService, char** pzOptions); + SchemaRouterSession* newSession(MXS_SESSION* pSession); + void diagnostics(DCB* pDcb); + uint64_t getCapabilities(); + +protected: + friend class SchemaRouterSession; + schemarouter_config_t schemarouter_config; /*< expanded config info from SERVICE */ + + ShardManager shard_manager; /*< Shard maps hashed by user name */ + SERVICE* service; /*< Pointer to service */ + SPINLOCK lock; /*< Lock for the instance data */ + int schemarouter_version; /*< version number for router's config */ + ROUTER_STATS stats; /*< Statistics for this router */ + set ignored_dbs; /*< List of databases to ignore when the + * database mapping finds multiple servers + * with the same database */ + pcre2_code* ignore_regex; /*< Databases matching this regex will + * not cause the session to be terminated + * if they are found on more than one server. */ + pcre2_match_data* ignore_match_data; + + SchemaRouter(SERVICE *service, char **options); +}; diff --git a/server/modules/routing/schemarouter/schemarouter.cc b/server/modules/routing/schemarouter/schemaroutersession.cc similarity index 61% rename from server/modules/routing/schemarouter/schemarouter.cc rename to server/modules/routing/schemarouter/schemaroutersession.cc index 68c2bf3e1..1ee10e5ca 100644 --- a/server/modules/routing/schemarouter/schemarouter.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -13,75 +13,1515 @@ #include "schemarouter.hh" -#include -#include -#include -#include -#include -#include - #include -#include -#include -#include -#include -#include #include -#include -#include -#include +#include -using std::string; -using std::map; +#include "schemaroutersession.hh" +#include "schemarouterinstance.hh" -#define DEFAULT_REFRESH_INTERVAL "300" +bool connect_backend_servers(backend_ref_t* backend_ref, + int router_nservers, + MXS_SESSION* session); -/** - * @file schemarouter.c The entry points for the simple sharding router module. - */ +route_target_t get_shard_route_target(uint32_t qtype); +bool execute_sescmd_in_backend(backend_ref_t* backend_ref); -static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION* rses, DCB* dcb); +void bref_clear_state(backend_ref_t* bref, bref_state_t state); +void bref_set_state(backend_ref_t* bref, bref_state_t state); -static route_target_t get_shard_route_target(qc_query_type_t qtype); -static bool connect_backend_servers(backend_ref_t* backend_ref, - int router_nservers, - MXS_SESSION* session, - SCHEMAROUTER* router); - -static bool get_shard_dcb(DCB** dcb, - SCHEMAROUTER_SESSION* rses, - char* name); - -static bool execute_sescmd_in_backend(backend_ref_t* backend_ref); -static bool route_session_write(SCHEMAROUTER_SESSION* router_client_ses, - GWBUF* querybuf, - SCHEMAROUTER* inst, - unsigned char packet_type, - uint32_t qtype); -static void bref_clear_state(backend_ref_t* bref, bref_state_t state); -static void bref_set_state(backend_ref_t* bref, bref_state_t state); -static int router_handle_state_switch(DCB* dcb, DCB_REASON reason, void* data); -static bool handle_error_new_connection(SCHEMAROUTER* inst, - SCHEMAROUTER_SESSION* rses, - DCB* backend_dcb, - GWBUF* errmsg); -static void handle_error_reply_client(MXS_SESSION* ses, - SCHEMAROUTER_SESSION* rses, - DCB* backend_dcb, - GWBUF* errmsg); bool change_current_db(string& dest, Shard& shard, GWBUF* buf); bool extract_database(GWBUF* buf, char* str); - bool detect_show_shards(GWBUF* query); -int process_show_shards(SCHEMAROUTER_SESSION* rses); - void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg); -int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses, - backend_ref_t *bref, - GWBUF** wbuf); -bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses); -void route_queued_query(SCHEMAROUTER_SESSION *router_cli_ses); -void synchronize_shard_map(SCHEMAROUTER_SESSION *client); + + +SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& router): + mxs::RouterSession(session), + m_router(router) +{ + char db[MYSQL_DATABASE_MAXLEN + 1] = ""; + MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol; + MYSQL_session* data = (MYSQL_session*)session->client_dcb->data; + bool using_db = false; + bool have_db = false; + + /* To enable connecting directly to a sharded database we first need + * to disable it for the client DCB's protocol so that we can connect to them*/ + if (protocol->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB && + (have_db = strnlen(data->db, MYSQL_DATABASE_MAXLEN) > 0)) + { + protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB; + strcpy(db, data->db); + *data->db = 0; + using_db = true; + MXS_INFO("Client logging in directly to a database '%s', " + "postponing until databases have been mapped.", db); + } + + if (!have_db) + { + MXS_INFO("Client'%s' connecting with empty database.", data->user); + } + + SchemaRouterSession& client_rses = *this; + + this->m_router = router; + this->rses_client_dcb = (DCB*)session->client_dcb; + this->queue = NULL; + this->closed = false; + this->sent_sescmd = 0; + this->replied_sescmd = 0; + + this->shardmap = router.shard_manager.get_shard(session->client_dcb->user, + router.schemarouter_config.refresh_min_interval); + + this->rses_config = router.schemarouter_config; + + if (using_db) + { + this->state |= INIT_USE_DB; + } + /** + * Set defaults to session variables. + */ + + /** + * Instead of calling this, ensure that there is at least one + * responding server. + */ + + int router_nservers = router.service->n_dbref; + + /** + * Create backend reference objects for this session. + */ + + backend_ref_t* backend_ref = new backend_ref_t[router_nservers]; + + /** + * Initialize backend references with BACKEND ptr. + * Initialize session command cursors for each backend reference. + */ + + int i = 0; + + for (SERVER_REF *ref = router.service->dbref; ref; ref = ref->next) + { + if (ref->active) + { + backend_ref[i].bref_state = 0; + backend_ref[i].n_mapping_eof = 0; + backend_ref[i].map_queue = NULL; + backend_ref[i].bref_backend = ref; + backend_ref[i].bref_pending_cmd = NULL; + backend_ref[i].bref_num_result_wait = 0; + i++; + } + } + + if (i < router_nservers) + { + router_nservers = i; + } + + this->rses_backend_ref = backend_ref; + this->rses_nbackends = router_nservers; + + /** + * Connect to all backend servers + */ + bool succp = connect_backend_servers(backend_ref, router_nservers, session); + + if (!succp) + { + throw std::runtime_error("Failed to connect to backend servers"); + } + + if (db[0]) + { + /* Store the database the client is connecting to */ + this->connect_db = db; + } + + atomic_add(&router.stats.sessions, 1); +} + +SchemaRouterSession::~SchemaRouterSession() +{ + for (int i = 0; i < this->rses_nbackends; i++) + { + gwbuf_free(this->rses_backend_ref[i].bref_pending_cmd); + } + + delete[] this->rses_backend_ref; +} + +void SchemaRouterSession::close() +{ + ss_dassert(!this->closed); + + /** + * Lock router client session for secure read and update. + */ + if (!this->closed) + { + this->closed = true; + + for (int i = 0; i < this->rses_nbackends; i++) + { + backend_ref_t* bref = &this->rses_backend_ref[i]; + DCB* dcb = bref->bref_dcb; + /** Close those which had been connected */ + if (BREF_IS_IN_USE(bref)) + { + CHK_DCB(dcb); + + /** Clean operation counter in bref and in SERVER */ + while (BREF_IS_WAITING_RESULT(bref)) + { + bref_clear_state(bref, BREF_WAITING_RESULT); + } + bref_clear_state(bref, BREF_IN_USE); + bref_set_state(bref, BREF_CLOSED); + /** + * closes protocol and dcb + */ + dcb_close(dcb); + /** decrease server current connection counters */ + atomic_add(&bref->bref_backend->connections, -1); + } + } + + gwbuf_free(this->queue); + + spinlock_acquire(&m_router.lock); + if (m_router.stats.longest_sescmd < this->stats.longest_sescmd) + { + m_router.stats.longest_sescmd = this->stats.longest_sescmd; + } + double ses_time = difftime(time(NULL), this->rses_client_dcb->session->stats.connect); + if (m_router.stats.ses_longest < ses_time) + { + m_router.stats.ses_longest = ses_time; + } + if (m_router.stats.ses_shortest > ses_time && m_router.stats.ses_shortest > 0) + { + m_router.stats.ses_shortest = ses_time; + } + + m_router.stats.ses_average = + (ses_time + ((m_router.stats.sessions - 1) * m_router.stats.ses_average)) / + (m_router.stats.sessions); + + spinlock_release(&m_router.lock); + } +} + +int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) +{ + uint32_t qtype = QUERY_TYPE_UNKNOWN; + uint8_t packet_type; + uint8_t* packet; + int ret = 0; + DCB* target_dcb = NULL; + SchemaRouter& inst = this->m_router; + SchemaRouterSession& router_cli_ses = *this; + bool change_successful = false; + route_target_t route_target = TARGET_UNDEFINED; + bool succp = false; + char db[MYSQL_DATABASE_MAXLEN + 1]; + char errbuf[26 + MYSQL_DATABASE_MAXLEN]; + + SERVER* target = NULL; + + ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket)); + + if (this->closed) + { + return 0; + } + + if (this->shardmap.empty()) + { + /* Generate database list */ + gen_databaselist(); + } + + /** + * If the databases are still being mapped or if the client connected + * with a default database but no database mapping was performed we need + * to store the query. Once the databases have been mapped and/or the + * default database is taken into use we can send the query forward. + */ + if (this->state & (INIT_MAPPING | INIT_USE_DB)) + { + int init_rval = 1; + char* querystr = modutil_get_SQL(pPacket); + MXS_INFO("Storing query for session %p: %s", + this->rses_client_dcb->session, + querystr); + MXS_FREE(querystr); + pPacket = gwbuf_make_contiguous(pPacket); + GWBUF* ptr = this->queue; + + while (ptr && ptr->next) + { + ptr = ptr->next; + } + + if (ptr == NULL) + { + this->queue = pPacket; + } + else + { + ptr->next = pPacket; + + } + + if (this->state == (INIT_READY | INIT_USE_DB)) + { + /** + * This state is possible if a client connects with a default database + * and the shard map was found from the router cache + */ + if (!handle_default_db()) + { + init_rval = 0; + } + } + + return init_rval; + } + + packet = GWBUF_DATA(pPacket); + packet_type = packet[4]; + qc_query_op_t op = QUERY_OP_UNDEFINED; + + if (detect_show_shards(pPacket)) + { + process_show_shards(); + gwbuf_free(pPacket); + return 1; + } + + switch (packet_type) + { + case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ + case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ + case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */ + case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */ + case MYSQL_COM_PING: /*< 0e all servers are pinged */ + case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */ + case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */ + case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */ + case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */ + qtype = QUERY_TYPE_SESSION_WRITE; + break; + + case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */ + case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */ + qtype = QUERY_TYPE_WRITE; + break; + + case MYSQL_COM_QUERY: + qtype = qc_get_type_mask(pPacket); + op = qc_get_operation(pPacket); + break; + + case MYSQL_COM_STMT_PREPARE: + qtype = qc_get_type_mask(pPacket); + qtype |= QUERY_TYPE_PREPARE_STMT; + break; + + case MYSQL_COM_STMT_EXECUTE: + /** Parsing is not needed for this type of packet */ + qtype = QUERY_TYPE_EXEC_STMT; + break; + + case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ + case MYSQL_COM_STATISTICS: /**< 9 ? */ + case MYSQL_COM_PROCESS_INFO: /**< 0a ? */ + case MYSQL_COM_CONNECT: /**< 0b ? */ + case MYSQL_COM_PROCESS_KILL: /**< 0c ? */ + case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */ + case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */ + case MYSQL_COM_DAEMON: /**< 1d ? */ + default: + break; + } + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + char *sql; + int sql_len; + char* qtypestr = qc_typemask_to_string(qtype); + modutil_extract_SQL(pPacket, &sql, &sql_len); + + MXS_INFO("> Command: %s, stmt: %.*s %s%s", + STRPACKETTYPE(packet_type), sql_len, sql, + (pPacket->hint == NULL ? "" : ", Hint:"), + (pPacket->hint == NULL ? "" : STRHINTTYPE(pPacket->hint->type))); + + MXS_FREE(qtypestr); + } + /** + * Find out whether the query should be routed to single server or to + * all of them. + */ + + if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) + { + change_successful = change_current_db(this->current_db, + this->shardmap, + pPacket); + if (!change_successful) + { + extract_database(pPacket, db); + snprintf(errbuf, 25 + MYSQL_DATABASE_MAXLEN, "Unknown database: %s", db); + + if (this->rses_config.debug) + { + sprintf(errbuf + strlen(errbuf), + " ([%lu]: DB change failed)", + this->rses_client_dcb->session->ses_id); + } + + write_error_to_client(this->rses_client_dcb, + SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, + errbuf); + + MXS_ERROR("Changing database failed."); + gwbuf_free(pPacket); + return 1; + } + } + + /** Create the response to the SHOW DATABASES from the mapped databases */ + if (qc_query_is_type(qtype, QUERY_TYPE_SHOW_DATABASES)) + { + if (send_database_list()) + { + ret = 1; + } + + gwbuf_free(pPacket); + return ret; + } + + route_target = get_shard_route_target(qtype); + + if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) + { + route_target = TARGET_UNDEFINED; + target = this->shardmap.get_location(this->current_db); + + if (target) + { + MXS_INFO("INIT_DB for database '%s' on server '%s'", + this->current_db.c_str(), target->unique_name); + route_target = TARGET_NAMED_SERVER; + } + else + { + MXS_INFO("INIT_DB with unknown database"); + } + } + else if (route_target != TARGET_ALL) + { + /** If no database is found in the query and there is no active database + * or hints in the query we route the query to the first available + * server. This isn't ideal for monitoring server status but works if + * we just want the server to send an error back. */ + + target = get_shard_target(pPacket, qtype); + + if (target) + { + if (SERVER_IS_RUNNING(target)) + { + route_target = TARGET_NAMED_SERVER; + } + else + { + MXS_INFO("Backend server '%s' is not in a viable state", target->unique_name); + + /** + * Shard is not a viable target right now so we check + * for an alternate backend with the database. If this is not found + * the target is undefined and an error will be returned to the client. + */ + } + } + } + + if (TARGET_IS_UNDEFINED(route_target)) + { + target = get_shard_target(pPacket, qtype); + + if ((target == NULL && + packet_type != MYSQL_COM_INIT_DB && + this->current_db.length() == 0) || + packet_type == MYSQL_COM_FIELD_LIST || + (this->current_db.length() == 0)) + { + /** + * No current database and no databases in query or + * the database is ignored, route to first available backend. + */ + + route_target = TARGET_ANY; + MXS_INFO("Routing query to first available backend."); + + } + else + { + if (!change_successful) + { + /** + * Bad shard status. The changing of the database + * was not successful and the error message was already sent. + */ + + ret = 1; + } + else + { + MXS_ERROR("Error : Router internal failure (schemarouter)"); + /** Something else went wrong, terminate connection */ + ret = 0; + } + + gwbuf_free(pPacket); + return ret; + } + } + + if (TARGET_IS_ALL(route_target)) + { + /** + * It is not sure if the session command in question requires + * response. Statement is examined in route_session_write. + * Router locking is done inside the function. + */ + succp = route_session_write(pPacket, packet_type); + + if (succp) + { + atomic_add(&m_router.stats.n_sescmd, 1); + atomic_add(&m_router.stats.n_queries, 1); + ret = 1; + } + + gwbuf_free(pPacket); + return ret; + } + + if (TARGET_IS_ANY(route_target)) + { + for (int i = 0; i < this->rses_nbackends; i++) + { + SERVER *server = this->rses_backend_ref[i].bref_backend->server; + if (SERVER_IS_RUNNING(server)) + { + route_target = TARGET_NAMED_SERVER; + target = server; + break; + } + } + + if (TARGET_IS_ANY(route_target)) + { + /**No valid backends alive*/ + MXS_ERROR("Failed to route query, no backends are available."); + gwbuf_free(pPacket); + return 0; + } + + } + + /** + * Query is routed to one of the backends + */ + if (TARGET_IS_NAMED_SERVER(route_target) && target) + { + /** + * Search backend server by name or replication lag. + * If it fails, then try to find valid slave or master. + */ + + succp = get_shard_dcb(&target_dcb, target->unique_name); + + if (!succp) + { + MXS_INFO("Was supposed to route to named server " + "%s but couldn't find the server in a " + "suitable state.", target->unique_name); + } + + } + + if (succp) /*< Have DCB of the target backend */ + { + backend_ref_t *bref = get_bref_from_dcb(target_dcb); + + MXS_INFO("Route query to \t%s:%d <", + bref->bref_backend->server->name, + bref->bref_backend->server->port); + /** + * Store current stmt if execution of previous session command + * haven't completed yet. Note that according to MySQL protocol + * there can only be one such non-sescmd stmt at the time. + */ + if (bref->session_commands.size() > 0) + { + ss_dassert((bref->bref_pending_cmd == NULL || + this->closed)); + bref->bref_pending_cmd = pPacket; + return 1; + } + + if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(pPacket))) == 1) + { + backend_ref_t* bref; + + atomic_add(&m_router.stats.n_queries, 1); + + /** + * Add one query response waiter to backend reference + */ + bref = get_bref_from_dcb(target_dcb); + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); + } + else + { + MXS_ERROR("Routing query failed."); + } + } + + gwbuf_free(pPacket); + + return ret; +} + +void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) +{ + backend_ref_t* bref; + GWBUF* writebuf = pPacket; + + /** + * Lock router client session for secure read of router session members. + * Note that this could be done without lock by using version # + */ + if (this->closed) + { + gwbuf_free(pPacket); + return; + } + + /** Holding lock ensures that router session remains open */ + ss_dassert(pDcb->session != NULL); + DCB *client_dcb = pDcb->session->client_dcb; + + bref = get_bref_from_dcb(pDcb); + + if (bref == NULL) + { + gwbuf_free(writebuf); + return; + } + + MXS_DEBUG("Reply from [%s] session [%p]" + " mapping [%s] queries queued [%s]", + bref->bref_backend->server->unique_name, + this->rses_client_dcb->session, + this->state & INIT_MAPPING ? "true" : "false", + this->queue == NULL ? "none" : + this->queue->next ? "multiple" : "one"); + + + + if (this->state & INIT_MAPPING) + { + int rc = inspect_backend_mapping_states(bref, &writebuf); + gwbuf_free(writebuf); + writebuf = NULL; + + if (rc == 1) + { + synchronize_shard_map(); + + /* + * Check if the session is reconnecting with a database name + * that is not in the hashtable. If the database is not found + * then close the session. + */ + this->state &= ~INIT_MAPPING; + + if (this->state & INIT_USE_DB) + { + bool success = handle_default_db(); + if (!success) + { + dcb_close(this->rses_client_dcb); + } + return; + } + + if (this->queue) + { + ss_dassert(this->state == INIT_READY); + route_queued_query(); + } + } + + if (rc == -1) + { + dcb_close(this->rses_client_dcb); + } + return; + } + + if (this->state & INIT_USE_DB) + { + MXS_DEBUG("Reply to USE '%s' received for session %p", + this->connect_db.c_str(), + this->rses_client_dcb->session); + this->state &= ~INIT_USE_DB; + this->current_db = this->connect_db; + ss_dassert(this->state == INIT_READY); + + if (this->queue) + { + route_queued_query(); + } + + gwbuf_free(writebuf); + return; + } + + if (this->queue) + { + ss_dassert(this->state == INIT_READY); + route_queued_query(); + return; + } + + + + /** + * Active cursor means that reply is from session command + * execution. + */ + if (bref->session_commands.size() > 0) + { + if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) + { + /** + * Discard all those responses that have already been sent to + * the client. Return with buffer including response that + * needs to be sent to client or NULL. + */ + if (this->replied_sescmd < this->sent_sescmd && + bref->session_commands.front().get_position() == this->replied_sescmd + 1) + { + ++this->replied_sescmd; + } + else + { + /** The reply to this session command has already been sent + * to the client. */ + gwbuf_free(writebuf); + writebuf = NULL; + } + bref->session_commands.pop_front(); + } + /** + * If response will be sent to client, decrease waiter count. + * This applies to session commands only. Counter decrement + * for other type of queries is done outside this block. + */ + if (writebuf != NULL && client_dcb != NULL) + { + /** Set response status as replied */ + bref_clear_state(bref, BREF_WAITING_RESULT); + } + } + /** + * Clear BREF_QUERY_ACTIVE flag and decrease waiter counter. + * This applies for queries other than session commands. + */ + else if (BREF_IS_QUERY_ACTIVE(bref)) + { + bref_clear_state(bref, BREF_QUERY_ACTIVE); + /** Set response status as replied */ + bref_clear_state(bref, BREF_WAITING_RESULT); + } + + if (writebuf != NULL && client_dcb != NULL) + { + unsigned char* cmd = (unsigned char*) writebuf->start; + int state = this->state; + /** Write reply to client DCB */ + MXS_INFO("returning reply [%s] " + "state [%s] session [%p]", + PTR_IS_ERR(cmd) ? "ERR" : PTR_IS_OK(cmd) ? "OK" : "RSET", + state & INIT_UNINT ? "UNINIT" : state & INIT_MAPPING ? "MAPPING" : "READY", + this->rses_client_dcb->session); + MXS_SESSION_ROUTE_REPLY(pDcb->session, writebuf); + } + + /** There is one pending session command to be executed. */ + if (bref->session_commands.size() > 0) + { + + MXS_INFO("Backend %s:%d processed reply and starts to execute " + "active cursor.", + bref->bref_backend->server->name, + bref->bref_backend->server->port); + + execute_sescmd_in_backend(bref); + } + else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ + { + int ret; + + CHK_GWBUF(bref->bref_pending_cmd); + + if ((ret = bref->bref_dcb->func.write(bref->bref_dcb, + gwbuf_clone(bref->bref_pending_cmd))) == 1) + { + atomic_add(&this->m_router.stats.n_queries, 1); + /** + * Add one query response waiter to backend reference + */ + bref_set_state(bref, BREF_QUERY_ACTIVE); + bref_set_state(bref, BREF_WAITING_RESULT); + } + else + { + char* sql = modutil_get_SQL(bref->bref_pending_cmd); + + if (sql) + { + MXS_ERROR("Routing query \"%s\" failed.", sql); + MXS_FREE(sql); + } + else + { + MXS_ERROR("Routing query failed."); + } + } + gwbuf_free(bref->bref_pending_cmd); + bref->bref_pending_cmd = NULL; + } +} + +void SchemaRouterSession::handleError(GWBUF* pMessage, + DCB* pProblem, + mxs_error_action_t action, + bool* pSuccess) +{ + ss_dassert(pProblem->dcb_role == DCB_ROLE_BACKEND_HANDLER); + CHK_DCB(pProblem); + MXS_SESSION *session = pProblem->session; + ss_dassert(session); + + CHK_SESSION(session); + + switch (action) + { + case ERRACT_NEW_CONNECTION: + *pSuccess = handle_error_new_connection(pProblem, pMessage); + break; + + case ERRACT_REPLY_CLIENT: + handle_error_reply_client(pProblem, pMessage); + *pSuccess = false; /*< no new backend servers were made available */ + break; + + default: + *pSuccess = false; + break; + } + + dcb_close(pProblem); +} + +/** + * Private functions + */ + + +/** + * Synchronize the router client session shard map with the global shard map for + * this user. + * + * If the router doesn't have a shard map for this user then the current shard map + * of the client session is added to the router. If the shard map in the router is + * out of date, its contents are replaced with the contents of the current client + * session. If the router has a usable shard map, the current shard map of the client + * is discarded and the router's shard map is used. + * @param client Router session + */ +void SchemaRouterSession::synchronize_shard_map() +{ + m_router.stats.shmap_cache_miss++; + m_router.shard_manager.update_shard(this->shardmap, this->rses_client_dcb->user); +} + +/** + * Extract the database name from a COM_INIT_DB or literal USE ... query. + * @param buf Buffer with the database change query + * @param str Pointer where the database name is copied + * @return True for success, false for failure + */ +bool extract_database(GWBUF* buf, char* str) +{ + uint8_t* packet; + char *saved, *tok, *query = NULL; + bool succp = true; + unsigned int plen; + + packet = GWBUF_DATA(buf); + plen = gw_mysql_get_byte3(packet) - 1; + + /** Copy database name from MySQL packet to session */ + if (qc_get_operation(buf) == QUERY_OP_CHANGE_DB) + { + const char *delim = "` \n\t;"; + + query = modutil_get_SQL(buf); + tok = strtok_r(query, delim, &saved); + + if (tok == NULL || strcasecmp(tok, "use") != 0) + { + MXS_ERROR("extract_database: Malformed chage database packet."); + succp = false; + goto retblock; + } + + tok = strtok_r(NULL, delim, &saved); + + if (tok == NULL) + { + MXS_ERROR("extract_database: Malformed change database packet."); + succp = false; + goto retblock; + } + + strncpy(str, tok, MYSQL_DATABASE_MAXLEN); + } + else + { + memcpy(str, packet + 5, plen); + memset(str + plen, 0, 1); + } +retblock: + MXS_FREE(query); + return succp; +} + +/** + * If session command cursor is passive, sends the command to backend for + * execution. + * + * Returns true if command was sent or added successfully to the queue. + * Returns false if command sending failed or if there are no pending session + * commands. + * + * Router session must be locked. + */ +bool SchemaRouterSession::execute_sescmd_in_backend(backend_ref_t* backend_ref) +{ + if (BREF_IS_CLOSED(backend_ref)) + { + return false; + } + + DCB *dcb = backend_ref->bref_dcb; + + CHK_DCB(dcb); + + int rc = 0; + + /** Return if there are no pending ses commands */ + if (backend_ref->session_commands.size() == 0) + { + MXS_INFO("Cursor had no pending session commands."); + return false; + } + + SessionCommandList::iterator iter = backend_ref->session_commands.begin(); + GWBUF *buffer = iter->copy_buffer().release(); + + switch (iter->get_command()) + { + case MYSQL_COM_CHANGE_USER: + /** This makes it possible to handle replies correctly */ + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = dcb->func.auth(dcb, NULL, dcb->session, buffer); + break; + + case MYSQL_COM_QUERY: + default: + /** + * Mark session command buffer, it triggers writing + * MySQL command to protocol + */ + gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); + rc = dcb->func.write(dcb, buffer); + break; + } + + return rc == 1; +} + +/** + * Execute in backends used by current router session. + * Save session variable commands to router session property + * struct. Thus, they can be replayed in backends which are + * started and joined later. + * + * Suppress redundant OK packets sent by backends. + * + * The first OK packet is replied to the client. + * Return true if succeed, false is returned if router session was closed or + * if execute_sescmd_in_backend failed. + */ +bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) +{ + bool succp = false; + backend_ref_t *backend_ref = this->rses_backend_ref; + + MXS_INFO("Session write, routing to all servers."); + atomic_add(&this->stats.longest_sescmd, 1); + + /** Increment the session command count */ + ++this->sent_sescmd; + + for (int i = 0; i < this->rses_nbackends; i++) + { + if (BREF_IS_IN_USE((&backend_ref[i]))) + { + GWBUF *buffer = gwbuf_clone(querybuf); + backend_ref[i].session_commands.push_back(SessionCommand(buffer, this->sent_sescmd)); + + if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) + { + MXS_INFO("Route query to %s\t%s:%d%s", + (SERVER_IS_MASTER(backend_ref[i].bref_backend->server) ? + "master" : "slave"), + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port, + (i + 1 == this->rses_nbackends ? " <" : "")); + } + + if (backend_ref[i].session_commands.size() == 1) + { + /** Only one command, execute it */ + switch (command) + { + /** These types of commands don't generate responses */ + case MYSQL_COM_QUIT: + case MYSQL_COM_STMT_CLOSE: + break; + + default: + bref_set_state(&backend_ref[i], BREF_WAITING_RESULT); + break; + } + + if (execute_sescmd_in_backend(&backend_ref[i])) + { + succp = true; + } + else + { + MXS_ERROR("Failed to execute session " + "command in %s:%d", + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port); + } + } + else + { + ss_dassert(backend_ref[i].session_commands.size() > 1); + /** The server is already executing a session command */ + MXS_INFO("Backend %s:%d already executing sescmd.", + backend_ref[i].bref_backend->server->name, + backend_ref[i].bref_backend->server->port); + succp = true; + } + } + } + + return succp; +} + +void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg) +{ + backend_ref_t* bref = get_bref_from_dcb(dcb); + + if (bref) + { + + bref_clear_state(bref, BREF_IN_USE); + bref_set_state(bref, BREF_CLOSED); + } + + if (dcb->session->state == SESSION_STATE_ROUTER_READY) + { + dcb->session->client_dcb->func.write(dcb->session->client_dcb, gwbuf_clone(errmsg)); + } +} + +/** + * Check if a router session has servers in use + * @param rses Router client session + * @return True if session has a single backend server in use that is running. + * False if no backends are in use or running. + */ +bool SchemaRouterSession::have_servers() +{ + for (int i = 0; i < this->rses_nbackends; i++) + { + if (BREF_IS_IN_USE(&this->rses_backend_ref[i]) && + !BREF_IS_CLOSED(&this->rses_backend_ref[i])) + { + return true; + } + } + + return false; +} + +/** + * Check if there is backend reference pointing at failed DCB, and reset its + * flags. Then clear DCB's callback and finally try to reconnect. + * + * This must be called with router lock. + * + * @param inst router instance + * @param rses router client session + * @param dcb failed DCB + * @param errmsg error message which is sent to client if it is waiting + * + * @return true if there are enough backend connections to continue, false if not + */ +bool SchemaRouterSession::handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg) +{ + backend_ref_t* bref; + + MXS_SESSION *ses = backend_dcb->session; + CHK_SESSION(ses); + + /** + * If bref == NULL it has been replaced already with another one. + */ + if ((bref = get_bref_from_dcb(backend_dcb)) == NULL) + { + /** This should not happen */ + ss_dassert(false); + return false; + } + + /** + * If query was sent through the bref and it is waiting for reply from + * the backend server it is necessary to send an error to the client + * because it is waiting for reply. + */ + if (BREF_IS_WAITING_RESULT(bref)) + { + DCB* client_dcb; + client_dcb = ses->client_dcb; + client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); + bref_clear_state(bref, BREF_WAITING_RESULT); + } + bref_clear_state(bref, BREF_IN_USE); + bref_set_state(bref, BREF_CLOSED); + + return have_servers(); +} + +/** + * Finds out if there is a backend reference pointing at the DCB given as + * parameter. + * @param rses router client session + * @param dcb DCB + * + * @return backend reference pointer if succeed or NULL + */ +backend_ref_t* SchemaRouterSession::get_bref_from_dcb(DCB* dcb) +{ + CHK_DCB(dcb); + + for (int i = 0; i < this->rses_nbackends; i++) + { + if (this->rses_backend_ref[i].bref_dcb == dcb) + { + return &this->rses_backend_ref[i]; + } + } + + return NULL; +} + +/** + * Detect if a query contains a SHOW SHARDS query. + * @param query Query to inspect + * @return true if the query is a SHOW SHARDS query otherwise false + */ +bool detect_show_shards(GWBUF* query) +{ + bool rval = false; + char *querystr, *tok, *sptr; + + if (query == NULL) + { + MXS_ERROR("NULL value passed at %s:%d", __FILE__, __LINE__); + return false; + } + + if (!modutil_is_SQL(query) && !modutil_is_SQL_prepare(query)) + { + return false; + } + + if ((querystr = modutil_get_SQL(query)) == NULL) + { + MXS_ERROR("Failure to parse SQL at %s:%d", __FILE__, __LINE__); + return false; + } + + tok = strtok_r(querystr, " ", &sptr); + if (tok && strcasecmp(tok, "show") == 0) + { + tok = strtok_r(NULL, " ", &sptr); + if (tok && strcasecmp(tok, "shards") == 0) + { + rval = true; + } + } + + MXS_FREE(querystr); + return rval; +} + +/** + * Callback for the shard list result set creation + */ +RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) +{ + ServerMap* pContent = (ServerMap*)data; + RESULT_ROW* rval = resultset_make_row(rset); + + if (rval) + { + resultset_row_set(rval, 0, pContent->begin()->first.c_str()); + resultset_row_set(rval, 1, pContent->begin()->second->unique_name); + pContent->erase(pContent->begin()); + } + + return rval; +} + +/** + * Send a result set of all shards and their locations to the client. + * @param rses Router client session + * @return 0 on success, -1 on error + */ +int SchemaRouterSession::process_show_shards() +{ + int rval = -1; + + ServerMap pContent; + this->shardmap.get_content(pContent); + RESULTSET* rset = resultset_create(shard_list_cb, &pContent); + + if (rset) + { + resultset_add_column(rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); + resultset_stream_mysql(rset, this->rses_client_dcb); + resultset_free(rset); + rval = 0; + } + + return rval; +} + +/** + * + * @param dcb + * @param errnum + * @param mysqlstate + * @param errmsg + */ +void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg) +{ + GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, errnum, mysqlstate, errmsg); + if (errbuff) + { + if (dcb->func.write(dcb, errbuff) != 1) + { + MXS_ERROR("Failed to write error packet to client."); + } + } + else + { + MXS_ERROR("Memory allocation failed when creating error packet."); + } +} + +/** + * + * @param router_cli_ses + * @return + */ +bool SchemaRouterSession::handle_default_db() +{ + bool rval = false; + SERVER* target = this->shardmap.get_location(this->connect_db); + + if (target) + { + /* Send a COM_INIT_DB packet to the server with the right database + * and set it as the client's active database */ + + unsigned int qlen = this->connect_db.length(); + GWBUF* buffer = gwbuf_alloc(qlen + 5); + + if (buffer) + { + uint8_t *data = GWBUF_DATA(buffer); + gw_mysql_set_byte3(data, qlen + 1); + gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); + data[3] = 0x0; + data[4] = 0x2; + memcpy(data + 5, this->connect_db.c_str(), qlen); + DCB* dcb = NULL; + + if (get_shard_dcb(&dcb, target->unique_name)) + { + dcb->func.write(dcb, buffer); + MXS_DEBUG("USE '%s' sent to %s for session %p", + this->connect_db.c_str(), + target->unique_name, + this->rses_client_dcb->session); + rval = true; + } + else + { + MXS_INFO("Couldn't find target DCB for '%s'.", target->unique_name); + } + } + else + { + MXS_ERROR("Buffer allocation failed."); + } + } + else + { + /** Unknown database, hang up on the client*/ + MXS_INFO("Connecting to a non-existent database '%s'", this->connect_db.c_str()); + char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1]; + sprintf(errmsg, "Unknown database '%s'", this->connect_db.c_str()); + if (this->rses_config.debug) + { + sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", + this->rses_client_dcb->session->ses_id); + } + write_error_to_client(this->rses_client_dcb, + SCHEMA_ERR_DBNOTFOUND, + SCHEMA_ERRSTR_DBNOTFOUND, + errmsg); + } + + return rval; +} + +void SchemaRouterSession::route_queued_query() +{ + GWBUF* tmp = this->queue; + this->queue = this->queue->next; + tmp->next = NULL; +#ifdef SS_DEBUG + char* querystr = modutil_get_SQL(tmp); + MXS_DEBUG("Sending queued buffer for session %p: %s", + this->rses_client_dcb->session, + querystr); + MXS_FREE(querystr); +#endif + poll_add_epollin_event_to_dcb(this->rses_client_dcb, tmp); +} + +/** + * + * @param router_cli_ses Router client session + * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error + */ +int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, + GWBUF** wbuf) +{ + bool mapped = true; + GWBUF* writebuf = *wbuf; + backend_ref_t* bkrf = this->rses_backend_ref; + + for (int i = 0; i < this->rses_nbackends; i++) + { + if (bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) + { + if (bref->map_queue) + { + writebuf = gwbuf_append(bref->map_queue, writebuf); + bref->map_queue = NULL; + } + showdb_response_t rc = parse_showdb_response(&this->rses_backend_ref[i], + &writebuf); + if (rc == SHOWDB_FULL_RESPONSE) + { + this->rses_backend_ref[i].bref_mapped = true; + MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", + this->rses_backend_ref[i].bref_backend->server->unique_name, + this->rses_client_dcb->session); + } + else if (rc == SHOWDB_PARTIAL_RESPONSE) + { + bref->map_queue = writebuf; + writebuf = NULL; + MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", + this->rses_backend_ref[i].bref_backend->server->unique_name, + this->rses_client_dcb->session); + } + else + { + DCB* client_dcb = NULL; + + if ((this->state & INIT_FAILED) == 0) + { + if (rc == SHOWDB_DUPLICATE_DATABASES) + { + MXS_ERROR("Duplicate databases found, closing session."); + } + else + { + MXS_ERROR("Fatal error when processing SHOW DATABASES response, closing session."); + } + client_dcb = this->rses_client_dcb; + + /** This is the first response to the database mapping which + * has duplicate database conflict. Set the initialization bitmask + * to INIT_FAILED */ + this->state |= INIT_FAILED; + + /** Send the client an error about duplicate databases + * if there is a queued query from the client. */ + if (this->queue) + { + GWBUF* error = modutil_create_mysql_err_msg(1, 0, + SCHEMA_ERR_DUPLICATEDB, + SCHEMA_ERRSTR_DUPLICATEDB, + "Error: duplicate databases " + "found on two different shards."); + + if (error) + { + client_dcb->func.write(client_dcb, error); + } + else + { + MXS_ERROR("Creating buffer for error message failed."); + } + } + } + *wbuf = writebuf; + return -1; + } + } + + if (BREF_IS_IN_USE(&bkrf[i]) && !BREF_IS_MAPPED(&bkrf[i])) + { + mapped = false; + MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", + bkrf[i].bref_backend->server->unique_name, + this->rses_client_dcb->session); + } + } + *wbuf = writebuf; + return mapped ? 1 : 0; +} + +/** + * Create a fake error message from a DCB. + * @param fail_str Custom error message + * @param dcb DCB to use as the origin of the error + */ +void create_error_reply(char* fail_str, DCB* dcb) +{ + MXS_INFO("change_current_db: failed to change database: %s", fail_str); + GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); + + if (errbuf == NULL) + { + MXS_ERROR("Creating buffer for error message failed."); + return; + } + /** Set flags that help router to identify session commands reply */ + gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL); + gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); + gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); + + poll_add_epollin_event_to_dcb(dcb, errbuf); +} + +/** + * Read new database name from MYSQL_COM_INIT_DB packet or a literal USE ... COM_QUERY + * packet, check that it exists in the hashtable and copy its name to MYSQL_session. + * + * @param dest Destination where the database name will be written + * @param dbhash Hashtable containing valid databases + * @param buf Buffer containing the database change query + * + * @return true if new database is set, false if non-existent database was tried + * to be set + */ +bool change_current_db(string& dest, Shard& shard, GWBUF* buf) +{ + bool succp = false; + char db[MYSQL_DATABASE_MAXLEN + 1]; + + if (GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) + { + /** Copy database name from MySQL packet to session */ + if (extract_database(buf, db)) + { + MXS_INFO("change_current_db: INIT_DB with database '%s'", db); + /** + * Update the session's active database only if it's in the hashtable. + * If it isn't found, send a custom error packet to the client. + */ + + SERVER* target = shard.get_location(db); + + if (target) + { + dest = db; + MXS_INFO("change_current_db: database is on server: '%s'.", target->unique_name); + succp = true; + } + } + } + else + { + MXS_ERROR("change_current_db: failed to change database: Query buffer too large"); + } + + return succp; +} /** * Convert a length encoded string into a C string. @@ -152,7 +1592,7 @@ char* get_lenenc_str(void* data) * @return 1 if a complete response was received, 0 if a partial response was received * and -1 if a database was found on more than one server. */ -showdb_response_t parse_showdb_response(SCHEMAROUTER_SESSION* rses, backend_ref_t* bref, GWBUF** buffer) +showdb_response_t SchemaRouterSession::parse_showdb_response(backend_ref_t* bref, GWBUF** buffer) { unsigned char* ptr; SERVER* target = bref->bref_backend->server; @@ -211,25 +1651,25 @@ showdb_response_t parse_showdb_response(SCHEMAROUTER_SESSION* rses, backend_ref_ if (data) { - if (rses->shardmap.add_location(data, target)) + if (this->shardmap.add_location(data, target)) { MXS_INFO("<%s, %s>", target->unique_name, data); } else { - if (!(rses->router->ignored_dbs.find(data) != rses->router->ignored_dbs.end() || - (rses->router->ignore_regex && - pcre2_match(rses->router->ignore_regex, (PCRE2_SPTR)data, + if (!(this->m_router.ignored_dbs.find(data) != this->m_router.ignored_dbs.end() || + (this->m_router.ignore_regex && + pcre2_match(this->m_router.ignore_regex, (PCRE2_SPTR)data, PCRE2_ZERO_TERMINATED, 0, 0, - rses->router->ignore_match_data, NULL) >= 0))) + this->m_router.ignore_match_data, NULL) >= 0))) { duplicate_found = true; - SERVER *duplicate = rses->shardmap.get_location(data); + SERVER *duplicate = this->shardmap.get_location(data); MXS_ERROR("Database '%s' found on servers '%s' and '%s' for user %s@%s.", data, target->unique_name, duplicate->unique_name, - rses->rses_client_dcb->user, - rses->rses_client_dcb->remote); + this->rses_client_dcb->user, + this->rses_client_dcb->remote); } } MXS_FREE(data); @@ -272,7 +1712,7 @@ showdb_response_t parse_showdb_response(SCHEMAROUTER_SESSION* rses, backend_ref_ * @param session Router client session * @return 1 if all writes to backends were succesful and 0 if one or more errors occurred */ -int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session) +int SchemaRouterSession::gen_databaselist() { DCB* dcb; const char* query = "SHOW DATABASES"; @@ -280,14 +1720,14 @@ int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session) int i, rval = 0; unsigned int len; - for (i = 0; i < session->rses_nbackends; i++) + for (i = 0; i < this->rses_nbackends; i++) { - session->rses_backend_ref[i].bref_mapped = false; - session->rses_backend_ref[i].n_mapping_eof = 0; + this->rses_backend_ref[i].bref_mapped = false; + this->rses_backend_ref[i].n_mapping_eof = 0; } - session->state |= INIT_MAPPING; - session->state &= ~INIT_UNINT; + this->state |= INIT_MAPPING; + this->state &= ~INIT_UNINT; len = strlen(query) + 1; buffer = gwbuf_alloc(len + 4); uint8_t *data = GWBUF_DATA(buffer); @@ -298,18 +1738,18 @@ int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session) *(data + 4) = 0x03; memcpy(data + 5, query, strlen(query)); - for (i = 0; i < session->rses_nbackends; i++) + for (i = 0; i < this->rses_nbackends; i++) { - if (BREF_IS_IN_USE(&session->rses_backend_ref[i]) && - !BREF_IS_CLOSED(&session->rses_backend_ref[i]) & - SERVER_IS_RUNNING(session->rses_backend_ref[i].bref_backend->server)) + if (BREF_IS_IN_USE(&this->rses_backend_ref[i]) && + !BREF_IS_CLOSED(&this->rses_backend_ref[i]) & + SERVER_IS_RUNNING(this->rses_backend_ref[i].bref_backend->server)) { clone = gwbuf_clone(buffer); - dcb = session->rses_backend_ref[i].bref_dcb; + dcb = this->rses_backend_ref[i].bref_dcb; rval |= !dcb->func.write(dcb, clone); MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", - session->rses_backend_ref[i].bref_backend->server->unique_name, - session->rses_client_dcb->session, + this->rses_backend_ref[i].bref_backend->server->unique_name, + this->rses_client_dcb->session, rval); } } @@ -324,10 +1764,7 @@ int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session) * @param buffer Query to inspect * @return Name of the backend or NULL if the query contains no known databases. */ -SERVER* get_shard_target(SCHEMAROUTER* router, - SCHEMAROUTER_SESSION* client, - GWBUF* buffer, - uint32_t qtype) +SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) { SERVER *rval = NULL; bool has_dbs = false; /**If the query targets any database other than the current one*/ @@ -346,7 +1783,7 @@ SERVER* get_shard_target(SCHEMAROUTER* router, } else { - SERVER* target = client->shardmap.get_location(info[i].database); + SERVER* target = this->shardmap.get_location(info[i].database); if (target) { @@ -383,7 +1820,7 @@ SERVER* get_shard_target(SCHEMAROUTER* router, if (tok) { - rval = client->shardmap.get_location(tok); + rval = this->shardmap.get_location(tok); if (rval) { @@ -395,12 +1832,12 @@ SERVER* get_shard_target(SCHEMAROUTER* router, if (rval == NULL) { - rval = client->shardmap.get_location(client->current_db); + rval = this->shardmap.get_location(this->current_db); if (rval) { MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'", - client->current_db.c_str(), rval->unique_name); + this->current_db.c_str(), rval->unique_name); } } else @@ -410,30 +1847,30 @@ SERVER* get_shard_target(SCHEMAROUTER* router, } else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) { - for (int i = 0; i < client->rses_nbackends; i++) + for (int i = 0; i < this->rses_nbackends; i++) { - char *srvnm = client->rses_backend_ref[i].bref_backend->server->unique_name; + char *srvnm = this->rses_backend_ref[i].bref_backend->server->unique_name; if (strcmp(srvnm, (char*)buffer->hint->data) == 0) { - rval = client->rses_backend_ref[i].bref_backend->server; + rval = this->rses_backend_ref[i].bref_backend->server; MXS_INFO("Routing hint found (%s)", rval->unique_name); } } - if (rval == NULL && !has_dbs && client->current_db.length()) + if (rval == NULL && !has_dbs && this->current_db.length()) { /** * If the target name has not been found and the session has an * active database, set is as the target */ - rval = client->shardmap.get_location(client->current_db); + rval = this->shardmap.get_location(this->current_db); if (rval) { MXS_INFO("Using active database '%s' on '%s'", - client->current_db.c_str(), rval->unique_name); + this->current_db.c_str(), rval->unique_name); } } } @@ -450,15 +1887,11 @@ SERVER* get_shard_target(SCHEMAROUTER* router, * much. * * @param p_dcb Address of the pointer to the resulting DCB - * @param rses Pointer to router client session - * @param btype Backend type * @param name Name of the backend which is primarily searched. May be NULL. * * @return True if proper DCB was found, false otherwise. */ -static bool get_shard_dcb(DCB** p_dcb, - SCHEMAROUTER_SESSION* rses, - char* name) +bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) { backend_ref_t* backend_ref; int i; @@ -471,9 +1904,9 @@ static bool get_shard_dcb(DCB** p_dcb, { goto return_succp; } - backend_ref = rses->rses_backend_ref; + backend_ref = this->rses_backend_ref; - for (i = 0; i < rses->rses_nbackends; i++) + for (i = 0; i < this->rses_nbackends; i++) { SERVER_REF* b = backend_ref[i].bref_backend; /** @@ -508,7 +1941,7 @@ return_succp: * @return bitfield including the routing target, or the target server name * if the query would otherwise be routed to slave. */ -static route_target_t get_shard_route_target(uint32_t qtype) +route_target_t get_shard_route_target(uint32_t qtype) { route_target_t target = TARGET_UNDEFINED; @@ -571,19 +2004,19 @@ RESULT_ROW *result_set_cb(struct resultset * rset, void *data) * @param client Router client session * @return True if the sending of the database list was successful, otherwise false */ -bool send_database_list(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client) +bool SchemaRouterSession::send_database_list() { bool rval = false; ServerMap dblist; - client->shardmap.get_content(dblist); + this->shardmap.get_content(dblist); RESULTSET* resultset = resultset_create(result_set_cb, &dblist); if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR)) { - resultset_stream_mysql(resultset, client->rses_client_dcb); + resultset_stream_mysql(resultset, this->rses_client_dcb); rval = true; } resultset_free(resultset); @@ -591,47 +2024,7 @@ bool send_database_list(SCHEMAROUTER* router, SCHEMAROUTER_SESSION* client) return rval; } -/** Compare number of connections from this router in backend servers */ -int bref_cmp_router_conn(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return ((1000 * b1->connections) / b1->weight) - - ((1000 * b2->connections) / b2->weight); -} - -/** Compare number of global connections in backend servers */ -int bref_cmp_global_conn(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return ((1000 * b1->server->stats.n_current) / b1->weight) - - ((1000 * b2->server->stats.n_current) / b2->weight); -} - - -/** Compare replication lag between backend servers */ -int bref_cmp_behind_master(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return b1->server->rlag - b2->server->rlag; -} - -/** Compare number of current operations in backend servers */ -int bref_cmp_current_load(const void* bref1, const void* bref2) -{ - SERVER_REF* b1 = ((backend_ref_t *)bref1)->bref_backend; - SERVER_REF* b2 = ((backend_ref_t *)bref2)->bref_backend; - - return ((1000 * b1->server->stats.n_current_ops) - b1->weight) - - ((1000 * b2->server->stats.n_current_ops) - b2->weight); -} - -static void bref_clear_state(backend_ref_t* bref, bref_state_t state) +void bref_clear_state(backend_ref_t* bref, bref_state_t state) { if (bref == NULL) { @@ -670,7 +2063,7 @@ static void bref_clear_state(backend_ref_t* bref, bref_state_t state) } } -static void bref_set_state(backend_ref_t* bref, bref_state_t state) +void bref_set_state(backend_ref_t* bref, bref_state_t state) { if (bref == NULL) { @@ -735,16 +2128,14 @@ static void bref_set_state(backend_ref_t* bref, bref_state_t state) * connections because all servers are supposed to be operational. It is, * however, possible that there are less available servers than expected. */ -static bool connect_backend_servers(backend_ref_t* backend_ref, - int router_nservers, - MXS_SESSION* session, - SCHEMAROUTER* router) +bool connect_backend_servers(backend_ref_t* backend_ref, + int router_nservers, + MXS_SESSION* session) { bool succp = false; int servers_found = 0; int servers_connected = 0; int slaves_connected = 0; - int i; if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { @@ -845,1905 +2236,3 @@ static bool connect_backend_servers(backend_ref_t* backend_ref, return succp; } - -/** - * If session command cursor is passive, sends the command to backend for - * execution. - * - * Returns true if command was sent or added successfully to the queue. - * Returns false if command sending failed or if there are no pending session - * commands. - * - * Router session must be locked. - */ -static bool execute_sescmd_in_backend(backend_ref_t* backend_ref) -{ - if (BREF_IS_CLOSED(backend_ref)) - { - return false; - } - - DCB *dcb = backend_ref->bref_dcb; - - CHK_DCB(dcb); - - int rc = 0; - - /** Return if there are no pending ses commands */ - if (backend_ref->session_commands.size() == 0) - { - MXS_INFO("Cursor had no pending session commands."); - return false; - } - - SessionCommandList::iterator iter = backend_ref->session_commands.begin(); - GWBUF *buffer = iter->copy_buffer().release(); - - switch (iter->get_command()) - { - case MYSQL_COM_CHANGE_USER: - /** This makes it possible to handle replies correctly */ - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); - rc = dcb->func.auth(dcb, NULL, dcb->session, buffer); - break; - - case MYSQL_COM_QUERY: - default: - /** - * Mark session command buffer, it triggers writing - * MySQL command to protocol - */ - gwbuf_set_type(buffer, GWBUF_TYPE_SESCMD); - rc = dcb->func.write(dcb, buffer); - break; - } - - return rc == 1; -} - -/** - * Execute in backends used by current router session. - * Save session variable commands to router session property - * struct. Thus, they can be replayed in backends which are - * started and joined later. - * - * Suppress redundant OK packets sent by backends. - * - * The first OK packet is replied to the client. - * Return true if succeed, false is returned if router session was closed or - * if execute_sescmd_in_backend failed. - */ -static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses, - GWBUF* querybuf, - SCHEMAROUTER* inst, - unsigned char packet_type, - uint32_t qtype) -{ - bool succp = false; - backend_ref_t *backend_ref = router_cli_ses->rses_backend_ref; - - MXS_INFO("Session write, routing to all servers."); - atomic_add(&router_cli_ses->stats.longest_sescmd, 1); - - /** Increment the session command count */ - ++router_cli_ses->sent_sescmd; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - if (BREF_IS_IN_USE((&backend_ref[i]))) - { - GWBUF *buffer = gwbuf_clone(querybuf); - backend_ref[i].session_commands.push_back(SessionCommand(buffer, router_cli_ses->sent_sescmd)); - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - MXS_INFO("Route query to %s\t%s:%d%s", - (SERVER_IS_MASTER(backend_ref[i].bref_backend->server) ? - "master" : "slave"), - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port, - (i + 1 == router_cli_ses->rses_nbackends ? " <" : "")); - } - - if (backend_ref[i].session_commands.size() == 1) - { - /** Only one command, execute it */ - switch (packet_type) - { - /** These types of commands don't generate responses */ - case MYSQL_COM_QUIT: - case MYSQL_COM_STMT_CLOSE: - break; - - default: - bref_set_state(&backend_ref[i], BREF_WAITING_RESULT); - break; - } - - if (execute_sescmd_in_backend(&backend_ref[i])) - { - succp = true; - } - else - { - MXS_ERROR("Failed to execute session " - "command in %s:%d", - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port); - } - } - else - { - ss_dassert(backend_ref[i].session_commands.size() > 1); - /** The server is already executing a session command */ - MXS_INFO("Backend %s:%d already executing sescmd.", - backend_ref[i].bref_backend->server->name, - backend_ref[i].bref_backend->server->port); - succp = true; - } - } - } - - return succp; -} - -static void handle_error_reply_client(MXS_SESSION* ses, - SCHEMAROUTER_SESSION* rses, - DCB* backend_dcb, - GWBUF* errmsg) -{ - backend_ref_t* bref = get_bref_from_dcb(rses, backend_dcb); - - if (bref) - { - - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); - } - - if (ses->state == SESSION_STATE_ROUTER_READY) - { - ses->client_dcb->func.write(ses->client_dcb, gwbuf_clone(errmsg)); - } -} - -/** - * Check if a router session has servers in use - * @param rses Router client session - * @return True if session has a single backend server in use that is running. - * False if no backends are in use or running. - */ -bool have_servers(SCHEMAROUTER_SESSION* rses) -{ - for (int i = 0; i < rses->rses_nbackends; i++) - { - if (BREF_IS_IN_USE(&rses->rses_backend_ref[i]) && - !BREF_IS_CLOSED(&rses->rses_backend_ref[i])) - { - return true; - } - } - - return false; -} - -/** - * Check if there is backend reference pointing at failed DCB, and reset its - * flags. Then clear DCB's callback and finally try to reconnect. - * - * This must be called with router lock. - * - * @param inst router instance - * @param rses router client session - * @param dcb failed DCB - * @param errmsg error message which is sent to client if it is waiting - * - * @return true if there are enough backend connections to continue, false if not - */ -static bool handle_error_new_connection(SCHEMAROUTER* inst, - SCHEMAROUTER_SESSION* rses, - DCB* backend_dcb, - GWBUF* errmsg) -{ - backend_ref_t* bref; - bool succp; - - MXS_SESSION *ses = backend_dcb->session; - CHK_SESSION(ses); - - /** - * If bref == NULL it has been replaced already with another one. - */ - if ((bref = get_bref_from_dcb(rses, backend_dcb)) == NULL) - { - /** This should not happen */ - ss_dassert(false); - return false; - } - - - - /** - * If query was sent through the bref and it is waiting for reply from - * the backend server it is necessary to send an error to the client - * because it is waiting for reply. - */ - if (BREF_IS_WAITING_RESULT(bref)) - { - DCB* client_dcb; - client_dcb = ses->client_dcb; - client_dcb->func.write(client_dcb, gwbuf_clone(errmsg)); - bref_clear_state(bref, BREF_WAITING_RESULT); - } - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); - - return have_servers(rses); -} - -/** - * Finds out if there is a backend reference pointing at the DCB given as - * parameter. - * @param rses router client session - * @param dcb DCB - * - * @return backend reference pointer if succeed or NULL - */ -static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION *rses, - DCB *dcb) -{ - CHK_DCB(dcb); - - - for (int i = 0; i < rses->rses_nbackends; i++) - { - if (rses->rses_backend_ref[i].bref_dcb == dcb) - { - return &rses->rses_backend_ref[i]; - } - } - - return NULL; -} - -/** - * Detect if a query contains a SHOW SHARDS query. - * @param query Query to inspect - * @return true if the query is a SHOW SHARDS query otherwise false - */ -bool detect_show_shards(GWBUF* query) -{ - bool rval = false; - char *querystr, *tok, *sptr; - - if (query == NULL) - { - MXS_ERROR("NULL value passed at %s:%d", __FILE__, __LINE__); - return false; - } - - if (!modutil_is_SQL(query) && !modutil_is_SQL_prepare(query)) - { - return false; - } - - if ((querystr = modutil_get_SQL(query)) == NULL) - { - MXS_ERROR("Failure to parse SQL at %s:%d", __FILE__, __LINE__); - return false; - } - - tok = strtok_r(querystr, " ", &sptr); - if (tok && strcasecmp(tok, "show") == 0) - { - tok = strtok_r(NULL, " ", &sptr); - if (tok && strcasecmp(tok, "shards") == 0) - { - rval = true; - } - } - - MXS_FREE(querystr); - return rval; -} - -/** - * Callback for the shard list result set creation - */ -RESULT_ROW* shard_list_cb(struct resultset* rset, void* data) -{ - ServerMap* pContent = (ServerMap*)data; - RESULT_ROW* rval = resultset_make_row(rset); - - if (rval) - { - resultset_row_set(rval, 0, pContent->begin()->first.c_str()); - resultset_row_set(rval, 1, pContent->begin()->second->unique_name); - pContent->erase(pContent->begin()); - } - - return rval; -} - -/** - * Send a result set of all shards and their locations to the client. - * @param rses Router client session - * @return 0 on success, -1 on error - */ -int process_show_shards(SCHEMAROUTER_SESSION* rses) -{ - int rval = -1; - - ServerMap pContent; - rses->shardmap.get_content(pContent); - RESULTSET* rset = resultset_create(shard_list_cb, &pContent); - - if (rset) - { - resultset_add_column(rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); - resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); - resultset_stream_mysql(rset, rses->rses_client_dcb); - resultset_free(rset); - rval = 0; - } - - return rval; -} - -/** - * - * @param dcb - * @param errnum - * @param mysqlstate - * @param errmsg - */ -void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const char* errmsg) -{ - GWBUF* errbuff = modutil_create_mysql_err_msg(1, 0, errnum, mysqlstate, errmsg); - if (errbuff) - { - if (dcb->func.write(dcb, errbuff) != 1) - { - MXS_ERROR("Failed to write error packet to client."); - } - } - else - { - MXS_ERROR("Memory allocation failed when creating error packet."); - } -} - -/** - * - * @param router_cli_ses - * @return - */ -bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses) -{ - bool rval = false; - SERVER* target = router_cli_ses->shardmap.get_location(router_cli_ses->connect_db); - - if (target) - { - /* Send a COM_INIT_DB packet to the server with the right database - * and set it as the client's active database */ - - unsigned int qlen = router_cli_ses->connect_db.length(); - GWBUF* buffer = gwbuf_alloc(qlen + 5); - - if (buffer) - { - uint8_t *data = GWBUF_DATA(buffer); - gw_mysql_set_byte3(data, qlen + 1); - gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); - data[3] = 0x0; - data[4] = 0x2; - memcpy(data + 5, router_cli_ses->connect_db.c_str(), qlen); - DCB* dcb = NULL; - - if (get_shard_dcb(&dcb, router_cli_ses, target->unique_name)) - { - dcb->func.write(dcb, buffer); - MXS_DEBUG("USE '%s' sent to %s for session %p", - router_cli_ses->connect_db.c_str(), - target->unique_name, - router_cli_ses->rses_client_dcb->session); - rval = true; - } - else - { - MXS_INFO("Couldn't find target DCB for '%s'.", target->unique_name); - } - } - else - { - MXS_ERROR("Buffer allocation failed."); - } - } - else - { - /** Unknown database, hang up on the client*/ - MXS_INFO("Connecting to a non-existent database '%s'", router_cli_ses->connect_db.c_str()); - char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1]; - sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db.c_str()); - if (router_cli_ses->rses_config.debug) - { - sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", - router_cli_ses->rses_client_dcb->session->ses_id); - } - write_error_to_client(router_cli_ses->rses_client_dcb, - SCHEMA_ERR_DBNOTFOUND, - SCHEMA_ERRSTR_DBNOTFOUND, - errmsg); - } - - return rval; -} - -void route_queued_query(SCHEMAROUTER_SESSION *router_cli_ses) -{ - GWBUF* tmp = router_cli_ses->queue; - router_cli_ses->queue = router_cli_ses->queue->next; - tmp->next = NULL; -#ifdef SS_DEBUG - char* querystr = modutil_get_SQL(tmp); - MXS_DEBUG("Sending queued buffer for session %p: %s", - router_cli_ses->rses_client_dcb->session, - querystr); - MXS_FREE(querystr); -#endif - poll_add_epollin_event_to_dcb(router_cli_ses->rses_client_dcb, tmp); -} - -/** - * - * @param router_cli_ses Router client session - * @return 1 if mapping is done, 0 if it is still ongoing and -1 on error - */ -int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses, - backend_ref_t *bref, - GWBUF** wbuf) -{ - bool mapped = true; - GWBUF* writebuf = *wbuf; - backend_ref_t* bkrf = router_cli_ses->rses_backend_ref; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - if (bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i])) - { - if (bref->map_queue) - { - writebuf = gwbuf_append(bref->map_queue, writebuf); - bref->map_queue = NULL; - } - showdb_response_t rc = parse_showdb_response(router_cli_ses, - &router_cli_ses->rses_backend_ref[i], - &writebuf); - if (rc == SHOWDB_FULL_RESPONSE) - { - router_cli_ses->rses_backend_ref[i].bref_mapped = true; - MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - else if (rc == SHOWDB_PARTIAL_RESPONSE) - { - bref->map_queue = writebuf; - writebuf = NULL; - MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", - router_cli_ses->rses_backend_ref[i].bref_backend->server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - else - { - DCB* client_dcb = NULL; - - if ((router_cli_ses->state & INIT_FAILED) == 0) - { - if (rc == SHOWDB_DUPLICATE_DATABASES) - { - MXS_ERROR("Duplicate databases found, closing session."); - } - else - { - MXS_ERROR("Fatal error when processing SHOW DATABASES response, closing session."); - } - client_dcb = router_cli_ses->rses_client_dcb; - - /** This is the first response to the database mapping which - * has duplicate database conflict. Set the initialization bitmask - * to INIT_FAILED */ - router_cli_ses->state |= INIT_FAILED; - - /** Send the client an error about duplicate databases - * if there is a queued query from the client. */ - if (router_cli_ses->queue) - { - GWBUF* error = modutil_create_mysql_err_msg(1, 0, - SCHEMA_ERR_DUPLICATEDB, - SCHEMA_ERRSTR_DUPLICATEDB, - "Error: duplicate databases " - "found on two different shards."); - - if (error) - { - client_dcb->func.write(client_dcb, error); - } - else - { - MXS_ERROR("Creating buffer for error message failed."); - } - } - } - *wbuf = writebuf; - return -1; - } - } - - if (BREF_IS_IN_USE(&bkrf[i]) && !BREF_IS_MAPPED(&bkrf[i])) - { - mapped = false; - MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", - bkrf[i].bref_backend->server->unique_name, - router_cli_ses->rses_client_dcb->session); - } - } - *wbuf = writebuf; - return mapped ? 1 : 0; -} - -/** - * Synchronize the router client session shard map with the global shard map for - * this user. - * - * If the router doesn't have a shard map for this user then the current shard map - * of the client session is added to the router. If the shard map in the router is - * out of date, its contents are replaced with the contents of the current client - * session. If the router has a usable shard map, the current shard map of the client - * is discarded and the router's shard map is used. - * @param client Router session - */ -void synchronize_shard_map(SCHEMAROUTER_SESSION *client) -{ - client->router->stats.shmap_cache_miss++; - client->router->shard_manager.update_shard(client->shardmap, client->rses_client_dcb->user); -} - -/** - * Extract the database name from a COM_INIT_DB or literal USE ... query. - * @param buf Buffer with the database change query - * @param str Pointer where the database name is copied - * @return True for success, false for failure - */ -bool extract_database(GWBUF* buf, char* str) -{ - uint8_t* packet; - char *saved, *tok, *query = NULL; - bool succp = true; - unsigned int plen; - - packet = GWBUF_DATA(buf); - plen = gw_mysql_get_byte3(packet) - 1; - - /** Copy database name from MySQL packet to session */ - if (qc_get_operation(buf) == QUERY_OP_CHANGE_DB) - { - const char *delim = "` \n\t;"; - - query = modutil_get_SQL(buf); - tok = strtok_r(query, delim, &saved); - - if (tok == NULL || strcasecmp(tok, "use") != 0) - { - MXS_ERROR("extract_database: Malformed chage database packet."); - succp = false; - goto retblock; - } - - tok = strtok_r(NULL, delim, &saved); - - if (tok == NULL) - { - MXS_ERROR("extract_database: Malformed change database packet."); - succp = false; - goto retblock; - } - - strncpy(str, tok, MYSQL_DATABASE_MAXLEN); - } - else - { - memcpy(str, packet + 5, plen); - memset(str + plen, 0, 1); - } -retblock: - MXS_FREE(query); - return succp; -} - -/** - * Create a fake error message from a DCB. - * @param fail_str Custom error message - * @param dcb DCB to use as the origin of the error - */ -void create_error_reply(char* fail_str, DCB* dcb) -{ - MXS_INFO("change_current_db: failed to change database: %s", fail_str); - GWBUF* errbuf = modutil_create_mysql_err_msg(1, 0, 1049, "42000", fail_str); - - if (errbuf == NULL) - { - MXS_ERROR("Creating buffer for error message failed."); - return; - } - /** Set flags that help router to identify session commands reply */ - gwbuf_set_type(errbuf, GWBUF_TYPE_MYSQL); - gwbuf_set_type(errbuf, GWBUF_TYPE_SESCMD_RESPONSE); - gwbuf_set_type(errbuf, GWBUF_TYPE_RESPONSE_END); - - poll_add_epollin_event_to_dcb(dcb, - errbuf); -} - -/** - * Read new database name from MYSQL_COM_INIT_DB packet or a literal USE ... COM_QUERY - * packet, check that it exists in the hashtable and copy its name to MYSQL_session. - * - * @param dest Destination where the database name will be written - * @param dbhash Hashtable containing valid databases - * @param buf Buffer containing the database change query - * - * @return true if new database is set, false if non-existent database was tried - * to be set - */ -bool change_current_db(string& dest, Shard& shard, GWBUF* buf) -{ - bool succp = false; - char db[MYSQL_DATABASE_MAXLEN + 1]; - - if (GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5) - { - /** Copy database name from MySQL packet to session */ - if (extract_database(buf, db)) - { - MXS_INFO("change_current_db: INIT_DB with database '%s'", db); - /** - * Update the session's active database only if it's in the hashtable. - * If it isn't found, send a custom error packet to the client. - */ - - SERVER* target = shard.get_location(db); - - if (target) - { - dest = db; - MXS_INFO("change_current_db: database is on server: '%s'.", target->unique_name); - succp = true; - } - } - } - else - { - MXS_ERROR("change_current_db: failed to change database: Query buffer too large"); - } - - return succp; -} - -MXS_BEGIN_DECLS - -/** - * Create an instance of schemarouter router within the MaxScale. - * - * - * @param service The service this router is being create for - * @param options The options for this query router - * - * @return NULL in failure, pointer to router in success. - */ -static MXS_ROUTER* do_createInstance(SERVICE *service, char **options) -{ - MXS_CONFIG_PARAMETER* conf; - MXS_CONFIG_PARAMETER* param; - - SCHEMAROUTER* router = new SCHEMAROUTER; - - /** Add default system databases to ignore */ - router->ignored_dbs.insert("mysql"); - router->ignored_dbs.insert("information_schema"); - router->ignored_dbs.insert("performance_schema"); - router->service = service; - router->stats.longest_sescmd = 0; - router->stats.n_hist_exceeded = 0; - router->stats.n_queries = 0; - router->stats.n_sescmd = 0; - router->stats.ses_longest = 0; - router->stats.ses_shortest = (double)((unsigned long)(~0)); - spinlock_init(&router->lock); - - conf = service->svc_config_param; - - router->schemarouter_config.refresh_databases = config_get_bool(conf, "refresh_databases"); - router->schemarouter_config.refresh_min_interval = config_get_integer(conf, "refresh_interval"); - router->schemarouter_config.debug = config_get_bool(conf, "debug"); - - if ((config_get_param(conf, "auth_all_servers")) == NULL) - { - MXS_NOTICE("Authentication data is fetched from all servers. To disable this " - "add 'auth_all_servers=0' to the service."); - service->users_from_all = true; - } - - if ((param = config_get_param(conf, "ignore_databases_regex"))) - { - int errcode; - PCRE2_SIZE erroffset; - pcre2_code* re = pcre2_compile((PCRE2_SPTR)param->value, PCRE2_ZERO_TERMINATED, 0, - &errcode, &erroffset, NULL); - - if (re == NULL) - { - PCRE2_UCHAR errbuf[512]; - pcre2_get_error_message(errcode, errbuf, sizeof(errbuf)); - MXS_ERROR("Regex compilation failed at %d for regex '%s': %s", - (int)erroffset, param->value, errbuf); - delete router; - return NULL; - } - - pcre2_match_data* match_data = pcre2_match_data_create_from_pattern(re, NULL); - - if (match_data == NULL) - { - MXS_ERROR("PCRE2 match data creation failed. This" - " is most likely caused by a lack of available memory."); - pcre2_code_free(re); - delete router; - return NULL; - } - - router->ignore_regex = re; - router->ignore_match_data = match_data; - } - - if ((param = config_get_param(conf, "ignore_databases"))) - { - char val[strlen(param->value) + 1]; - strcpy(val, param->value); - - const char *sep = ", \t"; - char *sptr; - char *tok = strtok_r(val, sep, &sptr); - - while (tok) - { - router->ignored_dbs.insert(tok); - tok = strtok_r(NULL, sep, &sptr); - } - } - - bool failure = false; - - for (int i = 0; options && options[i]; i++) - { - char* value = strchr(options[i], '='); - - if (value == NULL) - { - MXS_ERROR("Unknown router options for %s", options[i]); - failure = true; - break; - } - - *value = '\0'; - value++; - - if (strcmp(options[i], "max_sescmd_history") == 0) - { - MXS_WARNING("Use of 'max_sescmd_history' is deprecated"); - } - else if (strcmp(options[i], "disable_sescmd_history") == 0) - { - MXS_WARNING("Use of 'disable_sescmd_history' is deprecated"); - } - else if (strcmp(options[i], "refresh_databases") == 0) - { - router->schemarouter_config.refresh_databases = config_truth_value(value); - } - else if (strcmp(options[i], "refresh_interval") == 0) - { - router->schemarouter_config.refresh_min_interval = atof(value); - } - else if (strcmp(options[i], "debug") == 0) - { - router->schemarouter_config.debug = config_truth_value(value); - } - else - { - MXS_ERROR("Unknown router options for %s", options[i]); - failure = true; - break; - } - } - - if (failure) - { - delete router; - router = NULL; - } - - return (MXS_ROUTER *)router; -} - -static MXS_ROUTER* createInstance(SERVICE *service, char **options) -{ - MXS_ROUTER* rval = NULL; - - MXS_EXCEPTION_GUARD((rval = do_createInstance(service, options))); - - return rval; -} - -/** - * Associate a new session with this instance of the router. - * - * The session is used to store all the data required for a particular - * client connection. - * - * @param instance The router instance data - * @param session The session itself - * @return Session specific data for this session - */ -static MXS_ROUTER_SESSION* do_newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) -{ - char db[MYSQL_DATABASE_MAXLEN + 1] = ""; - MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol; - MYSQL_session* data = (MYSQL_session*)session->client_dcb->data; - bool using_db = false; - bool have_db = false; - - /* To enable connecting directly to a sharded database we first need - * to disable it for the client DCB's protocol so that we can connect to them*/ - if (protocol->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB && - (have_db = strnlen(data->db, MYSQL_DATABASE_MAXLEN) > 0)) - { - protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB; - strcpy(db, data->db); - *data->db = 0; - using_db = true; - MXS_INFO("Client logging in directly to a database '%s', " - "postponing until databases have been mapped.", db); - } - - if (!have_db) - { - MXS_INFO("Client'%s' connecting with empty database.", data->user); - } - - SCHEMAROUTER_SESSION* client_rses = new SCHEMAROUTER_SESSION; - - SCHEMAROUTER* router = (SCHEMAROUTER*)router_inst; - - - client_rses->router = router; - client_rses->rses_client_dcb = (DCB*)session->client_dcb; - client_rses->queue = NULL; - client_rses->closed = false; - client_rses->sent_sescmd = 0; - client_rses->replied_sescmd = 0; - - client_rses->shardmap = router->shard_manager.get_shard(session->client_dcb->user, - router->schemarouter_config.refresh_min_interval); - - memcpy(&client_rses->rses_config, &router->schemarouter_config, sizeof(schemarouter_config_t)); - - if (using_db) - { - client_rses->state |= INIT_USE_DB; - } - /** - * Set defaults to session variables. - */ - - /** - * Instead of calling this, ensure that there is at least one - * responding server. - */ - - int router_nservers = router->service->n_dbref; - - /** - * Create backend reference objects for this session. - */ - - backend_ref_t* backend_ref = new backend_ref_t[router_nservers]; - - /** - * Initialize backend references with BACKEND ptr. - * Initialize session command cursors for each backend reference. - */ - - int i = 0; - - for (SERVER_REF *ref = router->service->dbref; ref; ref = ref->next) - { - if (ref->active) - { - backend_ref[i].bref_state = 0; - backend_ref[i].n_mapping_eof = 0; - backend_ref[i].map_queue = NULL; - backend_ref[i].bref_backend = ref; - backend_ref[i].bref_pending_cmd = NULL; - backend_ref[i].bref_num_result_wait = 0; - i++; - } - } - - if (i < router_nservers) - { - router_nservers = i; - } - - client_rses->rses_backend_ref = backend_ref; - client_rses->rses_nbackends = router_nservers; - - /** - * Connect to all backend servers - */ - bool succp = connect_backend_servers(backend_ref, router_nservers, session, router); - - if (!succp || client_rses->closed) - { - delete[] client_rses->rses_backend_ref; - delete client_rses; - return NULL; - } - - if (db[0]) - { - /* Store the database the client is connecting to */ - client_rses->connect_db = db; - } - - atomic_add(&router->stats.sessions, 1); - - return (MXS_ROUTER_SESSION*)client_rses; -} - -static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* session) -{ - MXS_ROUTER_SESSION* rval = NULL; - - MXS_EXCEPTION_GUARD((rval = do_newSession(router_inst, session))); - - return rval; -} - -/** - * Close a session with the router, this is the mechanism - * by which a router may cleanup data structure etc. - * - * @param instance The router instance data - * @param session The session being closed - */ -static void do_closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) -{ - SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *)router_session; - - ss_dassert(!router_cli_ses->closed); - - /** - * Lock router client session for secure read and update. - */ - if (!router_cli_ses->closed) - { - router_cli_ses->closed = true; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - backend_ref_t* bref = &router_cli_ses->rses_backend_ref[i]; - DCB* dcb = bref->bref_dcb; - /** Close those which had been connected */ - if (BREF_IS_IN_USE(bref)) - { - CHK_DCB(dcb); - - /** Clean operation counter in bref and in SERVER */ - while (BREF_IS_WAITING_RESULT(bref)) - { - bref_clear_state(bref, BREF_WAITING_RESULT); - } - bref_clear_state(bref, BREF_IN_USE); - bref_set_state(bref, BREF_CLOSED); - /** - * closes protocol and dcb - */ - dcb_close(dcb); - /** decrease server current connection counters */ - atomic_add(&bref->bref_backend->connections, -1); - } - } - - gwbuf_free(router_cli_ses->queue); - - SCHEMAROUTER *inst = router_cli_ses->router; - - spinlock_acquire(&inst->lock); - if (inst->stats.longest_sescmd < router_cli_ses->stats.longest_sescmd) - { - inst->stats.longest_sescmd = router_cli_ses->stats.longest_sescmd; - } - double ses_time = difftime(time(NULL), router_cli_ses->rses_client_dcb->session->stats.connect); - if (inst->stats.ses_longest < ses_time) - { - inst->stats.ses_longest = ses_time; - } - if (inst->stats.ses_shortest > ses_time && inst->stats.ses_shortest > 0) - { - inst->stats.ses_shortest = ses_time; - } - - inst->stats.ses_average = - (ses_time + ((inst->stats.sessions - 1) * inst->stats.ses_average)) / - (inst->stats.sessions); - - spinlock_release(&inst->lock); - } -} - -static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session) -{ - MXS_EXCEPTION_GUARD(do_closeSession(instance, router_session)); -} - -static void do_freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session) -{ - SCHEMAROUTER_SESSION* router_cli_ses = (SCHEMAROUTER_SESSION *)router_client_session; - - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - gwbuf_free(router_cli_ses->rses_backend_ref[i].bref_pending_cmd); - } - - /* - * We are no longer in the linked list, free - * all the memory and other resources associated - * to the client session. - */ - delete[] router_cli_ses->rses_backend_ref; - delete router_cli_ses; - return; -} - -static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_session) -{ - MXS_EXCEPTION_GUARD(do_freeSession(router_instance, router_client_session)); -} - -/** - * The main routing entry, this is called with every packet that is - * received and has to be forwarded to the backend database. - * - * The routeQuery will make the routing decision based on the contents - * of the instance, session and the query itself in the queue. The - * data in the queue may not represent a complete query, it represents - * the data that has been received. The query router itself is responsible - * for buffering the partial query, a later call to the query router will - * contain the remainder, or part thereof of the query. - * - * @param instance The query router instance - * @param router_session The session associated with the client - * @param querybuf MaxScale buffer queue with received packet - * - * @return if succeed 1, otherwise 0 - * If routeQuery fails, it means that router session has failed. - * In any tolerated failure, handleError is called and if necessary, - * an error message is sent to the client. - * - */ -static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* querybuf) -{ - uint32_t qtype = QUERY_TYPE_UNKNOWN; - uint8_t packet_type; - uint8_t* packet; - int ret = 0; - DCB* target_dcb = NULL; - SCHEMAROUTER* inst = (SCHEMAROUTER *)instance; - SCHEMAROUTER_SESSION* router_cli_ses = (SCHEMAROUTER_SESSION *)router_session; - bool change_successful = false; - route_target_t route_target = TARGET_UNDEFINED; - bool succp = false; - char db[MYSQL_DATABASE_MAXLEN + 1]; - char errbuf[26 + MYSQL_DATABASE_MAXLEN]; - - SERVER* target = NULL; - - ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf)); - - if (router_cli_ses->closed) - { - return 0; - } - - if (router_cli_ses->shardmap.empty()) - { - /* Generate database list */ - gen_databaselist(inst, router_cli_ses); - } - - /** - * If the databases are still being mapped or if the client connected - * with a default database but no database mapping was performed we need - * to store the query. Once the databases have been mapped and/or the - * default database is taken into use we can send the query forward. - */ - if (router_cli_ses->state & (INIT_MAPPING | INIT_USE_DB)) - { - int init_rval = 1; - char* querystr = modutil_get_SQL(querybuf); - MXS_INFO("Storing query for session %p: %s", - router_cli_ses->rses_client_dcb->session, - querystr); - MXS_FREE(querystr); - querybuf = gwbuf_make_contiguous(querybuf); - GWBUF* ptr = router_cli_ses->queue; - - while (ptr && ptr->next) - { - ptr = ptr->next; - } - - if (ptr == NULL) - { - router_cli_ses->queue = querybuf; - } - else - { - ptr->next = querybuf; - - } - - if (router_cli_ses->state == (INIT_READY | INIT_USE_DB)) - { - /** - * This state is possible if a client connects with a default database - * and the shard map was found from the router cache - */ - if (!handle_default_db(router_cli_ses)) - { - init_rval = 0; - } - } - - return init_rval; - } - - packet = GWBUF_DATA(querybuf); - packet_type = packet[4]; - qc_query_op_t op = QUERY_OP_UNDEFINED; - - if (detect_show_shards(querybuf)) - { - process_show_shards(router_cli_ses); - gwbuf_free(querybuf); - return 1; - } - - switch (packet_type) - { - case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ - case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */ - case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */ - case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */ - case MYSQL_COM_PING: /*< 0e all servers are pinged */ - case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */ - case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */ - case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */ - case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */ - qtype = QUERY_TYPE_SESSION_WRITE; - break; - - case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */ - case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */ - qtype = QUERY_TYPE_WRITE; - break; - - case MYSQL_COM_QUERY: - qtype = qc_get_type_mask(querybuf); - op = qc_get_operation(querybuf); - break; - - case MYSQL_COM_STMT_PREPARE: - qtype = qc_get_type_mask(querybuf); - qtype |= QUERY_TYPE_PREPARE_STMT; - break; - - case MYSQL_COM_STMT_EXECUTE: - /** Parsing is not needed for this type of packet */ - qtype = QUERY_TYPE_EXEC_STMT; - break; - - case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */ - case MYSQL_COM_STATISTICS: /**< 9 ? */ - case MYSQL_COM_PROCESS_INFO: /**< 0a ? */ - case MYSQL_COM_CONNECT: /**< 0b ? */ - case MYSQL_COM_PROCESS_KILL: /**< 0c ? */ - case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */ - case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */ - case MYSQL_COM_DAEMON: /**< 1d ? */ - default: - break; - } - - if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) - { - char *sql; - int sql_len; - char* qtypestr = qc_typemask_to_string(qtype); - modutil_extract_SQL(querybuf, &sql, &sql_len); - - MXS_INFO("> Command: %s, stmt: %.*s %s%s", - STRPACKETTYPE(packet_type), sql_len, sql, - (querybuf->hint == NULL ? "" : ", Hint:"), - (querybuf->hint == NULL ? "" : STRHINTTYPE(querybuf->hint->type))); - - MXS_FREE(qtypestr); - } - /** - * Find out whether the query should be routed to single server or to - * all of them. - */ - - if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) - { - change_successful = change_current_db(router_cli_ses->current_db, - router_cli_ses->shardmap, - querybuf); - if (!change_successful) - { - extract_database(querybuf, db); - snprintf(errbuf, 25 + MYSQL_DATABASE_MAXLEN, "Unknown database: %s", db); - - if (router_cli_ses->rses_config.debug) - { - sprintf(errbuf + strlen(errbuf), - " ([%lu]: DB change failed)", - router_cli_ses->rses_client_dcb->session->ses_id); - } - - write_error_to_client(router_cli_ses->rses_client_dcb, - SCHEMA_ERR_DBNOTFOUND, - SCHEMA_ERRSTR_DBNOTFOUND, - errbuf); - - MXS_ERROR("Changing database failed."); - gwbuf_free(querybuf); - return 1; - } - } - - /** Create the response to the SHOW DATABASES from the mapped databases */ - if (qc_query_is_type(qtype, QUERY_TYPE_SHOW_DATABASES)) - { - if (send_database_list(inst, router_cli_ses)) - { - ret = 1; - } - - gwbuf_free(querybuf); - return ret; - } - - route_target = get_shard_route_target(qtype); - - if (packet_type == MYSQL_COM_INIT_DB || op == QUERY_OP_CHANGE_DB) - { - route_target = TARGET_UNDEFINED; - target = router_cli_ses->shardmap.get_location(router_cli_ses->current_db); - - if (target) - { - MXS_INFO("INIT_DB for database '%s' on server '%s'", - router_cli_ses->current_db.c_str(), target->unique_name); - route_target = TARGET_NAMED_SERVER; - } - else - { - MXS_INFO("INIT_DB with unknown database"); - } - } - else if (route_target != TARGET_ALL) - { - /** If no database is found in the query and there is no active database - * or hints in the query we route the query to the first available - * server. This isn't ideal for monitoring server status but works if - * we just want the server to send an error back. */ - - target = get_shard_target(inst, router_cli_ses, querybuf, qtype); - - if (target) - { - if (SERVER_IS_RUNNING(target)) - { - route_target = TARGET_NAMED_SERVER; - } - else - { - MXS_INFO("Backend server '%s' is not in a viable state", target->unique_name); - - /** - * Shard is not a viable target right now so we check - * for an alternate backend with the database. If this is not found - * the target is undefined and an error will be returned to the client. - */ - } - } - } - - if (TARGET_IS_UNDEFINED(route_target)) - { - target = get_shard_target(inst, router_cli_ses, querybuf, qtype); - - if ((target == NULL && - packet_type != MYSQL_COM_INIT_DB && - router_cli_ses->current_db.length() == 0) || - packet_type == MYSQL_COM_FIELD_LIST || - (router_cli_ses->current_db.length() == 0)) - { - /** - * No current database and no databases in query or - * the database is ignored, route to first available backend. - */ - - route_target = TARGET_ANY; - MXS_INFO("Routing query to first available backend."); - - } - else - { - if (!change_successful) - { - /** - * Bad shard status. The changing of the database - * was not successful and the error message was already sent. - */ - - ret = 1; - } - else - { - MXS_ERROR("Error : Router internal failure (schemarouter)"); - /** Something else went wrong, terminate connection */ - ret = 0; - } - - gwbuf_free(querybuf); - return ret; - } - } - - if (TARGET_IS_ALL(route_target)) - { - /** - * It is not sure if the session command in question requires - * response. Statement is examined in route_session_write. - * Router locking is done inside the function. - */ - succp = route_session_write(router_cli_ses, - querybuf, - inst, - packet_type, - qtype); - - if (succp) - { - atomic_add(&inst->stats.n_sescmd, 1); - atomic_add(&inst->stats.n_queries, 1); - ret = 1; - } - - gwbuf_free(querybuf); - return ret; - } - - if (TARGET_IS_ANY(route_target)) - { - for (int i = 0; i < router_cli_ses->rses_nbackends; i++) - { - SERVER *server = router_cli_ses->rses_backend_ref[i].bref_backend->server; - if (SERVER_IS_RUNNING(server)) - { - route_target = TARGET_NAMED_SERVER; - target = server; - break; - } - } - - if (TARGET_IS_ANY(route_target)) - { - /**No valid backends alive*/ - MXS_ERROR("Failed to route query, no backends are available."); - gwbuf_free(querybuf); - return 0; - } - - } - - /** - * Query is routed to one of the backends - */ - if (TARGET_IS_NAMED_SERVER(route_target) && target) - { - /** - * Search backend server by name or replication lag. - * If it fails, then try to find valid slave or master. - */ - - succp = get_shard_dcb(&target_dcb, router_cli_ses, target->unique_name); - - if (!succp) - { - MXS_INFO("Was supposed to route to named server " - "%s but couldn't find the server in a " - "suitable state.", target->unique_name); - } - - } - - if (succp) /*< Have DCB of the target backend */ - { - backend_ref_t *bref = get_bref_from_dcb(router_cli_ses, target_dcb); - - MXS_INFO("Route query to \t%s:%d <", - bref->bref_backend->server->name, - bref->bref_backend->server->port); - /** - * Store current stmt if execution of previous session command - * haven't completed yet. Note that according to MySQL protocol - * there can only be one such non-sescmd stmt at the time. - */ - if (bref->session_commands.size() > 0) - { - ss_dassert((bref->bref_pending_cmd == NULL || - router_cli_ses->closed)); - bref->bref_pending_cmd = querybuf; - return 1; - } - - if ((ret = target_dcb->func.write(target_dcb, gwbuf_clone(querybuf))) == 1) - { - backend_ref_t* bref; - - atomic_add(&inst->stats.n_queries, 1); - - /** - * Add one query response waiter to backend reference - */ - bref = get_bref_from_dcb(router_cli_ses, target_dcb); - bref_set_state(bref, BREF_QUERY_ACTIVE); - bref_set_state(bref, BREF_WAITING_RESULT); - } - else - { - MXS_ERROR("Routing query failed."); - } - } - - gwbuf_free(querybuf); - - return ret; -} - -static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* qbuf) -{ - int rval = 0; - - MXS_EXCEPTION_GUARD((rval = do_routeQuery(instance, router_session, qbuf))); - - return rval; -} - -/** - * Diagnostics routine - * - * Print query router statistics to the DCB passed in - * - * @param instance The router instance - * @param dcb The DCB for diagnostic output - */ -static void do_diagnostic(MXS_ROUTER *instance, DCB *dcb) -{ - SCHEMAROUTER *router = (SCHEMAROUTER *)instance; - int i = 0; - - double sescmd_pct = router->stats.n_sescmd != 0 ? - 100.0 * ((double)router->stats.n_sescmd / (double)router->stats.n_queries) : - 0.0; - - /** Session command statistics */ - dcb_printf(dcb, "\n\33[1;4mSession Commands\33[0m\n"); - dcb_printf(dcb, "Total number of queries: %d\n", - router->stats.n_queries); - dcb_printf(dcb, "Percentage of session commands: %.2f\n", - sescmd_pct); - dcb_printf(dcb, "Longest chain of stored session commands: %d\n", - router->stats.longest_sescmd); - dcb_printf(dcb, "Session command history limit exceeded: %d times\n", - router->stats.n_hist_exceeded); - - /** Session time statistics */ - - if (router->stats.sessions > 0) - { - dcb_printf(dcb, "\n\33[1;4mSession Time Statistics\33[0m\n"); - dcb_printf(dcb, "Longest session: %.2lf seconds\n", router->stats.ses_longest); - dcb_printf(dcb, "Shortest session: %.2lf seconds\n", router->stats.ses_shortest); - dcb_printf(dcb, "Average session length: %.2lf seconds\n", router->stats.ses_average); - } - dcb_printf(dcb, "Shard map cache hits: %d\n", router->stats.shmap_cache_hit); - dcb_printf(dcb, "Shard map cache misses: %d\n", router->stats.shmap_cache_miss); - dcb_printf(dcb, "\n"); -} - -static void diagnostic(MXS_ROUTER *instance, DCB *dcb) -{ - MXS_EXCEPTION_GUARD(do_diagnostic(instance, dcb)); -} - -/** - * Client Reply routine - * - * The routine will reply to client for session change with master server data - * - * @param instance The router instance - * @param router_session The router session - * @param backend_dcb The backend DCB - * @param queue The GWBUF with reply data - */ -static void do_clientReply(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* buffer, - DCB* backend_dcb) -{ - backend_ref_t* bref; - GWBUF* writebuf = buffer; - - SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *) router_session; - - - /** - * Lock router client session for secure read of router session members. - * Note that this could be done without lock by using version # - */ - if (router_cli_ses->closed) - { - gwbuf_free(buffer); - return; - } - - /** Holding lock ensures that router session remains open */ - ss_dassert(backend_dcb->session != NULL); - DCB *client_dcb = backend_dcb->session->client_dcb; - - bref = get_bref_from_dcb(router_cli_ses, backend_dcb); - - if (bref == NULL) - { - gwbuf_free(writebuf); - return; - } - - MXS_DEBUG("Reply from [%s] session [%p]" - " mapping [%s] queries queued [%s]", - bref->bref_backend->server->unique_name, - router_cli_ses->rses_client_dcb->session, - router_cli_ses->state & INIT_MAPPING ? "true" : "false", - router_cli_ses->queue == NULL ? "none" : - router_cli_ses->queue->next ? "multiple" : "one"); - - - - if (router_cli_ses->state & INIT_MAPPING) - { - int rc = inspect_backend_mapping_states(router_cli_ses, bref, &writebuf); - gwbuf_free(writebuf); - writebuf = NULL; - - if (rc == 1) - { - synchronize_shard_map(router_cli_ses); - - /* - * Check if the session is reconnecting with a database name - * that is not in the hashtable. If the database is not found - * then close the session. - */ - router_cli_ses->state &= ~INIT_MAPPING; - - if (router_cli_ses->state & INIT_USE_DB) - { - bool success = handle_default_db(router_cli_ses); - if (!success) - { - dcb_close(router_cli_ses->rses_client_dcb); - } - return; - } - - if (router_cli_ses->queue) - { - ss_dassert(router_cli_ses->state == INIT_READY); - route_queued_query(router_cli_ses); - } - MXS_DEBUG("session [%p] database map finished.", - router_cli_ses); - } - - if (rc == -1) - { - dcb_close(router_cli_ses->rses_client_dcb); - } - return; - } - - if (router_cli_ses->state & INIT_USE_DB) - { - MXS_DEBUG("Reply to USE '%s' received for session %p", - router_cli_ses->connect_db.c_str(), - router_cli_ses->rses_client_dcb->session); - router_cli_ses->state &= ~INIT_USE_DB; - router_cli_ses->current_db = router_cli_ses->connect_db; - ss_dassert(router_cli_ses->state == INIT_READY); - - if (router_cli_ses->queue) - { - route_queued_query(router_cli_ses); - } - - gwbuf_free(writebuf); - return; - } - - if (router_cli_ses->queue) - { - ss_dassert(router_cli_ses->state == INIT_READY); - route_queued_query(router_cli_ses); - return; - } - - - - /** - * Active cursor means that reply is from session command - * execution. - */ - if (bref->session_commands.size() > 0) - { - if (GWBUF_IS_TYPE_SESCMD_RESPONSE(writebuf)) - { - /** - * Discard all those responses that have already been sent to - * the client. Return with buffer including response that - * needs to be sent to client or NULL. - */ - if (router_cli_ses->replied_sescmd < router_cli_ses->sent_sescmd && - bref->session_commands.front().get_position() == router_cli_ses->replied_sescmd + 1) - { - ++router_cli_ses->replied_sescmd; - } - else - { - /** The reply to this session command has already been sent - * to the client. */ - gwbuf_free(writebuf); - writebuf = NULL; - } - bref->session_commands.pop_front(); - } - /** - * If response will be sent to client, decrease waiter count. - * This applies to session commands only. Counter decrement - * for other type of queries is done outside this block. - */ - if (writebuf != NULL && client_dcb != NULL) - { - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - } - /** - * Clear BREF_QUERY_ACTIVE flag and decrease waiter counter. - * This applies for queries other than session commands. - */ - else if (BREF_IS_QUERY_ACTIVE(bref)) - { - bref_clear_state(bref, BREF_QUERY_ACTIVE); - /** Set response status as replied */ - bref_clear_state(bref, BREF_WAITING_RESULT); - } - - if (writebuf != NULL && client_dcb != NULL) - { - unsigned char* cmd = (unsigned char*) writebuf->start; - int state = router_cli_ses->state; - /** Write reply to client DCB */ - MXS_INFO("returning reply [%s] " - "state [%s] session [%p]", - PTR_IS_ERR(cmd) ? "ERR" : PTR_IS_OK(cmd) ? "OK" : "RSET", - state & INIT_UNINT ? "UNINIT" : state & INIT_MAPPING ? "MAPPING" : "READY", - router_cli_ses->rses_client_dcb->session); - MXS_SESSION_ROUTE_REPLY(backend_dcb->session, writebuf); - } - - /** There is one pending session command to be executed. */ - if (bref->session_commands.size() > 0) - { - - MXS_INFO("Backend %s:%d processed reply and starts to execute " - "active cursor.", - bref->bref_backend->server->name, - bref->bref_backend->server->port); - - execute_sescmd_in_backend(bref); - } - else if (bref->bref_pending_cmd != NULL) /*< non-sescmd is waiting to be routed */ - { - int ret; - - CHK_GWBUF(bref->bref_pending_cmd); - - if ((ret = bref->bref_dcb->func.write(bref->bref_dcb, - gwbuf_clone(bref->bref_pending_cmd))) == 1) - { - SCHEMAROUTER* inst = (SCHEMAROUTER *) instance; - atomic_add(&inst->stats.n_queries, 1); - /** - * Add one query response waiter to backend reference - */ - bref_set_state(bref, BREF_QUERY_ACTIVE); - bref_set_state(bref, BREF_WAITING_RESULT); - } - else - { - char* sql = modutil_get_SQL(bref->bref_pending_cmd); - - if (sql) - { - MXS_ERROR("Routing query \"%s\" failed.", sql); - MXS_FREE(sql); - } - else - { - MXS_ERROR("Routing query failed."); - } - } - gwbuf_free(bref->bref_pending_cmd); - bref->bref_pending_cmd = NULL; - } -} - -static void clientReply(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* buffer, - DCB* backend_dcb) -{ - MXS_EXCEPTION_GUARD(do_clientReply(instance, router_session, buffer, backend_dcb)); -} - -/** - * Error Handler routine to resolve _backend_ failures. If it succeeds then there - * are enough operative backends available and connected. Otherwise it fails, - * and session is terminated. - * - * @param instance The router instance - * @param router_session The router session - * @param errmsgbuf The error message to reply - * @param backend_dcb The backend DCB - * @param action The action: ERRACT_NEW_CONNECTION or ERRACT_REPLY_CLIENT - * @param succp Result of action: true iff router can continue - * - * Even if succp == true connecting to new slave may have failed. succp is to - * tell whether router has enough master/slave connections to continue work. - */ -static void do_handleError(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* errmsgbuf, - DCB* problem_dcb, - mxs_error_action_t action, - bool* succp) -{ - ss_dassert(problem_dcb->dcb_role == DCB_ROLE_BACKEND_HANDLER); - CHK_DCB(problem_dcb); - SCHEMAROUTER* inst = (SCHEMAROUTER *)instance; - SCHEMAROUTER_SESSION* rses = (SCHEMAROUTER_SESSION *)router_session; - MXS_SESSION *session = problem_dcb->session; - ss_dassert(session && rses); - - CHK_SESSION(session); - - switch (action) - { - case ERRACT_NEW_CONNECTION: - *succp = handle_error_new_connection(inst, rses, problem_dcb, errmsgbuf); - break; - - case ERRACT_REPLY_CLIENT: - handle_error_reply_client(session, rses, problem_dcb, errmsgbuf); - *succp = false; /*< no new backend servers were made available */ - break; - - default: - *succp = false; - break; - } - - dcb_close(problem_dcb); -} - -static void handleError(MXS_ROUTER* instance, - MXS_ROUTER_SESSION* router_session, - GWBUF* errmsgbuf, - DCB* problem_dcb, - mxs_error_action_t action, - bool* succp) -{ - MXS_EXCEPTION_GUARD(do_handleError(instance, router_session, errmsgbuf, - problem_dcb, action, succp)); -} - -/** - * The module entry point routine. It is this routine that - * must populate the structure that is referred to as the - * "module object", this is a structure with the set of - * external entry points for this module. - * - * @return The module object - */ -MXS_MODULE* MXS_CREATE_MODULE() -{ - MXS_NOTICE("Initializing Schema Sharding Router."); - - static MXS_ROUTER_OBJECT MyObject = - { - createInstance, - newSession, - closeSession, - freeSession, - routeQuery, - diagnostic, - clientReply, - handleError, - NULL, - NULL - }; - - static MXS_MODULE info = - { - MXS_MODULE_API_ROUTER, - MXS_MODULE_BETA_RELEASE, - MXS_ROUTER_VERSION, - "A database sharding router for simple sharding", - "V1.0.0", - RCAP_TYPE_CONTIGUOUS_INPUT, - &MyObject, - NULL, /* Process init. */ - NULL, /* Process finish. */ - NULL, /* Thread init. */ - NULL, /* Thread finish. */ - { - {"ignore_databases", MXS_MODULE_PARAM_STRING}, - {"ignore_databases_regex", MXS_MODULE_PARAM_STRING}, - {"max_sescmd_history", MXS_MODULE_PARAM_COUNT, "0"}, - {"disable_sescmd_history", MXS_MODULE_PARAM_BOOL, "false"}, - {"refresh_databases", MXS_MODULE_PARAM_BOOL, "true"}, - {"refresh_interval", MXS_MODULE_PARAM_COUNT, DEFAULT_REFRESH_INTERVAL}, - {"debug", MXS_MODULE_PARAM_BOOL, "false"}, - {MXS_END_MODULE_PARAMS} - } - }; - - return &info; -} - -MXS_END_DECLS diff --git a/server/modules/routing/schemarouter/schemaroutersession.hh b/server/modules/routing/schemarouter/schemaroutersession.hh new file mode 100644 index 000000000..5015d86c9 --- /dev/null +++ b/server/modules/routing/schemarouter/schemaroutersession.hh @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2016 MariaDB Corporation Ab + * + * Use of this software is governed by the Business Source License included + * in the LICENSE.TXT file and at www.mariadb.com/bsl11. + * + * Change Date: 2019-07-01 + * + * On the date above, in accordance with the Business Source License, use + * of this software will be governed by version 2 or later of the General + * Public License. + */ + +#pragma once + +#include "schemarouter.hh" + +#include + +#include +#include + +#include "shard_map.hh" +#include "session_command.hh" + +/** + * Bitmask values for the router session's initialization. These values are used + * to prevent responses from internal commands being forwarded to the client. + */ +typedef enum init_mask +{ + INIT_READY = 0x00, + INIT_MAPPING = 0x01, + INIT_USE_DB = 0x02, + INIT_UNINT = 0x04, + INIT_FAILED = 0x08 +} init_mask_t; + +typedef enum showdb_response +{ + SHOWDB_FULL_RESPONSE, + SHOWDB_PARTIAL_RESPONSE, + SHOWDB_DUPLICATE_DATABASES, + SHOWDB_FATAL_ERROR +} showdb_response_t; + +/** + * The state of the backend server reference + */ +typedef enum bref_state +{ + BREF_IN_USE = 0x01, + BREF_WAITING_RESULT = 0x02, /**< for session commands only */ + BREF_QUERY_ACTIVE = 0x04, /**< for other queries */ + BREF_CLOSED = 0x08, + BREF_DB_MAPPED = 0x10 +} bref_state_t; + +#define BREF_IS_NOT_USED(s) ((s)->bref_state & ~BREF_IN_USE) +#define BREF_IS_IN_USE(s) ((s)->bref_state & BREF_IN_USE) +#define BREF_IS_WAITING_RESULT(s) ((s)->bref_num_result_wait > 0) +#define BREF_IS_QUERY_ACTIVE(s) ((s)->bref_state & BREF_QUERY_ACTIVE) +#define BREF_IS_CLOSED(s) ((s)->bref_state & BREF_CLOSED) +#define BREF_IS_MAPPED(s) ((s)->bref_mapped) + +#define SCHEMA_ERR_DUPLICATEDB 5000 +#define SCHEMA_ERRSTR_DUPLICATEDB "DUPDB" +#define SCHEMA_ERR_DBNOTFOUND 1049 +#define SCHEMA_ERRSTR_DBNOTFOUND "42000" + +/** + * Route target types + */ +typedef enum +{ + TARGET_UNDEFINED = (1 << 0), + TARGET_NAMED_SERVER = (1 << 1), + TARGET_ALL = (1 << 2), + TARGET_ANY = (1 << 3) +} route_target_t; + +/** Helper macros for route target type */ +#define TARGET_IS_UNDEFINED(t) (t == TARGET_UNDEFINED) +#define TARGET_IS_NAMED_SERVER(t) (t & TARGET_NAMED_SERVER) +#define TARGET_IS_ALL(t) (t & TARGET_ALL) +#define TARGET_IS_ANY(t) (t & TARGET_ANY) + +/** + * Reference to BACKEND. + * + * Owned by router client session. + */ +typedef struct backend_ref_st +{ + int n_mapping_eof; + GWBUF* map_queue; + SERVER_REF* bref_backend; /*< Backend server */ + DCB* bref_dcb; /*< Backend DCB */ + int bref_state; /*< State of the backend */ + bool bref_mapped; /*< Whether the backend has been mapped */ + bool last_sescmd_replied; + int bref_num_result_wait; /*< Number of not yet received results */ + GWBUF* bref_pending_cmd; /*< Pending commands */ + + SessionCommandList session_commands; /**< List of session commands that are + * to be executed on this backend server */ +} backend_ref_t; + +class SchemaRouter; + +/** + * The client session structure used within this router. + */ +class SchemaRouterSession: public mxs::RouterSession +{ +public: + + SchemaRouterSession(MXS_SESSION* session, SchemaRouter& router); + + /** + * The RouterSession instance will be deleted when a client session + * has terminated. Will be called only after @c close() has been called. + */ + ~SchemaRouterSession(); + + /** + * Called when a client session has been closed. + */ + void close(); + + /** + * Called when a packet being is routed to the backend. The router should + * forward the packet to the appropriate server(s). + * + * @param pPacket A client packet. + */ + int32_t routeQuery(GWBUF* pPacket); + + /** + * Called when a packet is routed to the client. The router should + * forward the packet to the client using `MXS_SESSION_ROUTE_REPLY`. + * + * @param pPacket A client packet. + * @param pBackend The backend the packet is coming from. + */ + void clientReply(GWBUF* pPacket, DCB* pBackend); + + /** + * + * @param pMessage The rror message. + * @param pProblem The DCB on which the error occurred. + * @param action The context. + * @param pSuccess On output, if false, the session will be terminated. + */ + void handleError(GWBUF* pMessage, + DCB* pProblem, + mxs_error_action_t action, + bool* pSuccess); +private: + bool closed; /*< true when closeSession is called */ + DCB* rses_client_dcb; + MYSQL_session* rses_mysql_session; /*< Session client data (username, password, SHA1). */ + backend_ref_t* rses_backend_ref; /*< Pointer to backend reference array */ + schemarouter_config_t rses_config; /*< Copied config info from router instance */ + int rses_nbackends; /*< Number of backends */ + SchemaRouter& m_router; /*< The router instance */ + Shard shardmap; /**< Database hash containing names of the databases + * mapped to the servers that contain them */ + string connect_db; /*< Database the user was trying to connect to */ + string current_db; /*< Current active database */ + int state; /*< Initialization state bitmask */ + GWBUF* queue; /*< Query that was received before the session was ready */ + ROUTER_STATS stats; /*< Statistics for this router */ + + uint64_t sent_sescmd; /**< The latest session command being executed */ + uint64_t replied_sescmd; /**< The last session command reply that was sent to the client */ + + /** Internal functions */ + void synchronize_shard_map(); + + int inspect_backend_mapping_states(backend_ref_t *bref, GWBUF** wbuf); + + bool route_session_write(GWBUF* querybuf, uint8_t command); + + bool handle_error_new_connection(DCB* backend_dcb, GWBUF* errmsg); + void handle_error_reply_client(DCB* backend_dcb, GWBUF* errmsg); + int process_show_shards(); + bool handle_default_db(); + void route_queued_query(); + bool get_shard_dcb(DCB** dcb, char* name); + SERVER* get_shard_target(GWBUF* buffer, uint32_t qtype); + backend_ref_t* get_bref_from_dcb(DCB* dcb); + bool have_servers(); + bool send_database_list(); + int gen_databaselist(); + showdb_response_t parse_showdb_response(backend_ref_t* bref, GWBUF** buffer); + bool execute_sescmd_in_backend(backend_ref_t* backend_ref); +};