Fix variable naming and usage
Don't use `this->` when it's not needed. Use snake_case for member variables. Initialize the members using a initialization list.
This commit is contained in:
@ -41,29 +41,30 @@ using std::map;
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
SchemaRouter::SchemaRouter(SERVICE *service, char **options):
|
SchemaRouter::SchemaRouter(SERVICE *service, char **options):
|
||||||
mxs::Router<SchemaRouter, SchemaRouterSession>(service)
|
mxs::Router<SchemaRouter, SchemaRouterSession>(service),
|
||||||
|
m_service(service)
|
||||||
{
|
{
|
||||||
MXS_CONFIG_PARAMETER* conf;
|
MXS_CONFIG_PARAMETER* conf;
|
||||||
MXS_CONFIG_PARAMETER* param;
|
MXS_CONFIG_PARAMETER* param;
|
||||||
|
|
||||||
/** Add default system databases to ignore */
|
/** Add default system databases to ignore */
|
||||||
this->m_ignored_dbs.insert("mysql");
|
m_ignored_dbs.insert("mysql");
|
||||||
this->m_ignored_dbs.insert("information_schema");
|
m_ignored_dbs.insert("information_schema");
|
||||||
this->m_ignored_dbs.insert("performance_schema");
|
m_ignored_dbs.insert("performance_schema");
|
||||||
this->m_service = service;
|
|
||||||
this->m_stats.longest_sescmd = 0;
|
m_stats.longest_sescmd = 0;
|
||||||
this->m_stats.n_hist_exceeded = 0;
|
m_stats.n_hist_exceeded = 0;
|
||||||
this->m_stats.n_queries = 0;
|
m_stats.n_queries = 0;
|
||||||
this->m_stats.n_sescmd = 0;
|
m_stats.n_sescmd = 0;
|
||||||
this->m_stats.ses_longest = 0;
|
m_stats.ses_longest = 0;
|
||||||
this->m_stats.ses_shortest = (double)((unsigned long)(~0));
|
m_stats.ses_shortest = (double)((unsigned long)(~0));
|
||||||
spinlock_init(&this->m_lock);
|
spinlock_init(&m_lock);
|
||||||
|
|
||||||
conf = service->svc_config_param;
|
conf = service->svc_config_param;
|
||||||
|
|
||||||
this->m_config.refresh_databases = config_get_bool(conf, "refresh_databases");
|
m_config.refresh_databases = config_get_bool(conf, "refresh_databases");
|
||||||
this->m_config.refresh_min_interval = config_get_integer(conf, "refresh_interval");
|
m_config.refresh_min_interval = config_get_integer(conf, "refresh_interval");
|
||||||
this->m_config.debug = config_get_bool(conf, "debug");
|
m_config.debug = config_get_bool(conf, "debug");
|
||||||
|
|
||||||
if ((config_get_param(conf, "auth_all_servers")) == NULL)
|
if ((config_get_param(conf, "auth_all_servers")) == NULL)
|
||||||
{
|
{
|
||||||
@ -96,8 +97,8 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options):
|
|||||||
throw std::bad_alloc();
|
throw std::bad_alloc();
|
||||||
}
|
}
|
||||||
|
|
||||||
this->m_ignore_regex = re;
|
m_ignore_regex = re;
|
||||||
this->m_ignore_match_data = match_data;
|
m_ignore_match_data = match_data;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((param = config_get_param(conf, "ignore_databases")))
|
if ((param = config_get_param(conf, "ignore_databases")))
|
||||||
@ -111,7 +112,7 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options):
|
|||||||
|
|
||||||
while (tok)
|
while (tok)
|
||||||
{
|
{
|
||||||
this->m_ignored_dbs.insert(tok);
|
m_ignored_dbs.insert(tok);
|
||||||
tok = strtok_r(NULL, sep, &sptr);
|
tok = strtok_r(NULL, sep, &sptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -142,15 +143,15 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options):
|
|||||||
}
|
}
|
||||||
else if (strcmp(options[i], "refresh_databases") == 0)
|
else if (strcmp(options[i], "refresh_databases") == 0)
|
||||||
{
|
{
|
||||||
this->m_config.refresh_databases = config_truth_value(value);
|
m_config.refresh_databases = config_truth_value(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "refresh_interval") == 0)
|
else if (strcmp(options[i], "refresh_interval") == 0)
|
||||||
{
|
{
|
||||||
this->m_config.refresh_min_interval = atof(value);
|
m_config.refresh_min_interval = atof(value);
|
||||||
}
|
}
|
||||||
else if (strcmp(options[i], "debug") == 0)
|
else if (strcmp(options[i], "debug") == 0)
|
||||||
{
|
{
|
||||||
this->m_config.debug = config_truth_value(value);
|
m_config.debug = config_truth_value(value);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -168,14 +169,14 @@ SchemaRouter::SchemaRouter(SERVICE *service, char **options):
|
|||||||
|
|
||||||
SchemaRouter::~SchemaRouter()
|
SchemaRouter::~SchemaRouter()
|
||||||
{
|
{
|
||||||
if (this->m_ignore_regex)
|
if (m_ignore_regex)
|
||||||
{
|
{
|
||||||
pcre2_code_free(this->m_ignore_regex);
|
pcre2_code_free(m_ignore_regex);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this->m_ignore_match_data)
|
if (m_ignore_match_data)
|
||||||
{
|
{
|
||||||
pcre2_match_data_free(this->m_ignore_match_data);
|
pcre2_match_data_free(m_ignore_match_data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,32 +192,32 @@ SchemaRouterSession* SchemaRouter::newSession(MXS_SESSION* pSession)
|
|||||||
|
|
||||||
void SchemaRouter::diagnostics(DCB* dcb)
|
void SchemaRouter::diagnostics(DCB* dcb)
|
||||||
{
|
{
|
||||||
double sescmd_pct = this->m_stats.n_sescmd != 0 ?
|
double sescmd_pct = m_stats.n_sescmd != 0 ?
|
||||||
100.0 * ((double)this->m_stats.n_sescmd / (double)this->m_stats.n_queries) :
|
100.0 * ((double)m_stats.n_sescmd / (double)m_stats.n_queries) :
|
||||||
0.0;
|
0.0;
|
||||||
|
|
||||||
/** Session command statistics */
|
/** Session command statistics */
|
||||||
dcb_printf(dcb, "\n\33[1;4mSession Commands\33[0m\n");
|
dcb_printf(dcb, "\n\33[1;4mSession Commands\33[0m\n");
|
||||||
dcb_printf(dcb, "Total number of queries: %d\n",
|
dcb_printf(dcb, "Total number of queries: %d\n",
|
||||||
this->m_stats.n_queries);
|
m_stats.n_queries);
|
||||||
dcb_printf(dcb, "Percentage of session commands: %.2f\n",
|
dcb_printf(dcb, "Percentage of session commands: %.2f\n",
|
||||||
sescmd_pct);
|
sescmd_pct);
|
||||||
dcb_printf(dcb, "Longest chain of stored session commands: %d\n",
|
dcb_printf(dcb, "Longest chain of stored session commands: %d\n",
|
||||||
this->m_stats.longest_sescmd);
|
m_stats.longest_sescmd);
|
||||||
dcb_printf(dcb, "Session command history limit exceeded: %d times\n",
|
dcb_printf(dcb, "Session command history limit exceeded: %d times\n",
|
||||||
this->m_stats.n_hist_exceeded);
|
m_stats.n_hist_exceeded);
|
||||||
|
|
||||||
/** Session time statistics */
|
/** Session time statistics */
|
||||||
|
|
||||||
if (this->m_stats.sessions > 0)
|
if (m_stats.sessions > 0)
|
||||||
{
|
{
|
||||||
dcb_printf(dcb, "\n\33[1;4mSession Time Statistics\33[0m\n");
|
dcb_printf(dcb, "\n\33[1;4mSession Time Statistics\33[0m\n");
|
||||||
dcb_printf(dcb, "Longest session: %.2lf seconds\n", this->m_stats.ses_longest);
|
dcb_printf(dcb, "Longest session: %.2lf seconds\n", m_stats.ses_longest);
|
||||||
dcb_printf(dcb, "Shortest session: %.2lf seconds\n", this->m_stats.ses_shortest);
|
dcb_printf(dcb, "Shortest session: %.2lf seconds\n", m_stats.ses_shortest);
|
||||||
dcb_printf(dcb, "Average session length: %.2lf seconds\n", this->m_stats.ses_average);
|
dcb_printf(dcb, "Average session length: %.2lf seconds\n", m_stats.ses_average);
|
||||||
}
|
}
|
||||||
dcb_printf(dcb, "Shard map cache hits: %d\n", this->m_stats.shmap_cache_hit);
|
dcb_printf(dcb, "Shard map cache hits: %d\n", m_stats.shmap_cache_hit);
|
||||||
dcb_printf(dcb, "Shard map cache misses: %d\n", this->m_stats.shmap_cache_miss);
|
dcb_printf(dcb, "Shard map cache misses: %d\n", m_stats.shmap_cache_miss);
|
||||||
dcb_printf(dcb, "\n");
|
dcb_printf(dcb, "\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,7 +38,17 @@ void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const c
|
|||||||
|
|
||||||
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& router):
|
SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& router):
|
||||||
mxs::RouterSession(session),
|
mxs::RouterSession(session),
|
||||||
m_router(router)
|
m_closed(false),
|
||||||
|
m_client(session->client_dcb),
|
||||||
|
m_mysql_session((MYSQL_session*)session->client_dcb->data),
|
||||||
|
m_backends(NULL),
|
||||||
|
m_config(router.m_config),
|
||||||
|
m_backend_count(0),
|
||||||
|
m_router(router),
|
||||||
|
m_shard(router.m_shard_manager.get_shard(m_client->user, m_config.refresh_min_interval)),
|
||||||
|
m_state(0),
|
||||||
|
m_sent_sescmd(0),
|
||||||
|
m_replied_sescmd(0)
|
||||||
{
|
{
|
||||||
char db[MYSQL_DATABASE_MAXLEN + 1] = "";
|
char db[MYSQL_DATABASE_MAXLEN + 1] = "";
|
||||||
MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol;
|
MySQLProtocol* protocol = (MySQLProtocol*)session->client_dcb->protocol;
|
||||||
@ -64,48 +74,16 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou
|
|||||||
MXS_INFO("Client'%s' connecting with empty database.", data->user);
|
MXS_INFO("Client'%s' connecting with empty database.", data->user);
|
||||||
}
|
}
|
||||||
|
|
||||||
SchemaRouterSession& client_rses = *this;
|
|
||||||
|
|
||||||
this->m_router = router;
|
|
||||||
this->m_client = (DCB*)session->client_dcb;
|
|
||||||
this->m_closed = false;
|
|
||||||
this->m_sent_sescmd = 0;
|
|
||||||
this->m_replied_sescmd = 0;
|
|
||||||
|
|
||||||
this->m_shard = router.m_shard_manager.get_shard(session->client_dcb->user,
|
|
||||||
router.m_config.refresh_min_interval);
|
|
||||||
|
|
||||||
this->m_config = router.m_config;
|
|
||||||
|
|
||||||
if (using_db)
|
if (using_db)
|
||||||
{
|
{
|
||||||
this->m_state |= INIT_USE_DB;
|
m_state |= INIT_USE_DB;
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* Set defaults to session variables.
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Instead of calling this, ensure that there is at least one
|
|
||||||
* responding server.
|
|
||||||
*/
|
|
||||||
|
|
||||||
int router_nservers = router.m_service->n_dbref;
|
int router_nservers = router.m_service->n_dbref;
|
||||||
|
|
||||||
/**
|
|
||||||
* Create backend reference objects for this session.
|
|
||||||
*/
|
|
||||||
|
|
||||||
backend_ref_t* backend_ref = new backend_ref_t[router_nservers];
|
backend_ref_t* backend_ref = new backend_ref_t[router_nservers];
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize backend references with BACKEND ptr.
|
|
||||||
* Initialize session command cursors for each backend reference.
|
|
||||||
*/
|
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
|
||||||
for (SERVER_REF *ref = router.m_service->dbref; ref; ref = ref->next)
|
for (SERVER_REF *ref = router.m_service->dbref; ref && i < router_nservers; ref = ref->next)
|
||||||
{
|
{
|
||||||
if (ref->active)
|
if (ref->active)
|
||||||
{
|
{
|
||||||
@ -120,18 +98,14 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou
|
|||||||
|
|
||||||
if (i < router_nservers)
|
if (i < router_nservers)
|
||||||
{
|
{
|
||||||
|
/** Service had less than the reported number of servers */
|
||||||
router_nservers = i;
|
router_nservers = i;
|
||||||
}
|
}
|
||||||
|
|
||||||
this->m_backends = backend_ref;
|
m_backends = backend_ref;
|
||||||
this->m_backend_count = router_nservers;
|
m_backend_count = router_nservers;
|
||||||
|
|
||||||
/**
|
if (!connect_backend_servers(backend_ref, router_nservers, session))
|
||||||
* Connect to all backend servers
|
|
||||||
*/
|
|
||||||
bool succp = connect_backend_servers(backend_ref, router_nservers, session);
|
|
||||||
|
|
||||||
if (!succp)
|
|
||||||
{
|
{
|
||||||
throw std::runtime_error("Failed to connect to backend servers");
|
throw std::runtime_error("Failed to connect to backend servers");
|
||||||
}
|
}
|
||||||
@ -139,7 +113,7 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou
|
|||||||
if (db[0])
|
if (db[0])
|
||||||
{
|
{
|
||||||
/* Store the database the client is connecting to */
|
/* Store the database the client is connecting to */
|
||||||
this->m_connect_db = db;
|
m_connect_db = db;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add(&router.m_stats.sessions, 1);
|
atomic_add(&router.m_stats.sessions, 1);
|
||||||
@ -147,28 +121,28 @@ SchemaRouterSession::SchemaRouterSession(MXS_SESSION* session, SchemaRouter& rou
|
|||||||
|
|
||||||
SchemaRouterSession::~SchemaRouterSession()
|
SchemaRouterSession::~SchemaRouterSession()
|
||||||
{
|
{
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
gwbuf_free(this->m_backends[i].pending_cmd);
|
gwbuf_free(m_backends[i].pending_cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
delete[] this->m_backends;
|
delete[] m_backends;
|
||||||
}
|
}
|
||||||
|
|
||||||
void SchemaRouterSession::close()
|
void SchemaRouterSession::close()
|
||||||
{
|
{
|
||||||
ss_dassert(!this->m_closed);
|
ss_dassert(!m_closed);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Lock router client session for secure read and update.
|
* Lock router client session for secure read and update.
|
||||||
*/
|
*/
|
||||||
if (!this->m_closed)
|
if (!m_closed)
|
||||||
{
|
{
|
||||||
this->m_closed = true;
|
m_closed = true;
|
||||||
|
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
backend_ref_t* bref = &this->m_backends[i];
|
backend_ref_t* bref = &m_backends[i];
|
||||||
DCB* dcb = bref->dcb;
|
DCB* dcb = bref->dcb;
|
||||||
/** Close those which had been connected */
|
/** Close those which had been connected */
|
||||||
if (BREF_IS_IN_USE(bref))
|
if (BREF_IS_IN_USE(bref))
|
||||||
@ -192,11 +166,11 @@ void SchemaRouterSession::close()
|
|||||||
}
|
}
|
||||||
|
|
||||||
spinlock_acquire(&m_router.m_lock);
|
spinlock_acquire(&m_router.m_lock);
|
||||||
if (m_router.m_stats.longest_sescmd < this->m_stats.longest_sescmd)
|
if (m_router.m_stats.longest_sescmd < m_stats.longest_sescmd)
|
||||||
{
|
{
|
||||||
m_router.m_stats.longest_sescmd = this->m_stats.longest_sescmd;
|
m_router.m_stats.longest_sescmd = m_stats.longest_sescmd;
|
||||||
}
|
}
|
||||||
double ses_time = difftime(time(NULL), this->m_client->session->stats.connect);
|
double ses_time = difftime(time(NULL), m_client->session->stats.connect);
|
||||||
if (m_router.m_stats.ses_longest < ses_time)
|
if (m_router.m_stats.ses_longest < ses_time)
|
||||||
{
|
{
|
||||||
m_router.m_stats.ses_longest = ses_time;
|
m_router.m_stats.ses_longest = ses_time;
|
||||||
@ -305,9 +279,9 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
|
|||||||
* the current default database or to the first available server. */
|
* the current default database or to the first available server. */
|
||||||
target = get_shard_target(pPacket, type);
|
target = get_shard_target(pPacket, type);
|
||||||
|
|
||||||
if ((target == NULL && command != MYSQL_COM_INIT_DB && this->m_current_db.length() == 0) ||
|
if ((target == NULL && command != MYSQL_COM_INIT_DB && m_current_db.length() == 0) ||
|
||||||
command == MYSQL_COM_FIELD_LIST ||
|
command == MYSQL_COM_FIELD_LIST ||
|
||||||
this->m_current_db.length() == 0)
|
m_current_db.length() == 0)
|
||||||
{
|
{
|
||||||
/** No current database and no databases in query or the database is
|
/** No current database and no databases in query or the database is
|
||||||
* ignored, route to first available backend. */
|
* ignored, route to first available backend. */
|
||||||
@ -317,9 +291,9 @@ SERVER* SchemaRouterSession::resolve_query_target(GWBUF* pPacket,
|
|||||||
|
|
||||||
if (TARGET_IS_ANY(route_target))
|
if (TARGET_IS_ANY(route_target))
|
||||||
{
|
{
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
SERVER *server = this->m_backends[i].backend->server;
|
SERVER *server = m_backends[i].backend->server;
|
||||||
if (SERVER_IS_RUNNING(server))
|
if (SERVER_IS_RUNNING(server))
|
||||||
{
|
{
|
||||||
route_target = TARGET_NAMED_SERVER;
|
route_target = TARGET_NAMED_SERVER;
|
||||||
@ -342,12 +316,12 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
|
|||||||
{
|
{
|
||||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket));
|
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(pPacket));
|
||||||
|
|
||||||
if (this->m_closed)
|
if (m_closed)
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this->m_shard.empty())
|
if (m_shard.empty())
|
||||||
{
|
{
|
||||||
/* Generate database list */
|
/* Generate database list */
|
||||||
gen_databaselist();
|
gen_databaselist();
|
||||||
@ -361,11 +335,11 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
|
|||||||
* to store the query. Once the databases have been mapped and/or the
|
* to store the query. Once the databases have been mapped and/or the
|
||||||
* default database is taken into use we can send the query forward.
|
* default database is taken into use we can send the query forward.
|
||||||
*/
|
*/
|
||||||
if (this->m_state & (INIT_MAPPING | INIT_USE_DB))
|
if (m_state & (INIT_MAPPING | INIT_USE_DB))
|
||||||
{
|
{
|
||||||
this->m_queue.push_back(pPacket);
|
m_queue.push_back(pPacket);
|
||||||
|
|
||||||
if (this->m_state == (INIT_READY | INIT_USE_DB))
|
if (m_state == (INIT_READY | INIT_USE_DB))
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* This state is possible if a client connects with a default database
|
* This state is possible if a client connects with a default database
|
||||||
@ -421,14 +395,14 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
|
|||||||
char errbuf[128 + MYSQL_DATABASE_MAXLEN];
|
char errbuf[128 + MYSQL_DATABASE_MAXLEN];
|
||||||
snprintf(errbuf, sizeof(errbuf), "Unknown database: %s", db);
|
snprintf(errbuf, sizeof(errbuf), "Unknown database: %s", db);
|
||||||
|
|
||||||
if (this->m_config.debug)
|
if (m_config.debug)
|
||||||
{
|
{
|
||||||
sprintf(errbuf + strlen(errbuf),
|
sprintf(errbuf + strlen(errbuf),
|
||||||
" ([%lu]: DB change failed)",
|
" ([%lu]: DB change failed)",
|
||||||
this->m_client->session->ses_id);
|
m_client->session->ses_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
write_error_to_client(this->m_client,
|
write_error_to_client(m_client,
|
||||||
SCHEMA_ERR_DBNOTFOUND,
|
SCHEMA_ERR_DBNOTFOUND,
|
||||||
SCHEMA_ERRSTR_DBNOTFOUND,
|
SCHEMA_ERRSTR_DBNOTFOUND,
|
||||||
errbuf);
|
errbuf);
|
||||||
@ -436,12 +410,12 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
|
|||||||
}
|
}
|
||||||
|
|
||||||
route_target = TARGET_UNDEFINED;
|
route_target = TARGET_UNDEFINED;
|
||||||
target = this->m_shard.get_location(this->m_current_db);
|
target = m_shard.get_location(m_current_db);
|
||||||
|
|
||||||
if (target)
|
if (target)
|
||||||
{
|
{
|
||||||
MXS_INFO("INIT_DB for database '%s' on server '%s'",
|
MXS_INFO("INIT_DB for database '%s' on server '%s'",
|
||||||
this->m_current_db.c_str(), target->unique_name);
|
m_current_db.c_str(), target->unique_name);
|
||||||
route_target = TARGET_NAMED_SERVER;
|
route_target = TARGET_NAMED_SERVER;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -487,7 +461,7 @@ int32_t SchemaRouterSession::routeQuery(GWBUF* pPacket)
|
|||||||
{
|
{
|
||||||
/** Store current statement if execution of the previous
|
/** Store current statement if execution of the previous
|
||||||
* session command hasn't been completed. */
|
* session command hasn't been completed. */
|
||||||
ss_dassert((bref->pending_cmd == NULL || this->m_closed));
|
ss_dassert((bref->pending_cmd == NULL || m_closed));
|
||||||
bref->pending_cmd = pPacket;
|
bref->pending_cmd = pPacket;
|
||||||
ret = 1;
|
ret = 1;
|
||||||
}
|
}
|
||||||
@ -520,13 +494,13 @@ void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPack
|
|||||||
if (rc == 1)
|
if (rc == 1)
|
||||||
{
|
{
|
||||||
synchronize_shard_map();
|
synchronize_shard_map();
|
||||||
this->m_state &= ~INIT_MAPPING;
|
m_state &= ~INIT_MAPPING;
|
||||||
|
|
||||||
/* Check if the session is reconnecting with a database name
|
/* Check if the session is reconnecting with a database name
|
||||||
* that is not in the hashtable. If the database is not found
|
* that is not in the hashtable. If the database is not found
|
||||||
* then close the session. */
|
* then close the session. */
|
||||||
|
|
||||||
if (this->m_state & INIT_USE_DB)
|
if (m_state & INIT_USE_DB)
|
||||||
{
|
{
|
||||||
if (!handle_default_db())
|
if (!handle_default_db())
|
||||||
{
|
{
|
||||||
@ -534,16 +508,16 @@ void SchemaRouterSession::handle_mapping_reply(backend_ref_t* bref, GWBUF* pPack
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this->m_queue.size() && rc != -1)
|
if (m_queue.size() && rc != -1)
|
||||||
{
|
{
|
||||||
ss_dassert(this->m_state == INIT_READY);
|
ss_dassert(m_state == INIT_READY);
|
||||||
route_queued_query();
|
route_queued_query();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
{
|
{
|
||||||
poll_fake_hangup_event(this->m_client);
|
poll_fake_hangup_event(m_client);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -554,11 +528,11 @@ void SchemaRouterSession::process_response(backend_ref_t* bref, GWBUF** ppPacket
|
|||||||
/** We are executing a session command */
|
/** We are executing a session command */
|
||||||
if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket)))
|
if (GWBUF_IS_TYPE_SESCMD_RESPONSE((*ppPacket)))
|
||||||
{
|
{
|
||||||
if (this->m_replied_sescmd < this->m_sent_sescmd &&
|
if (m_replied_sescmd < m_sent_sescmd &&
|
||||||
bref->session_commands.front().get_position() == this->m_replied_sescmd + 1)
|
bref->session_commands.front().get_position() == m_replied_sescmd + 1)
|
||||||
{
|
{
|
||||||
/** First reply to this session command, route it to the client */
|
/** First reply to this session command, route it to the client */
|
||||||
++this->m_replied_sescmd;
|
++m_replied_sescmd;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -588,7 +562,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
{
|
{
|
||||||
backend_ref_t* bref = get_bref_from_dcb(pDcb);
|
backend_ref_t* bref = get_bref_from_dcb(pDcb);
|
||||||
|
|
||||||
if (this->m_closed || bref == NULL)
|
if (m_closed || bref == NULL)
|
||||||
{
|
{
|
||||||
gwbuf_free(pPacket);
|
gwbuf_free(pPacket);
|
||||||
return;
|
return;
|
||||||
@ -597,32 +571,32 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
MXS_DEBUG("Reply from [%s] session [%p]"
|
MXS_DEBUG("Reply from [%s] session [%p]"
|
||||||
" mapping [%s] queries queued [%s]",
|
" mapping [%s] queries queued [%s]",
|
||||||
bref->backend->server->unique_name,
|
bref->backend->server->unique_name,
|
||||||
this->m_client->session,
|
m_client->session,
|
||||||
this->m_state & INIT_MAPPING ? "true" : "false",
|
m_state & INIT_MAPPING ? "true" : "false",
|
||||||
this->m_queue.size() == 0 ? "none" :
|
m_queue.size() == 0 ? "none" :
|
||||||
this->m_queue.size() > 0 ? "multiple" : "one");
|
m_queue.size() > 0 ? "multiple" : "one");
|
||||||
|
|
||||||
if (this->m_state & INIT_MAPPING)
|
if (m_state & INIT_MAPPING)
|
||||||
{
|
{
|
||||||
handle_mapping_reply(bref, pPacket);
|
handle_mapping_reply(bref, pPacket);
|
||||||
}
|
}
|
||||||
else if (this->m_state & INIT_USE_DB)
|
else if (m_state & INIT_USE_DB)
|
||||||
{
|
{
|
||||||
MXS_DEBUG("Reply to USE '%s' received for session %p",
|
MXS_DEBUG("Reply to USE '%s' received for session %p",
|
||||||
this->m_connect_db.c_str(), this->m_client->session);
|
m_connect_db.c_str(), m_client->session);
|
||||||
this->m_state &= ~INIT_USE_DB;
|
m_state &= ~INIT_USE_DB;
|
||||||
this->m_current_db = this->m_connect_db;
|
m_current_db = m_connect_db;
|
||||||
ss_dassert(this->m_state == INIT_READY);
|
ss_dassert(m_state == INIT_READY);
|
||||||
|
|
||||||
if (this->m_queue.size())
|
if (m_queue.size())
|
||||||
{
|
{
|
||||||
route_queued_query();
|
route_queued_query();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
else if (this->m_queue.size())
|
else if (m_queue.size())
|
||||||
{
|
{
|
||||||
ss_dassert(this->m_state == INIT_READY);
|
ss_dassert(m_state == INIT_READY);
|
||||||
route_queued_query();
|
route_queued_query();
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -650,7 +624,7 @@ void SchemaRouterSession::clientReply(GWBUF* pPacket, DCB* pDcb)
|
|||||||
|
|
||||||
if (ret == 1)
|
if (ret == 1)
|
||||||
{
|
{
|
||||||
atomic_add(&this->m_router.m_stats.n_queries, 1);
|
atomic_add(&m_router.m_stats.n_queries, 1);
|
||||||
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
bref_set_state(bref, BREF_QUERY_ACTIVE);
|
||||||
bref_set_state(bref, BREF_WAITING_RESULT);
|
bref_set_state(bref, BREF_WAITING_RESULT);
|
||||||
}
|
}
|
||||||
@ -714,7 +688,7 @@ void SchemaRouterSession::handleError(GWBUF* pMessage,
|
|||||||
void SchemaRouterSession::synchronize_shard_map()
|
void SchemaRouterSession::synchronize_shard_map()
|
||||||
{
|
{
|
||||||
m_router.m_stats.shmap_cache_miss++;
|
m_router.m_stats.shmap_cache_miss++;
|
||||||
m_router.m_shard_manager.update_shard(this->m_shard, this->m_client->user);
|
m_router.m_shard_manager.update_shard(m_shard, m_client->user);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -839,20 +813,20 @@ bool SchemaRouterSession::execute_sescmd_in_backend(backend_ref_t* backend_ref)
|
|||||||
bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
|
bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
|
||||||
{
|
{
|
||||||
bool succp = false;
|
bool succp = false;
|
||||||
backend_ref_t *backend_ref = this->m_backends;
|
backend_ref_t *backend_ref = m_backends;
|
||||||
|
|
||||||
MXS_INFO("Session write, routing to all servers.");
|
MXS_INFO("Session write, routing to all servers.");
|
||||||
atomic_add(&this->m_stats.longest_sescmd, 1);
|
atomic_add(&m_stats.longest_sescmd, 1);
|
||||||
|
|
||||||
/** Increment the session command count */
|
/** Increment the session command count */
|
||||||
++this->m_sent_sescmd;
|
++m_sent_sescmd;
|
||||||
|
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
if (BREF_IS_IN_USE((&backend_ref[i])))
|
if (BREF_IS_IN_USE((&backend_ref[i])))
|
||||||
{
|
{
|
||||||
GWBUF *buffer = gwbuf_clone(querybuf);
|
GWBUF *buffer = gwbuf_clone(querybuf);
|
||||||
backend_ref[i].session_commands.push_back(SessionCommand(buffer, this->m_sent_sescmd));
|
backend_ref[i].session_commands.push_back(SessionCommand(buffer, m_sent_sescmd));
|
||||||
|
|
||||||
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
if (MXS_LOG_PRIORITY_IS_ENABLED(LOG_INFO))
|
||||||
{
|
{
|
||||||
@ -861,7 +835,7 @@ bool SchemaRouterSession::route_session_write(GWBUF* querybuf, uint8_t command)
|
|||||||
"master" : "slave"),
|
"master" : "slave"),
|
||||||
backend_ref[i].backend->server->name,
|
backend_ref[i].backend->server->name,
|
||||||
backend_ref[i].backend->server->port,
|
backend_ref[i].backend->server->port,
|
||||||
(i + 1 == this->m_backend_count ? " <" : ""));
|
(i + 1 == m_backend_count ? " <" : ""));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (backend_ref[i].session_commands.size() == 1)
|
if (backend_ref[i].session_commands.size() == 1)
|
||||||
@ -931,10 +905,10 @@ void SchemaRouterSession::handle_error_reply_client(DCB* dcb, GWBUF* errmsg)
|
|||||||
*/
|
*/
|
||||||
bool SchemaRouterSession::have_servers()
|
bool SchemaRouterSession::have_servers()
|
||||||
{
|
{
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
if (BREF_IS_IN_USE(&this->m_backends[i]) &&
|
if (BREF_IS_IN_USE(&m_backends[i]) &&
|
||||||
!BREF_IS_CLOSED(&this->m_backends[i]))
|
!BREF_IS_CLOSED(&m_backends[i]))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1003,11 +977,11 @@ backend_ref_t* SchemaRouterSession::get_bref_from_dcb(DCB* dcb)
|
|||||||
{
|
{
|
||||||
CHK_DCB(dcb);
|
CHK_DCB(dcb);
|
||||||
|
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
if (this->m_backends[i].dcb == dcb)
|
if (m_backends[i].dcb == dcb)
|
||||||
{
|
{
|
||||||
return &this->m_backends[i];
|
return &m_backends[i];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1083,14 +1057,14 @@ bool SchemaRouterSession::process_show_shards()
|
|||||||
bool rval = false;
|
bool rval = false;
|
||||||
|
|
||||||
ServerMap pContent;
|
ServerMap pContent;
|
||||||
this->m_shard.get_content(pContent);
|
m_shard.get_content(pContent);
|
||||||
RESULTSET* rset = resultset_create(shard_list_cb, &pContent);
|
RESULTSET* rset = resultset_create(shard_list_cb, &pContent);
|
||||||
|
|
||||||
if (rset)
|
if (rset)
|
||||||
{
|
{
|
||||||
resultset_add_column(rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR);
|
resultset_add_column(rset, "Database", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR);
|
||||||
resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR);
|
resultset_add_column(rset, "Server", MYSQL_DATABASE_MAXLEN, COL_TYPE_VARCHAR);
|
||||||
resultset_stream_mysql(rset, this->m_client);
|
resultset_stream_mysql(rset, m_client);
|
||||||
resultset_free(rset);
|
resultset_free(rset);
|
||||||
rval = true;
|
rval = true;
|
||||||
}
|
}
|
||||||
@ -1129,14 +1103,14 @@ void write_error_to_client(DCB* dcb, int errnum, const char* mysqlstate, const c
|
|||||||
bool SchemaRouterSession::handle_default_db()
|
bool SchemaRouterSession::handle_default_db()
|
||||||
{
|
{
|
||||||
bool rval = false;
|
bool rval = false;
|
||||||
SERVER* target = this->m_shard.get_location(this->m_connect_db);
|
SERVER* target = m_shard.get_location(m_connect_db);
|
||||||
|
|
||||||
if (target)
|
if (target)
|
||||||
{
|
{
|
||||||
/* Send a COM_INIT_DB packet to the server with the right database
|
/* Send a COM_INIT_DB packet to the server with the right database
|
||||||
* and set it as the client's active database */
|
* and set it as the client's active database */
|
||||||
|
|
||||||
unsigned int qlen = this->m_connect_db.length();
|
unsigned int qlen = m_connect_db.length();
|
||||||
GWBUF* buffer = gwbuf_alloc(qlen + 5);
|
GWBUF* buffer = gwbuf_alloc(qlen + 5);
|
||||||
|
|
||||||
if (buffer)
|
if (buffer)
|
||||||
@ -1146,16 +1120,16 @@ bool SchemaRouterSession::handle_default_db()
|
|||||||
gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL);
|
gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL);
|
||||||
data[3] = 0x0;
|
data[3] = 0x0;
|
||||||
data[4] = 0x2;
|
data[4] = 0x2;
|
||||||
memcpy(data + 5, this->m_connect_db.c_str(), qlen);
|
memcpy(data + 5, m_connect_db.c_str(), qlen);
|
||||||
DCB* dcb = NULL;
|
DCB* dcb = NULL;
|
||||||
|
|
||||||
if (get_shard_dcb(&dcb, target->unique_name))
|
if (get_shard_dcb(&dcb, target->unique_name))
|
||||||
{
|
{
|
||||||
dcb->func.write(dcb, buffer);
|
dcb->func.write(dcb, buffer);
|
||||||
MXS_DEBUG("USE '%s' sent to %s for session %p",
|
MXS_DEBUG("USE '%s' sent to %s for session %p",
|
||||||
this->m_connect_db.c_str(),
|
m_connect_db.c_str(),
|
||||||
target->unique_name,
|
target->unique_name,
|
||||||
this->m_client->session);
|
m_client->session);
|
||||||
rval = true;
|
rval = true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -1171,15 +1145,15 @@ bool SchemaRouterSession::handle_default_db()
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/** Unknown database, hang up on the client*/
|
/** Unknown database, hang up on the client*/
|
||||||
MXS_INFO("Connecting to a non-existent database '%s'", this->m_connect_db.c_str());
|
MXS_INFO("Connecting to a non-existent database '%s'", m_connect_db.c_str());
|
||||||
char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1];
|
char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1];
|
||||||
sprintf(errmsg, "Unknown database '%s'", this->m_connect_db.c_str());
|
sprintf(errmsg, "Unknown database '%s'", m_connect_db.c_str());
|
||||||
if (this->m_config.debug)
|
if (m_config.debug)
|
||||||
{
|
{
|
||||||
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)",
|
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)",
|
||||||
this->m_client->session->ses_id);
|
m_client->session->ses_id);
|
||||||
}
|
}
|
||||||
write_error_to_client(this->m_client,
|
write_error_to_client(m_client,
|
||||||
SCHEMA_ERR_DBNOTFOUND,
|
SCHEMA_ERR_DBNOTFOUND,
|
||||||
SCHEMA_ERRSTR_DBNOTFOUND,
|
SCHEMA_ERRSTR_DBNOTFOUND,
|
||||||
errmsg);
|
errmsg);
|
||||||
@ -1190,18 +1164,18 @@ bool SchemaRouterSession::handle_default_db()
|
|||||||
|
|
||||||
void SchemaRouterSession::route_queued_query()
|
void SchemaRouterSession::route_queued_query()
|
||||||
{
|
{
|
||||||
GWBUF* tmp = this->m_queue.front().release();
|
GWBUF* tmp = m_queue.front().release();
|
||||||
this->m_queue.pop_front();
|
m_queue.pop_front();
|
||||||
|
|
||||||
#ifdef SS_DEBUG
|
#ifdef SS_DEBUG
|
||||||
char* querystr = modutil_get_SQL(tmp);
|
char* querystr = modutil_get_SQL(tmp);
|
||||||
MXS_DEBUG("Sending queued buffer for session %p: %s",
|
MXS_DEBUG("Sending queued buffer for session %p: %s",
|
||||||
this->m_client->session,
|
m_client->session,
|
||||||
querystr);
|
querystr);
|
||||||
MXS_FREE(querystr);
|
MXS_FREE(querystr);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
poll_add_epollin_event_to_dcb(this->m_client, tmp);
|
poll_add_epollin_event_to_dcb(m_client, tmp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1214,9 +1188,9 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref,
|
|||||||
{
|
{
|
||||||
bool mapped = true;
|
bool mapped = true;
|
||||||
GWBUF* writebuf = *wbuf;
|
GWBUF* writebuf = *wbuf;
|
||||||
backend_ref_t* bkrf = this->m_backends;
|
backend_ref_t* bkrf = m_backends;
|
||||||
|
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
if (bref->dcb == bkrf[i].dcb && !BREF_IS_MAPPED(&bkrf[i]))
|
if (bref->dcb == bkrf[i].dcb && !BREF_IS_MAPPED(&bkrf[i]))
|
||||||
{
|
{
|
||||||
@ -1225,28 +1199,28 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref,
|
|||||||
writebuf = gwbuf_append(bref->map_queue, writebuf);
|
writebuf = gwbuf_append(bref->map_queue, writebuf);
|
||||||
bref->map_queue = NULL;
|
bref->map_queue = NULL;
|
||||||
}
|
}
|
||||||
showdb_response_t rc = parse_showdb_response(&this->m_backends[i],
|
showdb_response_t rc = parse_showdb_response(&m_backends[i],
|
||||||
&writebuf);
|
&writebuf);
|
||||||
if (rc == SHOWDB_FULL_RESPONSE)
|
if (rc == SHOWDB_FULL_RESPONSE)
|
||||||
{
|
{
|
||||||
this->m_backends[i].mapped = true;
|
m_backends[i].mapped = true;
|
||||||
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
|
MXS_DEBUG("Received SHOW DATABASES reply from %s for session %p",
|
||||||
this->m_backends[i].backend->server->unique_name,
|
m_backends[i].backend->server->unique_name,
|
||||||
this->m_client->session);
|
m_client->session);
|
||||||
}
|
}
|
||||||
else if (rc == SHOWDB_PARTIAL_RESPONSE)
|
else if (rc == SHOWDB_PARTIAL_RESPONSE)
|
||||||
{
|
{
|
||||||
bref->map_queue = writebuf;
|
bref->map_queue = writebuf;
|
||||||
writebuf = NULL;
|
writebuf = NULL;
|
||||||
MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p",
|
MXS_DEBUG("Received partial SHOW DATABASES reply from %s for session %p",
|
||||||
this->m_backends[i].backend->server->unique_name,
|
m_backends[i].backend->server->unique_name,
|
||||||
this->m_client->session);
|
m_client->session);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
DCB* client_dcb = NULL;
|
DCB* client_dcb = NULL;
|
||||||
|
|
||||||
if ((this->m_state & INIT_FAILED) == 0)
|
if ((m_state & INIT_FAILED) == 0)
|
||||||
{
|
{
|
||||||
if (rc == SHOWDB_DUPLICATE_DATABASES)
|
if (rc == SHOWDB_DUPLICATE_DATABASES)
|
||||||
{
|
{
|
||||||
@ -1256,16 +1230,16 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref,
|
|||||||
{
|
{
|
||||||
MXS_ERROR("Fatal error when processing SHOW DATABASES response, closing session.");
|
MXS_ERROR("Fatal error when processing SHOW DATABASES response, closing session.");
|
||||||
}
|
}
|
||||||
client_dcb = this->m_client;
|
client_dcb = m_client;
|
||||||
|
|
||||||
/** This is the first response to the database mapping which
|
/** This is the first response to the database mapping which
|
||||||
* has duplicate database conflict. Set the initialization bitmask
|
* has duplicate database conflict. Set the initialization bitmask
|
||||||
* to INIT_FAILED */
|
* to INIT_FAILED */
|
||||||
this->m_state |= INIT_FAILED;
|
m_state |= INIT_FAILED;
|
||||||
|
|
||||||
/** Send the client an error about duplicate databases
|
/** Send the client an error about duplicate databases
|
||||||
* if there is a queued query from the client. */
|
* if there is a queued query from the client. */
|
||||||
if (this->m_queue.size())
|
if (m_queue.size())
|
||||||
{
|
{
|
||||||
GWBUF* error = modutil_create_mysql_err_msg(1, 0,
|
GWBUF* error = modutil_create_mysql_err_msg(1, 0,
|
||||||
SCHEMA_ERR_DUPLICATEDB,
|
SCHEMA_ERR_DUPLICATEDB,
|
||||||
@ -1293,7 +1267,7 @@ int SchemaRouterSession::inspect_backend_mapping_states(backend_ref_t *bref,
|
|||||||
mapped = false;
|
mapped = false;
|
||||||
MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p",
|
MXS_DEBUG("Still waiting for reply to SHOW DATABASES from %s for session %p",
|
||||||
bkrf[i].backend->server->unique_name,
|
bkrf[i].backend->server->unique_name,
|
||||||
this->m_client->session);
|
m_client->session);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
*wbuf = writebuf;
|
*wbuf = writebuf;
|
||||||
@ -1496,25 +1470,25 @@ showdb_response_t SchemaRouterSession::parse_showdb_response(backend_ref_t* bref
|
|||||||
|
|
||||||
if (data)
|
if (data)
|
||||||
{
|
{
|
||||||
if (this->m_shard.add_location(data, target))
|
if (m_shard.add_location(data, target))
|
||||||
{
|
{
|
||||||
MXS_INFO("<%s, %s>", target->unique_name, data);
|
MXS_INFO("<%s, %s>", target->unique_name, data);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
if (!(this->m_router.m_ignored_dbs.find(data) != this->m_router.m_ignored_dbs.end() ||
|
if (!(m_router.m_ignored_dbs.find(data) != m_router.m_ignored_dbs.end() ||
|
||||||
(this->m_router.m_ignore_regex &&
|
(m_router.m_ignore_regex &&
|
||||||
pcre2_match(this->m_router.m_ignore_regex, (PCRE2_SPTR)data,
|
pcre2_match(m_router.m_ignore_regex, (PCRE2_SPTR)data,
|
||||||
PCRE2_ZERO_TERMINATED, 0, 0,
|
PCRE2_ZERO_TERMINATED, 0, 0,
|
||||||
this->m_router.m_ignore_match_data, NULL) >= 0)))
|
m_router.m_ignore_match_data, NULL) >= 0)))
|
||||||
{
|
{
|
||||||
duplicate_found = true;
|
duplicate_found = true;
|
||||||
SERVER *duplicate = this->m_shard.get_location(data);
|
SERVER *duplicate = m_shard.get_location(data);
|
||||||
|
|
||||||
MXS_ERROR("Database '%s' found on servers '%s' and '%s' for user %s@%s.",
|
MXS_ERROR("Database '%s' found on servers '%s' and '%s' for user %s@%s.",
|
||||||
data, target->unique_name, duplicate->unique_name,
|
data, target->unique_name, duplicate->unique_name,
|
||||||
this->m_client->user,
|
m_client->user,
|
||||||
this->m_client->remote);
|
m_client->remote);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
MXS_FREE(data);
|
MXS_FREE(data);
|
||||||
@ -1565,14 +1539,14 @@ int SchemaRouterSession::gen_databaselist()
|
|||||||
int i, rval = 0;
|
int i, rval = 0;
|
||||||
unsigned int len;
|
unsigned int len;
|
||||||
|
|
||||||
for (i = 0; i < this->m_backend_count; i++)
|
for (i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
this->m_backends[i].mapped = false;
|
m_backends[i].mapped = false;
|
||||||
this->m_backends[i].n_mapping_eof = 0;
|
m_backends[i].n_mapping_eof = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
this->m_state |= INIT_MAPPING;
|
m_state |= INIT_MAPPING;
|
||||||
this->m_state &= ~INIT_UNINT;
|
m_state &= ~INIT_UNINT;
|
||||||
len = strlen(query) + 1;
|
len = strlen(query) + 1;
|
||||||
buffer = gwbuf_alloc(len + 4);
|
buffer = gwbuf_alloc(len + 4);
|
||||||
uint8_t *data = GWBUF_DATA(buffer);
|
uint8_t *data = GWBUF_DATA(buffer);
|
||||||
@ -1583,18 +1557,18 @@ int SchemaRouterSession::gen_databaselist()
|
|||||||
*(data + 4) = 0x03;
|
*(data + 4) = 0x03;
|
||||||
memcpy(data + 5, query, strlen(query));
|
memcpy(data + 5, query, strlen(query));
|
||||||
|
|
||||||
for (i = 0; i < this->m_backend_count; i++)
|
for (i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
if (BREF_IS_IN_USE(&this->m_backends[i]) &&
|
if (BREF_IS_IN_USE(&m_backends[i]) &&
|
||||||
!BREF_IS_CLOSED(&this->m_backends[i]) &
|
!BREF_IS_CLOSED(&m_backends[i]) &
|
||||||
SERVER_IS_RUNNING(this->m_backends[i].backend->server))
|
SERVER_IS_RUNNING(m_backends[i].backend->server))
|
||||||
{
|
{
|
||||||
clone = gwbuf_clone(buffer);
|
clone = gwbuf_clone(buffer);
|
||||||
dcb = this->m_backends[i].dcb;
|
dcb = m_backends[i].dcb;
|
||||||
rval |= !dcb->func.write(dcb, clone);
|
rval |= !dcb->func.write(dcb, clone);
|
||||||
MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d",
|
MXS_DEBUG("Wrote SHOW DATABASES to %s for session %p: returned %d",
|
||||||
this->m_backends[i].backend->server->unique_name,
|
m_backends[i].backend->server->unique_name,
|
||||||
this->m_client->session,
|
m_client->session,
|
||||||
rval);
|
rval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1628,7 +1602,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
SERVER* target = this->m_shard.get_location(info[i].database);
|
SERVER* target = m_shard.get_location(info[i].database);
|
||||||
|
|
||||||
if (target)
|
if (target)
|
||||||
{
|
{
|
||||||
@ -1665,7 +1639,7 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
|||||||
|
|
||||||
if (tok)
|
if (tok)
|
||||||
{
|
{
|
||||||
rval = this->m_shard.get_location(tok);
|
rval = m_shard.get_location(tok);
|
||||||
|
|
||||||
if (rval)
|
if (rval)
|
||||||
{
|
{
|
||||||
@ -1677,12 +1651,12 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
|||||||
|
|
||||||
if (rval == NULL)
|
if (rval == NULL)
|
||||||
{
|
{
|
||||||
rval = this->m_shard.get_location(this->m_current_db);
|
rval = m_shard.get_location(m_current_db);
|
||||||
|
|
||||||
if (rval)
|
if (rval)
|
||||||
{
|
{
|
||||||
MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'",
|
MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'",
|
||||||
this->m_current_db.c_str(), rval->unique_name);
|
m_current_db.c_str(), rval->unique_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -1692,30 +1666,30 @@ SERVER* SchemaRouterSession::get_shard_target(GWBUF* buffer, uint32_t qtype)
|
|||||||
}
|
}
|
||||||
else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
else if (buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < this->m_backend_count; i++)
|
for (int i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
char *srvnm = this->m_backends[i].backend->server->unique_name;
|
char *srvnm = m_backends[i].backend->server->unique_name;
|
||||||
|
|
||||||
if (strcmp(srvnm, (char*)buffer->hint->data) == 0)
|
if (strcmp(srvnm, (char*)buffer->hint->data) == 0)
|
||||||
{
|
{
|
||||||
rval = this->m_backends[i].backend->server;
|
rval = m_backends[i].backend->server;
|
||||||
MXS_INFO("Routing hint found (%s)", rval->unique_name);
|
MXS_INFO("Routing hint found (%s)", rval->unique_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rval == NULL && !has_dbs && this->m_current_db.length())
|
if (rval == NULL && !has_dbs && m_current_db.length())
|
||||||
{
|
{
|
||||||
/**
|
/**
|
||||||
* If the target name has not been found and the session has an
|
* If the target name has not been found and the session has an
|
||||||
* active database, set is as the target
|
* active database, set is as the target
|
||||||
*/
|
*/
|
||||||
|
|
||||||
rval = this->m_shard.get_location(this->m_current_db);
|
rval = m_shard.get_location(m_current_db);
|
||||||
|
|
||||||
if (rval)
|
if (rval)
|
||||||
{
|
{
|
||||||
MXS_INFO("Using active database '%s' on '%s'",
|
MXS_INFO("Using active database '%s' on '%s'",
|
||||||
this->m_current_db.c_str(), rval->unique_name);
|
m_current_db.c_str(), rval->unique_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1749,9 +1723,9 @@ bool SchemaRouterSession::get_shard_dcb(DCB** p_dcb, char* name)
|
|||||||
{
|
{
|
||||||
goto return_succp;
|
goto return_succp;
|
||||||
}
|
}
|
||||||
backend_ref = this->m_backends;
|
backend_ref = m_backends;
|
||||||
|
|
||||||
for (i = 0; i < this->m_backend_count; i++)
|
for (i = 0; i < m_backend_count; i++)
|
||||||
{
|
{
|
||||||
SERVER_REF* b = backend_ref[i].backend;
|
SERVER_REF* b = backend_ref[i].backend;
|
||||||
/**
|
/**
|
||||||
@ -1854,14 +1828,14 @@ bool SchemaRouterSession::send_database_list()
|
|||||||
bool rval = false;
|
bool rval = false;
|
||||||
|
|
||||||
ServerMap dblist;
|
ServerMap dblist;
|
||||||
this->m_shard.get_content(dblist);
|
m_shard.get_content(dblist);
|
||||||
|
|
||||||
RESULTSET* resultset = resultset_create(result_set_cb, &dblist);
|
RESULTSET* resultset = resultset_create(result_set_cb, &dblist);
|
||||||
|
|
||||||
if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN,
|
if (resultset_add_column(resultset, "Database", MYSQL_DATABASE_MAXLEN,
|
||||||
COL_TYPE_VARCHAR))
|
COL_TYPE_VARCHAR))
|
||||||
{
|
{
|
||||||
resultset_stream_mysql(resultset, this->m_client);
|
resultset_stream_mysql(resultset, m_client);
|
||||||
rval = true;
|
rval = true;
|
||||||
}
|
}
|
||||||
resultset_free(resultset);
|
resultset_free(resultset);
|
||||||
|
@ -17,12 +17,12 @@
|
|||||||
|
|
||||||
void SessionCommand::mark_reply_received()
|
void SessionCommand::mark_reply_received()
|
||||||
{
|
{
|
||||||
m_replySent = true;
|
m_reply_sent = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool SessionCommand::is_reply_received() const
|
bool SessionCommand::is_reply_received() const
|
||||||
{
|
{
|
||||||
return m_replySent;
|
return m_reply_sent;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t SessionCommand::get_command() const
|
uint8_t SessionCommand::get_command() const
|
||||||
@ -44,7 +44,7 @@ SessionCommand::SessionCommand(GWBUF *buffer, uint64_t id):
|
|||||||
m_buffer(buffer),
|
m_buffer(buffer),
|
||||||
m_command(0),
|
m_command(0),
|
||||||
m_pos(id),
|
m_pos(id),
|
||||||
m_replySent(false)
|
m_reply_sent(false)
|
||||||
{
|
{
|
||||||
if (buffer)
|
if (buffer)
|
||||||
{
|
{
|
||||||
|
@ -79,7 +79,7 @@ private:
|
|||||||
Buffer m_buffer; /**< The buffer containing the command */
|
Buffer m_buffer; /**< The buffer containing the command */
|
||||||
uint8_t m_command; /**< The command being executed */
|
uint8_t m_command; /**< The command being executed */
|
||||||
uint64_t m_pos; /**< Unique position identifier */
|
uint64_t m_pos; /**< Unique position identifier */
|
||||||
bool m_replySent; /**< Whether the session command reply has been sent */
|
bool m_reply_sent; /**< Whether the session command reply has been sent */
|
||||||
|
|
||||||
SessionCommand();
|
SessionCommand();
|
||||||
SessionCommand& operator = (const SessionCommand& command);
|
SessionCommand& operator = (const SessionCommand& command);
|
||||||
|
Reference in New Issue
Block a user