Clean up schemarouter headers
Cleaned up the headers, removed unused structures. Changed some members to strings instead of char arrays. Switch to router templates should now be easier.
This commit is contained in:
@ -11,7 +11,7 @@
|
||||
* Public License.
|
||||
*/
|
||||
|
||||
#include "schemarouter.h"
|
||||
#include "schemarouter.hh"
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdio.h>
|
||||
@ -69,7 +69,7 @@ static void handle_error_reply_client(MXS_SESSION* ses,
|
||||
SCHEMAROUTER_SESSION* rses,
|
||||
DCB* backend_dcb,
|
||||
GWBUF* errmsg);
|
||||
bool change_current_db(char* dest, Shard& shard, GWBUF* buf);
|
||||
bool change_current_db(string& dest, Shard& shard, GWBUF* buf);
|
||||
bool extract_database(GWBUF* buf, char* str);
|
||||
|
||||
bool detect_show_shards(GWBUF* query);
|
||||
@ -286,8 +286,8 @@ int gen_databaselist(SCHEMAROUTER* inst, SCHEMAROUTER_SESSION* session)
|
||||
session->rses_backend_ref[i].n_mapping_eof = 0;
|
||||
}
|
||||
|
||||
session->init |= INIT_MAPPING;
|
||||
session->init &= ~INIT_UNINT;
|
||||
session->state |= INIT_MAPPING;
|
||||
session->state &= ~INIT_UNINT;
|
||||
len = strlen(query) + 1;
|
||||
buffer = gwbuf_alloc(len + 4);
|
||||
uint8_t *data = GWBUF_DATA(buffer);
|
||||
@ -400,7 +400,7 @@ SERVER* get_shard_target(SCHEMAROUTER* router,
|
||||
if (rval)
|
||||
{
|
||||
MXS_INFO("SHOW TABLES query, current database '%s' on server '%s'",
|
||||
client->current_db, rval->unique_name);
|
||||
client->current_db.c_str(), rval->unique_name);
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -421,7 +421,7 @@ SERVER* get_shard_target(SCHEMAROUTER* router,
|
||||
}
|
||||
}
|
||||
|
||||
if (rval == NULL && !has_dbs && client->current_db[0])
|
||||
if (rval == NULL && !has_dbs && client->current_db.length())
|
||||
{
|
||||
/**
|
||||
* If the target name has not been found and the session has an
|
||||
@ -433,7 +433,7 @@ SERVER* get_shard_target(SCHEMAROUTER* router,
|
||||
if (rval)
|
||||
{
|
||||
MXS_INFO("Using active database '%s' on '%s'",
|
||||
client->current_db, rval->unique_name);
|
||||
client->current_db.c_str(), rval->unique_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -464,7 +464,7 @@ static bool get_shard_dcb(DCB** p_dcb,
|
||||
int i;
|
||||
bool succp = false;
|
||||
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
ss_dassert(p_dcb != NULL && *(p_dcb) == NULL);
|
||||
|
||||
if (p_dcb == NULL || name == NULL)
|
||||
@ -866,7 +866,6 @@ static bool execute_sescmd_in_backend(backend_ref_t* backend_ref)
|
||||
DCB *dcb = backend_ref->bref_dcb;
|
||||
|
||||
CHK_DCB(dcb);
|
||||
CHK_BACKEND_REF(backend_ref);
|
||||
|
||||
int rc = 0;
|
||||
|
||||
@ -925,7 +924,6 @@ static bool route_session_write(SCHEMAROUTER_SESSION* router_cli_ses,
|
||||
|
||||
MXS_INFO("Session write, routing to all servers.");
|
||||
atomic_add(&router_cli_ses->stats.longest_sescmd, 1);
|
||||
atomic_add(&router_cli_ses->n_sescmd, 1);
|
||||
|
||||
/** Increment the session command count */
|
||||
++router_cli_ses->sent_sescmd;
|
||||
@ -998,7 +996,7 @@ static void handle_error_reply_client(MXS_SESSION* ses,
|
||||
|
||||
if (bref)
|
||||
{
|
||||
CHK_BACKEND_REF(bref);
|
||||
|
||||
bref_clear_state(bref, BREF_IN_USE);
|
||||
bref_set_state(bref, BREF_CLOSED);
|
||||
}
|
||||
@ -1063,7 +1061,7 @@ static bool handle_error_new_connection(SCHEMAROUTER* inst,
|
||||
return false;
|
||||
}
|
||||
|
||||
CHK_BACKEND_REF(bref);
|
||||
|
||||
|
||||
/**
|
||||
* If query was sent through the bref and it is waiting for reply from
|
||||
@ -1095,7 +1093,7 @@ static backend_ref_t* get_bref_from_dcb(SCHEMAROUTER_SESSION *rses,
|
||||
DCB *dcb)
|
||||
{
|
||||
CHK_DCB(dcb);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
|
||||
for (int i = 0; i < rses->rses_nbackends; i++)
|
||||
{
|
||||
@ -1230,7 +1228,7 @@ bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses)
|
||||
/* Send a COM_INIT_DB packet to the server with the right database
|
||||
* and set it as the client's active database */
|
||||
|
||||
unsigned int qlen = strlen(router_cli_ses->connect_db);
|
||||
unsigned int qlen = router_cli_ses->connect_db.length();
|
||||
GWBUF* buffer = gwbuf_alloc(qlen + 5);
|
||||
|
||||
if (buffer)
|
||||
@ -1240,14 +1238,14 @@ bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses)
|
||||
gwbuf_set_type(buffer, GWBUF_TYPE_MYSQL);
|
||||
data[3] = 0x0;
|
||||
data[4] = 0x2;
|
||||
memcpy(data + 5, router_cli_ses->connect_db, qlen);
|
||||
memcpy(data + 5, router_cli_ses->connect_db.c_str(), qlen);
|
||||
DCB* dcb = NULL;
|
||||
|
||||
if (get_shard_dcb(&dcb, router_cli_ses, target->unique_name))
|
||||
{
|
||||
dcb->func.write(dcb, buffer);
|
||||
MXS_DEBUG("USE '%s' sent to %s for session %p",
|
||||
router_cli_ses->connect_db,
|
||||
router_cli_ses->connect_db.c_str(),
|
||||
target->unique_name,
|
||||
router_cli_ses->rses_client_dcb->session);
|
||||
rval = true;
|
||||
@ -1265,10 +1263,9 @@ bool handle_default_db(SCHEMAROUTER_SESSION *router_cli_ses)
|
||||
else
|
||||
{
|
||||
/** Unknown database, hang up on the client*/
|
||||
MXS_INFO("Connecting to a non-existent database '%s'",
|
||||
router_cli_ses->connect_db);
|
||||
MXS_INFO("Connecting to a non-existent database '%s'", router_cli_ses->connect_db.c_str());
|
||||
char errmsg[128 + MYSQL_DATABASE_MAXLEN + 1];
|
||||
sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db);
|
||||
sprintf(errmsg, "Unknown database '%s'", router_cli_ses->connect_db.c_str());
|
||||
if (router_cli_ses->rses_config.debug)
|
||||
{
|
||||
sprintf(errmsg + strlen(errmsg), " ([%lu]: DB not found on connect)",
|
||||
@ -1342,7 +1339,7 @@ int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses,
|
||||
{
|
||||
DCB* client_dcb = NULL;
|
||||
|
||||
if ((router_cli_ses->init & INIT_FAILED) == 0)
|
||||
if ((router_cli_ses->state & INIT_FAILED) == 0)
|
||||
{
|
||||
if (rc == SHOWDB_DUPLICATE_DATABASES)
|
||||
{
|
||||
@ -1357,7 +1354,7 @@ int inspect_backend_mapping_states(SCHEMAROUTER_SESSION *router_cli_ses,
|
||||
/** This is the first response to the database mapping which
|
||||
* has duplicate database conflict. Set the initialization bitmask
|
||||
* to INIT_FAILED */
|
||||
router_cli_ses->init |= INIT_FAILED;
|
||||
router_cli_ses->state |= INIT_FAILED;
|
||||
|
||||
/** Send the client an error about duplicate databases
|
||||
* if there is a queued query from the client. */
|
||||
@ -1500,7 +1497,7 @@ void create_error_reply(char* fail_str, DCB* dcb)
|
||||
* @return true if new database is set, false if non-existent database was tried
|
||||
* to be set
|
||||
*/
|
||||
bool change_current_db(char* dest, Shard& shard, GWBUF* buf)
|
||||
bool change_current_db(string& dest, Shard& shard, GWBUF* buf)
|
||||
{
|
||||
bool succp = false;
|
||||
char db[MYSQL_DATABASE_MAXLEN + 1];
|
||||
@ -1520,7 +1517,7 @@ bool change_current_db(char* dest, Shard& shard, GWBUF* buf)
|
||||
|
||||
if (target)
|
||||
{
|
||||
strcpy(dest, db);
|
||||
dest = db;
|
||||
MXS_INFO("change_current_db: database is on server: '%s'.", target->unique_name);
|
||||
succp = true;
|
||||
}
|
||||
@ -1557,7 +1554,6 @@ static MXS_ROUTER* do_createInstance(SERVICE *service, char **options)
|
||||
router->ignored_dbs.insert("information_schema");
|
||||
router->ignored_dbs.insert("performance_schema");
|
||||
router->service = service;
|
||||
router->schemarouter_config.last_refresh = time(NULL);
|
||||
router->stats.longest_sescmd = 0;
|
||||
router->stats.n_hist_exceeded = 0;
|
||||
router->stats.n_queries = 0;
|
||||
@ -1729,33 +1725,26 @@ static MXS_ROUTER_SESSION* do_newSession(MXS_ROUTER* router_inst, MXS_SESSION* s
|
||||
|
||||
SCHEMAROUTER* router = (SCHEMAROUTER*)router_inst;
|
||||
|
||||
#if defined(SS_DEBUG)
|
||||
client_rses->rses_chk_top = CHK_NUM_ROUTER_SES;
|
||||
client_rses->rses_chk_tail = CHK_NUM_ROUTER_SES;
|
||||
#endif
|
||||
|
||||
client_rses->router = router;
|
||||
client_rses->rses_mysql_session = (MYSQL_session*)session->client_dcb->data;
|
||||
client_rses->rses_client_dcb = (DCB*)session->client_dcb;
|
||||
client_rses->queue = NULL;
|
||||
client_rses->closed = false;
|
||||
client_rses->sent_sescmd = 0;
|
||||
client_rses->replied_sescmd = 0;
|
||||
|
||||
client_rses->shardmap = router->shard_manager.get_shard(session->client_dcb->user,
|
||||
router->schemarouter_config.refresh_min_interval);
|
||||
|
||||
memcpy(&client_rses->rses_config, &router->schemarouter_config, sizeof(schemarouter_config_t));
|
||||
client_rses->n_sescmd = 0;
|
||||
client_rses->rses_config.last_refresh = time(NULL);
|
||||
client_rses->closed = false;
|
||||
|
||||
if (using_db)
|
||||
{
|
||||
client_rses->init |= INIT_USE_DB;
|
||||
client_rses->state |= INIT_USE_DB;
|
||||
}
|
||||
/**
|
||||
* Set defaults to session variables.
|
||||
*/
|
||||
client_rses->rses_autocommit_enabled = true;
|
||||
client_rses->rses_transaction_active = false;
|
||||
client_rses->sent_sescmd = 0;
|
||||
client_rses->replied_sescmd = 0;
|
||||
|
||||
/**
|
||||
* Instead of calling this, ensure that there is at least one
|
||||
@ -1781,10 +1770,6 @@ static MXS_ROUTER_SESSION* do_newSession(MXS_ROUTER* router_inst, MXS_SESSION* s
|
||||
{
|
||||
if (ref->active)
|
||||
{
|
||||
#if defined(SS_DEBUG)
|
||||
backend_ref[i].bref_chk_top = CHK_NUM_BACKEND_REF;
|
||||
backend_ref[i].bref_chk_tail = CHK_NUM_BACKEND_REF;
|
||||
#endif
|
||||
backend_ref[i].bref_state = 0;
|
||||
backend_ref[i].n_mapping_eof = 0;
|
||||
backend_ref[i].map_queue = NULL;
|
||||
@ -1818,7 +1803,7 @@ static MXS_ROUTER_SESSION* do_newSession(MXS_ROUTER* router_inst, MXS_SESSION* s
|
||||
if (db[0])
|
||||
{
|
||||
/* Store the database the client is connecting to */
|
||||
snprintf(client_rses->connect_db, MYSQL_DATABASE_MAXLEN + 1, "%s", db);
|
||||
client_rses->connect_db = db;
|
||||
}
|
||||
|
||||
atomic_add(&router->stats.sessions, 1);
|
||||
@ -1845,7 +1830,7 @@ static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* router_inst, MXS_SESSION* sess
|
||||
static void do_closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session)
|
||||
{
|
||||
SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *)router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
ss_dassert(!router_cli_ses->closed);
|
||||
|
||||
/**
|
||||
@ -1971,7 +1956,7 @@ static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio
|
||||
bool succp = false;
|
||||
char db[MYSQL_DATABASE_MAXLEN + 1];
|
||||
char errbuf[26 + MYSQL_DATABASE_MAXLEN];
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
SERVER* target = NULL;
|
||||
|
||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||
@ -1993,7 +1978,7 @@ static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio
|
||||
* to store the query. Once the databases have been mapped and/or the
|
||||
* default database is taken into use we can send the query forward.
|
||||
*/
|
||||
if (router_cli_ses->init & (INIT_MAPPING | INIT_USE_DB))
|
||||
if (router_cli_ses->state & (INIT_MAPPING | INIT_USE_DB))
|
||||
{
|
||||
int init_rval = 1;
|
||||
char* querystr = modutil_get_SQL(querybuf);
|
||||
@ -2019,7 +2004,7 @@ static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio
|
||||
|
||||
}
|
||||
|
||||
if (router_cli_ses->init == (INIT_READY | INIT_USE_DB))
|
||||
if (router_cli_ses->state == (INIT_READY | INIT_USE_DB))
|
||||
{
|
||||
/**
|
||||
* This state is possible if a client connects with a default database
|
||||
@ -2117,23 +2102,9 @@ static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio
|
||||
querybuf);
|
||||
if (!change_successful)
|
||||
{
|
||||
time_t now = time(NULL);
|
||||
if (router_cli_ses->rses_config.refresh_databases &&
|
||||
difftime(now, router_cli_ses->rses_config.last_refresh) >
|
||||
router_cli_ses->rses_config.refresh_min_interval)
|
||||
{
|
||||
|
||||
router_cli_ses->rses_config.last_refresh = now;
|
||||
router_cli_ses->queue = querybuf;
|
||||
|
||||
// Reset the shard map by constructing a new Shard
|
||||
router_cli_ses->shardmap = Shard();
|
||||
gen_databaselist(inst, router_cli_ses);
|
||||
|
||||
return 1;
|
||||
}
|
||||
extract_database(querybuf, db);
|
||||
snprintf(errbuf, 25 + MYSQL_DATABASE_MAXLEN, "Unknown database: %s", db);
|
||||
|
||||
if (router_cli_ses->rses_config.debug)
|
||||
{
|
||||
sprintf(errbuf + strlen(errbuf),
|
||||
@ -2174,7 +2145,7 @@ static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio
|
||||
if (target)
|
||||
{
|
||||
MXS_INFO("INIT_DB for database '%s' on server '%s'",
|
||||
router_cli_ses->current_db, target->unique_name);
|
||||
router_cli_ses->current_db.c_str(), target->unique_name);
|
||||
route_target = TARGET_NAMED_SERVER;
|
||||
}
|
||||
else
|
||||
@ -2216,9 +2187,9 @@ static int do_routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_sessio
|
||||
|
||||
if ((target == NULL &&
|
||||
packet_type != MYSQL_COM_INIT_DB &&
|
||||
router_cli_ses->current_db[0] == '\0') ||
|
||||
router_cli_ses->current_db.length() == 0) ||
|
||||
packet_type == MYSQL_COM_FIELD_LIST ||
|
||||
(router_cli_ses->current_db[0] != '\0'))
|
||||
(router_cli_ses->current_db.length() == 0))
|
||||
{
|
||||
/**
|
||||
* No current database and no databases in query or
|
||||
@ -2439,7 +2410,7 @@ static void do_clientReply(MXS_ROUTER* instance,
|
||||
GWBUF* writebuf = buffer;
|
||||
|
||||
SCHEMAROUTER_SESSION *router_cli_ses = (SCHEMAROUTER_SESSION *) router_session;
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
|
||||
/**
|
||||
* Lock router client session for secure read of router session members.
|
||||
@ -2467,13 +2438,13 @@ static void do_clientReply(MXS_ROUTER* instance,
|
||||
" mapping [%s] queries queued [%s]",
|
||||
bref->bref_backend->server->unique_name,
|
||||
router_cli_ses->rses_client_dcb->session,
|
||||
router_cli_ses->init & INIT_MAPPING ? "true" : "false",
|
||||
router_cli_ses->state & INIT_MAPPING ? "true" : "false",
|
||||
router_cli_ses->queue == NULL ? "none" :
|
||||
router_cli_ses->queue->next ? "multiple" : "one");
|
||||
|
||||
|
||||
|
||||
if (router_cli_ses->init & INIT_MAPPING)
|
||||
if (router_cli_ses->state & INIT_MAPPING)
|
||||
{
|
||||
int rc = inspect_backend_mapping_states(router_cli_ses, bref, &writebuf);
|
||||
gwbuf_free(writebuf);
|
||||
@ -2488,9 +2459,9 @@ static void do_clientReply(MXS_ROUTER* instance,
|
||||
* that is not in the hashtable. If the database is not found
|
||||
* then close the session.
|
||||
*/
|
||||
router_cli_ses->init &= ~INIT_MAPPING;
|
||||
router_cli_ses->state &= ~INIT_MAPPING;
|
||||
|
||||
if (router_cli_ses->init & INIT_USE_DB)
|
||||
if (router_cli_ses->state & INIT_USE_DB)
|
||||
{
|
||||
bool success = handle_default_db(router_cli_ses);
|
||||
if (!success)
|
||||
@ -2502,7 +2473,7 @@ static void do_clientReply(MXS_ROUTER* instance,
|
||||
|
||||
if (router_cli_ses->queue)
|
||||
{
|
||||
ss_dassert(router_cli_ses->init == INIT_READY);
|
||||
ss_dassert(router_cli_ses->state == INIT_READY);
|
||||
route_queued_query(router_cli_ses);
|
||||
}
|
||||
MXS_DEBUG("session [%p] database map finished.",
|
||||
@ -2516,14 +2487,14 @@ static void do_clientReply(MXS_ROUTER* instance,
|
||||
return;
|
||||
}
|
||||
|
||||
if (router_cli_ses->init & INIT_USE_DB)
|
||||
if (router_cli_ses->state & INIT_USE_DB)
|
||||
{
|
||||
MXS_DEBUG("Reply to USE '%s' received for session %p",
|
||||
router_cli_ses->connect_db,
|
||||
router_cli_ses->connect_db.c_str(),
|
||||
router_cli_ses->rses_client_dcb->session);
|
||||
router_cli_ses->init &= ~INIT_USE_DB;
|
||||
strcpy(router_cli_ses->current_db, router_cli_ses->connect_db);
|
||||
ss_dassert(router_cli_ses->init == INIT_READY);
|
||||
router_cli_ses->state &= ~INIT_USE_DB;
|
||||
router_cli_ses->current_db = router_cli_ses->connect_db;
|
||||
ss_dassert(router_cli_ses->state == INIT_READY);
|
||||
|
||||
if (router_cli_ses->queue)
|
||||
{
|
||||
@ -2536,12 +2507,12 @@ static void do_clientReply(MXS_ROUTER* instance,
|
||||
|
||||
if (router_cli_ses->queue)
|
||||
{
|
||||
ss_dassert(router_cli_ses->init == INIT_READY);
|
||||
ss_dassert(router_cli_ses->state == INIT_READY);
|
||||
route_queued_query(router_cli_ses);
|
||||
return;
|
||||
}
|
||||
|
||||
CHK_BACKEND_REF(bref);
|
||||
|
||||
|
||||
/**
|
||||
* Active cursor means that reply is from session command
|
||||
@ -2595,7 +2566,7 @@ static void do_clientReply(MXS_ROUTER* instance,
|
||||
if (writebuf != NULL && client_dcb != NULL)
|
||||
{
|
||||
unsigned char* cmd = (unsigned char*) writebuf->start;
|
||||
int state = router_cli_ses->init;
|
||||
int state = router_cli_ses->state;
|
||||
/** Write reply to client DCB */
|
||||
MXS_INFO("returning reply [%s] "
|
||||
"state [%s] session [%p]",
|
||||
@ -2690,7 +2661,6 @@ static void do_handleError(MXS_ROUTER* instance,
|
||||
ss_dassert(session && rses);
|
||||
|
||||
CHK_SESSION(session);
|
||||
CHK_CLIENT_RSES(rses);
|
||||
|
||||
switch (action)
|
||||
{
|
||||
|
Reference in New Issue
Block a user