diff --git a/server/modules/routing/schemarouter/schemarouterinstance.cc b/server/modules/routing/schemarouter/schemarouterinstance.cc index a0d14cc66..e1f87f77b 100644 --- a/server/modules/routing/schemarouter/schemarouterinstance.cc +++ b/server/modules/routing/schemarouter/schemarouterinstance.cc @@ -41,29 +41,30 @@ using std::map; */ SchemaRouter::SchemaRouter(SERVICE *service, char **options): - mxs::Router(service) + mxs::Router(service), + m_service(service) { MXS_CONFIG_PARAMETER* conf; MXS_CONFIG_PARAMETER* param; /** Add default system databases to ignore */ - this->m_ignored_dbs.insert("mysql"); - this->m_ignored_dbs.insert("information_schema"); - this->m_ignored_dbs.insert("performance_schema"); - this->m_service = service; - this->m_stats.longest_sescmd = 0; - this->m_stats.n_hist_exceeded = 0; - this->m_stats.n_queries = 0; - this->m_stats.n_sescmd = 0; - this->m_stats.ses_longest = 0; - this->m_stats.ses_shortest = (double)((unsigned long)(~0)); - spinlock_init(&this->m_lock); + m_ignored_dbs.insert("mysql"); + m_ignored_dbs.insert("information_schema"); + m_ignored_dbs.insert("performance_schema"); + + m_stats.longest_sescmd = 0; + m_stats.n_hist_exceeded = 0; + m_stats.n_queries = 0; + m_stats.n_sescmd = 0; + m_stats.ses_longest = 0; + m_stats.ses_shortest = (double)((unsigned long)(~0)); + spinlock_init(&m_lock); conf = service->svc_config_param; - this->m_config.refresh_databases = config_get_bool(conf, "refresh_databases"); - this->m_config.refresh_min_interval = config_get_integer(conf, "refresh_interval"); - this->m_config.debug = config_get_bool(conf, "debug"); + m_config.refresh_databases = config_get_bool(conf, "refresh_databases"); + m_config.refresh_min_interval = config_get_integer(conf, "refresh_interval"); + m_config.debug = config_get_bool(conf, "debug"); if ((config_get_param(conf, "auth_all_servers")) == NULL) { @@ -96,8 +97,8 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options): throw std::bad_alloc(); } - this->m_ignore_regex = re; - this->m_ignore_match_data = match_data; + m_ignore_regex = re; + m_ignore_match_data = match_data; } if ((param = config_get_param(conf, "ignore_databases"))) @@ -111,7 +112,7 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options): while (tok) { - this->m_ignored_dbs.insert(tok); + m_ignored_dbs.insert(tok); tok = strtok_r(NULL, sep, &sptr); } } @@ -142,15 +143,15 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options): } else if (strcmp(options[i], "refresh_databases") == 0) { - this->m_config.refresh_databases = config_truth_value(value); + m_config.refresh_databases = config_truth_value(value); } else if (strcmp(options[i], "refresh_interval") == 0) { - this->m_config.refresh_min_interval = atof(value); + m_config.refresh_min_interval = atof(value); } else if (strcmp(options[i], "debug") == 0) { - this->m_config.debug = config_truth_value(value); + m_config.debug = config_truth_value(value); } else { @@ -168,14 +169,14 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options): SchemaRouter::~SchemaRouter() { - if (this->m_ignore_regex) + if (m_ignore_regex) { - pcre2_code_free(this->m_ignore_regex); + pcre2_code_free(m_ignore_regex); } - if (this->m_ignore_match_data) + if (m_ignore_match_data) { - pcre2_match_data_free(this->m_ignore_match_data); + pcre2_match_data_free(m_ignore_match_data); } } @@ -191,32 +192,32 @@ SchemaRouterSession* SchemaRouter::newSession(MXS_SESSION* pSession) void SchemaRouter::diagnostics(DCB* dcb) { - double sescmd_pct = this->m_stats.n_sescmd != 0 ? - 100.0 * ((double)this->m_stats.n_sescmd / (double)this->m_stats.n_queries) : + double sescmd_pct = m_stats.n_sescmd != 0 ? + 100.0 * ((double)m_stats.n_sescmd / (double)m_stats.n_queries) : 0.0; /** Session command statistics */ dcb_printf(dcb, "\n\33[1;4mSession Commands\33[0m\n"); dcb_printf(dcb, "Total number of queries: %d\n", - this->m_stats.n_queries); + m_stats.n_queries); dcb_printf(dcb, "Percentage of session commands: %.2f\n", sescmd_pct); dcb_printf(dcb, "Longest chain of stored session commands: %d\n", - this->m_stats.longest_sescmd); + m_stats.longest_sescmd); dcb_printf(dcb, "Session command history limit exceeded: %d times\n", - this->m_stats.n_hist_exceeded); + m_stats.n_hist_exceeded); /** Session time statistics */ - if (this->m_stats.sessions > 0) + if (m_stats.sessions > 0) { dcb_printf(dcb, "\n\33[1;4mSession Time Statistics\33[0m\n"); - dcb_printf(dcb, "Longest session: %.2lf seconds\n", this->m_stats.ses_longest); - dcb_printf(dcb, "Shortest session: %.2lf seconds\n", this->m_stats.ses_shortest); - dcb_printf(dcb, "Average session length: %.2lf seconds\n", this->m_stats.ses_average); + dcb_printf(dcb, "Longest session: %.2lf seconds\n", m_stats.ses_longest); + dcb_printf(dcb, "Shortest session: %.2lf seconds\n", m_stats.ses_shortest); + dcb_printf(dcb, "Average session length: %.2lf seconds\n", m_stats.ses_average); } - dcb_printf(dcb, "Shard map cache hits: %d\n", this->m_stats.shmap_cache_hit); - dcb_printf(dcb, "Shard map cache misses: %d\n", this->m_stats.shmap_cache_miss); + dcb_printf(dcb, "Shard map cache hits: %d\n", m_stats.shmap_cache_hit); + dcb_printf(dcb, "Shard map cache misses: %d\n", m_stats.shmap_cache_miss); dcb_printf(dcb, "\n"); } diff --git a/server/modules/routing/schemarouter/schemaroutersession.cc b/server/modules/routing/schemarouter/schemaroutersession.cc index ab2c4d67d..3994d3690 100644 --- a/server/modules/routing/schemarouter/schemaroutersession.cc +++ b/server/modules/routing/schemarouter/schemaroutersession.cc @@ -38,7 +38,17 @@ void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const c SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& router): mxs::RouterSession(session), - m_router(router) + m_closed(false), + m_client(session->client_dcb), + m_mysql_session((MYSQL_session*)session->client_dcb->data), + m_backends(NULL), + m_config(router.m_config), + m_backend_count(0), + m_router(router), + m_shard(router.m_shard_manager.get_shard(m_client->user, m_config.refresh_min_interval)), + m_state(0), + m_sent_sescmd(0), + m_replied_sescmd(0) { char db[MYSQL_DATABASE_MAXLEN + 1] = ""; MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol; @@ -64,48 +74,16 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou MXS_INFO("Client'%s' connecting with empty database.", data->user); } - SchemaRouterSession& client_rses = *this; - - this->m_router = router; - this->m_client = (DCB*)session->client_dcb; - this->m_closed = false; - this->m_sent_sescmd = 0; - this->m_replied_sescmd = 0; - - this->m_shard = router.m_shard_manager.get_shard(session->client_dcb->user, - router.m_config.refresh_min_interval); - - this->m_config = router.m_config; - if (using_db) { - this->m_state |= INIT_USE_DB; + m_state |= INIT_USE_DB; } - /** - * Set defaults to session variables. - */ - - /** - * Instead of calling this, ensure that there is at least one - * responding server. - */ int router_nservers = router.m_service->n_dbref; - - /** - * Create backend reference objects for this session. - */ - backend_ref_t* backend_ref = new backend_ref_t[router_nservers]; - - /** - * Initialize backend references with BACKEND ptr. - * Initialize session command cursors for each backend reference. - */ - int i = 0; - for (SERVER_REF *ref = router.m_service->dbref; ref; ref = ref->next) + for (SERVER_REF *ref = router.m_service->dbref; ref && i < router_nservers; ref = ref->next) { if (ref->active) { @@ -120,18 +98,14 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou if (i < router_nservers) { + /** Service had less than the reported number of servers */ router_nservers = i; } - this->m_backends = backend_ref; - this->m_backend_count = router_nservers; + m_backends = backend_ref; + m_backend_count = router_nservers; - /** - * Connect to all backend servers - */ - bool succp = connect_backend_servers(backend_ref, router_nservers, session); - - if (!succp) + if (!connect_backend_servers(backend_ref, router_nservers, session)) { throw std::runtime_error("Failed to connect to backend servers"); } @@ -139,7 +113,7 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou if (db[0]) { /* Store the database the client is connecting to */ - this->m_connect_db = db; + m_connect_db = db; } atomic_add(&router.m_stats.sessions, 1); @@ -147,28 +121,28 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou SchemaRouterSession::~SchemaRouterSession() { - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { - gwbuf_free(this->m_backends[i].pending_cmd); + gwbuf_free(m_backends[i].pending_cmd); } - delete[] this->m_backends; + delete[] m_backends; } void SchemaRouterSession::close() { - ss_dassert(!this->m_closed); + ss_dassert(!m_closed); /** * Lock router client session for secure read and update. */ - if (!this->m_closed) + if (!m_closed) { - this->m_closed = true; + m_closed = true; - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { - backend_ref_t* bref = &this->m_backends[i]; + backend_ref_t* bref = &m_backends[i]; DCB* dcb = bref->dcb; /** Close those which had been connected */ if (BREF_IS_IN_USE(bref)) @@ -192,11 +166,11 @@ void SchemaRouterSession::close() } spinlock_acquire(&m_router.m_lock); - if (m_router.m_stats.longest_sescmd < this->m_stats.longest_sescmd) + if (m_router.m_stats.longest_sescmd < m_stats.longest_sescmd) { - m_router.m_stats.longest_sescmd = this->m_stats.longest_sescmd; + m_router.m_stats.longest_sescmd = m_stats.longest_sescmd; } - double ses_time = difftime(time(NULL), this->m_client->session->stats.connect); + double ses_time = difftime(time(NULL), m_client->session->stats.connect); if (m_router.m_stats.ses_longest < ses_time) { m_router.m_stats.ses_longest = ses_time; @@ -305,9 +279,9 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, * the current default database or to the first available server. */ target = get_shard_target(pPacket, type); - if ((target == NULL && command != MYSQL_COM_INIT_DB && this->m_current_db.length() == 0) || + if ((target == NULL && command != MYSQL_COM_INIT_DB && m_current_db.length() == 0) || command == MYSQL_COM_FIELD_LIST || - this->m_current_db.length() == 0) + m_current_db.length() == 0) { /** No current database and no databases in query or the database is * ignored, route to first available backend. */ @@ -317,9 +291,9 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket, if (TARGET_IS_ANY(route_target)) { - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { - SERVER *server = this->m_backends[i].backend->server; + SERVER *server = m_backends[i].backend->server; if (SERVER_IS_RUNNING(server)) { route_target = TARGET_NAMED_SERVER; @@ -342,12 +316,12 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) { ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket)); - if (this->m_closed) + if (m_closed) { return 0; } - if (this->m_shard.empty()) + if (m_shard.empty()) { /* Generate database list */ gen_databaselist(); @@ -361,11 +335,11 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) * to store the query. Once the databases have been mapped and/or the * default database is taken into use we can send the query forward. */ - if (this->m_state & (INIT_MAPPING | INIT_USE_DB)) + if (m_state & (INIT_MAPPING | INIT_USE_DB)) { - this->m_queue.push_back(pPacket); + m_queue.push_back(pPacket); - if (this->m_state == (INIT_READY | INIT_USE_DB)) + if (m_state == (INIT_READY | INIT_USE_DB)) { /** * This state is possible if a client connects with a default database @@ -421,14 +395,14 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) char errbuf[128 + MYSQL_DATABASE_MAXLEN]; snprintf(errbuf, sizeof(errbuf), "Unknown database: %s", db); - if (this->m_config.debug) + if (m_config.debug) { sprintf(errbuf + strlen(errbuf), " ([%lu]: DB change failed)", - this->m_client->session->ses_id); + m_client->session->ses_id); } - write_error_to_client(this->m_client, + write_error_to_client(m_client, SCHEMA_ERR_DBNOTFOUND, SCHEMA_ERRSTR_DBNOTFOUND, errbuf); @@ -436,12 +410,12 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) } route_target = TARGET_UNDEFINED; - target = this->m_shard.get_location(this->m_current_db); + target = m_shard.get_location(m_current_db); if (target) { MXS_INFO("INIT_DB for database '%s' on server '%s'", - this->m_current_db.c_str(), target->unique_name); + m_current_db.c_str(), target->unique_name); route_target = TARGET_NAMED_SERVER; } else @@ -487,7 +461,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket) { /** Store current statement if execution of the previous * session command hasn't been completed. */ - ss_dassert((bref->pending_cmd == NULL || this->m_closed)); + ss_dassert((bref->pending_cmd == NULL || m_closed)); bref->pending_cmd = pPacket; ret = 1; } @@ -520,13 +494,13 @@ void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPack if (rc == 1) { synchronize_shard_map(); - this->m_state &= ~INIT_MAPPING; + m_state &= ~INIT_MAPPING; /* Check if the session is reconnecting with a database name * that is not in the hashtable. If the database is not found * then close the session. */ - if (this->m_state & INIT_USE_DB) + if (m_state & INIT_USE_DB) { if (!handle_default_db()) { @@ -534,16 +508,16 @@ void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPack } } - if (this->m_queue.size() && rc != -1) + if (m_queue.size() && rc != -1) { - ss_dassert(this->m_state == INIT_READY); + ss_dassert(m_state == INIT_READY); route_queued_query(); } } if (rc == -1) { - poll_fake_hangup_event(this->m_client); + poll_fake_hangup_event(m_client); } } @@ -554,11 +528,11 @@ void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket /** We are executing a session command */ if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket))) { - if (this->m_replied_sescmd < this->m_sent_sescmd && - bref->session_commands.front().get_position() == this->m_replied_sescmd + 1) + if (m_replied_sescmd < m_sent_sescmd && + bref->session_commands.front().get_position() == m_replied_sescmd + 1) { /** First reply to this session command, route it to the client */ - ++this->m_replied_sescmd; + ++m_replied_sescmd; } else { @@ -588,7 +562,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) { backend_ref_t* bref = get_bref_from_dcb(pDcb); - if (this->m_closed || bref == NULL) + if (m_closed || bref == NULL) { gwbuf_free(pPacket); return; @@ -597,32 +571,32 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) MXS_DEBUG("Reply from [%s] session [%p]" " mapping [%s] queries queued [%s]", bref->backend->server->unique_name, - this->m_client->session, - this->m_state & INIT_MAPPING ? "true" : "false", - this->m_queue.size() == 0 ? "none" : - this->m_queue.size() > 0 ? "multiple" : "one"); + m_client->session, + m_state & INIT_MAPPING ? "true" : "false", + m_queue.size() == 0 ? "none" : + m_queue.size() > 0 ? "multiple" : "one"); - if (this->m_state & INIT_MAPPING) + if (m_state & INIT_MAPPING) { handle_mapping_reply(bref, pPacket); } - else if (this->m_state & INIT_USE_DB) + else if (m_state & INIT_USE_DB) { MXS_DEBUG("Reply to USE '%s' received for session %p", - this->m_connect_db.c_str(), this->m_client->session); - this->m_state &= ~INIT_USE_DB; - this->m_current_db = this->m_connect_db; - ss_dassert(this->m_state == INIT_READY); + m_connect_db.c_str(), m_client->session); + m_state &= ~INIT_USE_DB; + m_current_db = m_connect_db; + ss_dassert(m_state == INIT_READY); - if (this->m_queue.size()) + if (m_queue.size()) { route_queued_query(); } } - else if (this->m_queue.size()) + else if (m_queue.size()) { - ss_dassert(this->m_state == INIT_READY); + ss_dassert(m_state == INIT_READY); route_queued_query(); } else @@ -650,7 +624,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb) if (ret == 1) { - atomic_add(&this->m_router.m_stats.n_queries, 1); + atomic_add(&m_router.m_stats.n_queries, 1); bref_set_state(bref, BREF_QUERY_ACTIVE); bref_set_state(bref, BREF_WAITING_RESULT); } @@ -714,7 +688,7 @@ void SchemaRouterSession::handleError(GWBUF* pMessage, void SchemaRouterSession::synchronize_shard_map() { m_router.m_stats.shmap_cache_miss++; - m_router.m_shard_manager.update_shard(this->m_shard, this->m_client->user); + m_router.m_shard_manager.update_shard(m_shard, m_client->user); } /** @@ -839,20 +813,20 @@ bool SchemaRouterSession::execute_sescmd_in_backend(backend_ref_t* backend_ref) bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) { bool succp = false; - backend_ref_t *backend_ref = this->m_backends; + backend_ref_t *backend_ref = m_backends; MXS_INFO("Session write, routing to all servers."); - atomic_add(&this->m_stats.longest_sescmd, 1); + atomic_add(&m_stats.longest_sescmd, 1); /** Increment the session command count */ - ++this->m_sent_sescmd; + ++m_sent_sescmd; - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { if (BREF_IS_IN_USE((&backend_ref[i]))) { GWBUF *buffer = gwbuf_clone(querybuf); - backend_ref[i].session_commands.push_back(SessionCommand(buffer, this->m_sent_sescmd)); + backend_ref[i].session_commands.push_back(SessionCommand(buffer, m_sent_sescmd)); if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO)) { @@ -861,7 +835,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command) "master" : "slave"), backend_ref[i].backend->server->name, backend_ref[i].backend->server->port, - (i + 1 == this->m_backend_count ? " <" : "")); + (i + 1 == m_backend_count ? " <" : "")); } if (backend_ref[i].session_commands.size() == 1) @@ -931,10 +905,10 @@ void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg) */ bool SchemaRouterSession::have_servers() { - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { - if (BREF_IS_IN_USE(&this->m_backends[i]) && - !BREF_IS_CLOSED(&this->m_backends[i])) + if (BREF_IS_IN_USE(&m_backends[i]) && + !BREF_IS_CLOSED(&m_backends[i])) { return true; } @@ -1003,11 +977,11 @@ backend_ref_t* SchemaRouterSession::get_bref_from_dcb(DCB* dcb) { CHK_DCB(dcb); - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { - if (this->m_backends[i].dcb == dcb) + if (m_backends[i].dcb == dcb) { - return &this->m_backends[i]; + return &m_backends[i]; } } @@ -1083,14 +1057,14 @@ bool SchemaRouterSession::process_show_shards() bool rval = false; ServerMap pContent; - this->m_shard.get_content(pContent); + m_shard.get_content(pContent); RESULTSET* rset = resultset_create(shard_list_cb, &pContent); if (rset) { resultset_add_column(rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR); - resultset_stream_mysql(rset, this->m_client); + resultset_stream_mysql(rset, m_client); resultset_free(rset); rval = true; } @@ -1129,14 +1103,14 @@ void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const c bool SchemaRouterSession::handle_default_db() { bool rval = false; - SERVER* target = this->m_shard.get_location(this->m_connect_db); + SERVER* target = m_shard.get_location(m_connect_db); if (target) { /* Send a COM_INIT_DB packet to the server with the right database * and set it as the client's active database */ - unsigned int qlen = this->m_connect_db.length(); + unsigned int qlen = m_connect_db.length(); GWBUF* buffer = gwbuf_alloc(qlen + 5); if (buffer) @@ -1146,16 +1120,16 @@ bool SchemaRouterSession::handle_default_db() gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL); data[3] = 0x0; data[4] = 0x2; - memcpy(data + 5, this->m_connect_db.c_str(), qlen); + memcpy(data + 5, m_connect_db.c_str(), qlen); DCB* dcb = NULL; if (get_shard_dcb(&dcb, target->unique_name)) { dcb->func.write(dcb, buffer); MXS_DEBUG("USE '%s' sent to %s for session %p", - this->m_connect_db.c_str(), + m_connect_db.c_str(), target->unique_name, - this->m_client->session); + m_client->session); rval = true; } else @@ -1171,15 +1145,15 @@ bool SchemaRouterSession::handle_default_db() else { /** Unknown database, hang up on the client*/ - MXS_INFO("Connecting to a non-existent database '%s'", this->m_connect_db.c_str()); + MXS_INFO("Connecting to a non-existent database '%s'", m_connect_db.c_str()); char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1]; - sprintf(errmsg, "Unknown database '%s'", this->m_connect_db.c_str()); - if (this->m_config.debug) + sprintf(errmsg, "Unknown database '%s'", m_connect_db.c_str()); + if (m_config.debug) { sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)", - this->m_client->session->ses_id); + m_client->session->ses_id); } - write_error_to_client(this->m_client, + write_error_to_client(m_client, SCHEMA_ERR_DBNOTFOUND, SCHEMA_ERRSTR_DBNOTFOUND, errmsg); @@ -1190,18 +1164,18 @@ bool SchemaRouterSession::handle_default_db() void SchemaRouterSession::route_queued_query() { - GWBUF* tmp = this->m_queue.front().release(); - this->m_queue.pop_front(); + GWBUF* tmp = m_queue.front().release(); + m_queue.pop_front(); #ifdef SS_DEBUG char* querystr = modutil_get_SQL(tmp); MXS_DEBUG("Sending queued buffer for session %p: %s", - this->m_client->session, + m_client->session, querystr); MXS_FREE(querystr); #endif - poll_add_epollin_event_to_dcb(this->m_client, tmp); + poll_add_epollin_event_to_dcb(m_client, tmp); } /** @@ -1214,9 +1188,9 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, { bool mapped = true; GWBUF* writebuf = *wbuf; - backend_ref_t* bkrf = this->m_backends; + backend_ref_t* bkrf = m_backends; - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { if (bref->dcb == bkrf[i].dcb && !BREF_IS_MAPPED(&bkrf[i])) { @@ -1225,28 +1199,28 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, writebuf = gwbuf_append(bref->map_queue, writebuf); bref->map_queue = NULL; } - showdb_response_t rc = parse_showdb_response(&this->m_backends[i], + showdb_response_t rc = parse_showdb_response(&m_backends[i], &writebuf); if (rc == SHOWDB_FULL_RESPONSE) { - this->m_backends[i].mapped = true; + m_backends[i].mapped = true; MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p", - this->m_backends[i].backend->server->unique_name, - this->m_client->session); + m_backends[i].backend->server->unique_name, + m_client->session); } else if (rc == SHOWDB_PARTIAL_RESPONSE) { bref->map_queue = writebuf; writebuf = NULL; MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p", - this->m_backends[i].backend->server->unique_name, - this->m_client->session); + m_backends[i].backend->server->unique_name, + m_client->session); } else { DCB* client_dcb = NULL; - if ((this->m_state & INIT_FAILED) == 0) + if ((m_state & INIT_FAILED) == 0) { if (rc == SHOWDB_DUPLICATE_DATABASES) { @@ -1256,16 +1230,16 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, { MXS_ERROR("Fatal error when processing SHOW DATABASES response, closing session."); } - client_dcb = this->m_client; + client_dcb = m_client; /** This is the first response to the database mapping which * has duplicate database conflict. Set the initialization bitmask * to INIT_FAILED */ - this->m_state |= INIT_FAILED; + m_state |= INIT_FAILED; /** Send the client an error about duplicate databases * if there is a queued query from the client. */ - if (this->m_queue.size()) + if (m_queue.size()) { GWBUF* error = modutil_create_mysql_err_msg(1, 0, SCHEMA_ERR_DUPLICATEDB, @@ -1293,7 +1267,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref, mapped = false; MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p", bkrf[i].backend->server->unique_name, - this->m_client->session); + m_client->session); } } *wbuf = writebuf; @@ -1496,25 +1470,25 @@ showdb_response_t SchemaRouterSession::parse_showdb_response(backend_ref_t* bref if (data) { - if (this->m_shard.add_location(data, target)) + if (m_shard.add_location(data, target)) { MXS_INFO("<%s, %s>", target->unique_name, data); } else { - if (!(this->m_router.m_ignored_dbs.find(data) != this->m_router.m_ignored_dbs.end() || - (this->m_router.m_ignore_regex && - pcre2_match(this->m_router.m_ignore_regex, (PCRE2_SPTR)data, + if (!(m_router.m_ignored_dbs.find(data) != m_router.m_ignored_dbs.end() || + (m_router.m_ignore_regex && + pcre2_match(m_router.m_ignore_regex, (PCRE2_SPTR)data, PCRE2_ZERO_TERMINATED, 0, 0, - this->m_router.m_ignore_match_data, NULL) >= 0))) + m_router.m_ignore_match_data, NULL) >= 0))) { duplicate_found = true; - SERVER *duplicate = this->m_shard.get_location(data); + SERVER *duplicate = m_shard.get_location(data); MXS_ERROR("Database '%s' found on servers '%s' and '%s' for user %s@%s.", data, target->unique_name, duplicate->unique_name, - this->m_client->user, - this->m_client->remote); + m_client->user, + m_client->remote); } } MXS_FREE(data); @@ -1565,14 +1539,14 @@ int SchemaRouterSession::gen_databaselist() int i, rval = 0; unsigned int len; - for (i = 0; i < this->m_backend_count; i++) + for (i = 0; i < m_backend_count; i++) { - this->m_backends[i].mapped = false; - this->m_backends[i].n_mapping_eof = 0; + m_backends[i].mapped = false; + m_backends[i].n_mapping_eof = 0; } - this->m_state |= INIT_MAPPING; - this->m_state &= ~INIT_UNINT; + m_state |= INIT_MAPPING; + m_state &= ~INIT_UNINT; len = strlen(query) + 1; buffer = gwbuf_alloc(len + 4); uint8_t *data = GWBUF_DATA(buffer); @@ -1583,18 +1557,18 @@ int SchemaRouterSession::gen_databaselist() *(data + 4) = 0x03; memcpy(data + 5, query, strlen(query)); - for (i = 0; i < this->m_backend_count; i++) + for (i = 0; i < m_backend_count; i++) { - if (BREF_IS_IN_USE(&this->m_backends[i]) && - !BREF_IS_CLOSED(&this->m_backends[i]) & - SERVER_IS_RUNNING(this->m_backends[i].backend->server)) + if (BREF_IS_IN_USE(&m_backends[i]) && + !BREF_IS_CLOSED(&m_backends[i]) & + SERVER_IS_RUNNING(m_backends[i].backend->server)) { clone = gwbuf_clone(buffer); - dcb = this->m_backends[i].dcb; + dcb = m_backends[i].dcb; rval |= !dcb->func.write(dcb, clone); MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d", - this->m_backends[i].backend->server->unique_name, - this->m_client->session, + m_backends[i].backend->server->unique_name, + m_client->session, rval); } } @@ -1628,7 +1602,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) } else { - SERVER* target = this->m_shard.get_location(info[i].database); + SERVER* target = m_shard.get_location(info[i].database); if (target) { @@ -1665,7 +1639,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) if (tok) { - rval = this->m_shard.get_location(tok); + rval = m_shard.get_location(tok); if (rval) { @@ -1677,12 +1651,12 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) if (rval == NULL) { - rval = this->m_shard.get_location(this->m_current_db); + rval = m_shard.get_location(m_current_db); if (rval) { MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'", - this->m_current_db.c_str(), rval->unique_name); + m_current_db.c_str(), rval->unique_name); } } else @@ -1692,30 +1666,30 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype) } else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER) { - for (int i = 0; i < this->m_backend_count; i++) + for (int i = 0; i < m_backend_count; i++) { - char *srvnm = this->m_backends[i].backend->server->unique_name; + char *srvnm = m_backends[i].backend->server->unique_name; if (strcmp(srvnm, (char*)buffer->hint->data) == 0) { - rval = this->m_backends[i].backend->server; + rval = m_backends[i].backend->server; MXS_INFO("Routing hint found (%s)", rval->unique_name); } } - if (rval == NULL && !has_dbs && this->m_current_db.length()) + if (rval == NULL && !has_dbs && m_current_db.length()) { /** * If the target name has not been found and the session has an * active database, set is as the target */ - rval = this->m_shard.get_location(this->m_current_db); + rval = m_shard.get_location(m_current_db); if (rval) { MXS_INFO("Using active database '%s' on '%s'", - this->m_current_db.c_str(), rval->unique_name); + m_current_db.c_str(), rval->unique_name); } } } @@ -1749,9 +1723,9 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name) { goto return_succp; } - backend_ref = this->m_backends; + backend_ref = m_backends; - for (i = 0; i < this->m_backend_count; i++) + for (i = 0; i < m_backend_count; i++) { SERVER_REF* b = backend_ref[i].backend; /** @@ -1854,14 +1828,14 @@ bool SchemaRouterSession::send_database_list() bool rval = false; ServerMap dblist; - this->m_shard.get_content(dblist); + m_shard.get_content(dblist); RESULTSET* resultset = resultset_create(result_set_cb, &dblist); if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR)) { - resultset_stream_mysql(resultset, this->m_client); + resultset_stream_mysql(resultset, m_client); rval = true; } resultset_free(resultset); diff --git a/server/modules/routing/schemarouter/session_command.cc b/server/modules/routing/schemarouter/session_command.cc index a0e49b57e..07d3832fe 100644 --- a/server/modules/routing/schemarouter/session_command.cc +++ b/server/modules/routing/schemarouter/session_command.cc @@ -17,12 +17,12 @@ void SessionCommand::mark_reply_received() { - m_replySent = true; + m_reply_sent = true; } bool SessionCommand::is_reply_received() const { - return m_replySent; + return m_reply_sent; } uint8_t SessionCommand::get_command() const @@ -44,7 +44,7 @@ SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id): m_buffer(buffer), m_command(0), m_pos(id), - m_replySent(false) + m_reply_sent(false) { if (buffer) { diff --git a/server/modules/routing/schemarouter/session_command.hh b/server/modules/routing/schemarouter/session_command.hh index f0aefe5c8..af1a939f3 100644 --- a/server/modules/routing/schemarouter/session_command.hh +++ b/server/modules/routing/schemarouter/session_command.hh @@ -79,7 +79,7 @@ private: Buffer m_buffer; /**< The buffer containing the command */ uint8_t m_command; /**< The command being executed */ uint64_t m_pos; /**< Unique position identifier */ - bool m_replySent; /**< Whether the session command reply has been sent */ + bool m_reply_sent; /**< Whether the session command reply has been sent */ SessionCommand(); SessionCommand& operator = (const SessionCommand& command);