Added support for default databases.

This commit is contained in:
Markus Makela 2015-02-09 16:28:48 +02:00
parent 46ec9abe29
commit d455ec4c53
2 changed files with 199 additions and 54 deletions

View File

@ -51,6 +51,14 @@ typedef enum prep_stmt_state {
#endif /*< PREP_STMT_CACHING */
typedef enum init_state
{
INIT_READY = 0x0,
INIT_MAPPING = 0x1,
INIT_USE_DB = 0x02
} init_state_t;
typedef enum bref_state {
BREF_IN_USE = 0x01,
BREF_WAITING_RESULT = 0x02, /*< for session commands only */
@ -251,8 +259,11 @@ struct router_client_session {
struct router_instance *router; /*< The router instance */
struct router_client_session* next;
HASHTABLE* dbhash;
bool hash_init;
char connect_db[MYSQL_DATABASE_MAXLEN+1];
init_state_t init;
GWBUF* queue;
DCB* dcb_route;
DCB* dcb_reply;
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif

View File

@ -300,7 +300,7 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
int i,rval;
unsigned int len;
session->hash_init = false;
session->init |= INIT_MAPPING;
len = strlen(query);
buffer = gwbuf_alloc(len + 4);
@ -350,7 +350,16 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
for(i = 0; i < sz; i++){
if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){
if(strcmp(dbnms[i],"information_schema") == 0)
{
has_dbs = false;
rval = NULL;
}
else
{
skygw_log_write(LOGFILE_TRACE,"dbshard: Query targets database '%s' on server '%s",dbnms[i],rval);
}
for(j = i;j < sz;j++) free(dbnms[j]);
break;
}
@ -484,6 +493,31 @@ char** tokenize_string(char* str)
return list;
}
int internalRoute(DCB* dcb)
{
if(dcb->dcb_readqueue)
{
GWBUF* tmp = dcb->dcb_readqueue;
void* rinst = dcb->session->service->router_instance;
void *rses = dcb->session->router_session;
dcb->dcb_readqueue = NULL;
return dcb->session->service->router->routeQuery(rinst,rses,tmp);
}
return 1;
}
int internalReply(DCB* dcb)
{
if(dcb->dcb_readqueue)
{
GWBUF* tmp = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL;
return SESSION_ROUTE_REPLY(dcb->session, tmp);
}
return 1;
}
/**
* Implementation of the mandatory version entry point
*
@ -713,7 +747,27 @@ static void* newSession(
bool succp;
int router_nservers = 0; /*< # of servers in total */
int i;
char db[MYSQL_DATABASE_MAXLEN+1];
MySQLProtocol* protocol = session->client->protocol;
MYSQL_session* data = session->data;
bool using_db = false;
memset(db,0,MYSQL_DATABASE_MAXLEN+1);
spinlock_acquire(&protocol->protocol_lock);
/* To enable connecting directly to a sharded database we first need
* to disable it for the client DCB's protocol so that we can connect to them*/
if(protocol->client_capabilities & GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB)
{
protocol->client_capabilities &= ~GW_MYSQL_CAPABILITIES_CONNECT_WITH_DB;
strncpy(db,data->db,MYSQL_DATABASE_MAXLEN+1);
memset(data->db,0,MYSQL_DATABASE_MAXLEN+1);
using_db = true;
}
spinlock_release(&protocol->protocol_lock);
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
if (client_rses == NULL)
@ -729,13 +783,18 @@ static void* newSession(
client_rses->router = router;
client_rses->rses_mysql_session = (MYSQL_session*)session->data;
client_rses->rses_client_dcb = (DCB*)session->client;
/**
* If service config has been changed, reload config from service to
* router instance first.
*/
spinlock_acquire(&router->lock);
spinlock_release(&router->lock);
client_rses->dcb_reply = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->dcb_reply->func.read = internalReply;
client_rses->dcb_reply->state = DCB_STATE_POLLING;
client_rses->dcb_reply->session = session;
client_rses->dcb_route = dcb_alloc(DCB_ROLE_REQUEST_HANDLER);
client_rses->dcb_route->func.read = internalRoute;
client_rses->dcb_route->state = DCB_STATE_POLLING;
client_rses->dcb_route->session = session;
if(using_db)
client_rses->init |= INIT_USE_DB;
/**
* Set defaults to session variables.
*/
@ -838,9 +897,18 @@ static void* newSession(
goto return_rses;
}
if(db[0] != 0x0)
{
/* Store the database the client is connecting to */
strncpy(client_rses->connect_db,db,MYSQL_DATABASE_MAXLEN+1);
}
/* Generate database list */
gen_databaselist(router,client_rses);
rses_end_locked_router_action(client_rses);
/**
* Version is bigger than zero once initialized.
*/
@ -943,6 +1011,13 @@ static void closeSession(
atomic_add(&bref->bref_backend->backend_conn_count, -1);
}
}
/* Close internal DCBs */
router_cli_ses->dcb_reply->session = NULL;
router_cli_ses->dcb_route->session = NULL;
dcb_close(router_cli_ses->dcb_reply);
dcb_close(router_cli_ses->dcb_route);
/** Unlock */
rses_end_locked_router_action(router_cli_ses);
}
@ -1556,7 +1631,7 @@ static int routeQuery(
route_target_t route_target = TARGET_UNDEFINED;
bool succp = false;
char* tname = NULL;
int i;
CHK_CLIENT_RSES(router_cli_ses);
@ -1567,9 +1642,19 @@ static int routeQuery(
}
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
if(!rses_is_closed && !router_cli_ses->hash_init)
if(!rses_is_closed && router_cli_ses->init != INIT_READY)
{
router_cli_ses->queue = querybuf;
gwbuf_make_contiguous(querybuf);
GWBUF* ptr = router_cli_ses->queue;
while(ptr && ptr->next)
ptr = ptr->next;
if(ptr == NULL)
router_cli_ses->queue = querybuf;
else
ptr->next = querybuf;
return 1;
}
packet = GWBUF_DATA(querybuf);
@ -1694,31 +1779,11 @@ static int routeQuery(
/**
* Generate custom response that contains all the databases
*/
backend_ref_t* backend = NULL;
DCB* backend_dcb = NULL;
for(i = 0;i < router_cli_ses->rses_nbackends;i++)
{
if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server))
{
backend = &router_cli_ses->rses_backend_ref[i];
backend_dcb = backend->bref_dcb;
break;
}
}
if(backend)
{
GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(backend_dcb,fake);
ret = 1;
}
else
{
ret = 0;
}
GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_reply,fake);
ret = 1;
goto retblock;
}
@ -1862,7 +1927,6 @@ static int routeQuery(
if(TARGET_IS_ANY(route_target))
{
/**No valid backends alive*/
skygw_log_write(LOGFILE_TRACE,"dbshard: No backends are running");
rses_end_locked_router_action(router_cli_ses);
@ -2156,7 +2220,7 @@ static void clientReply (
}
#endif
if(!router_cli_ses->hash_init)
if(router_cli_ses->init & INIT_MAPPING)
{
bool mapped = true;
int i;
@ -2165,7 +2229,7 @@ static void clientReply (
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if(bref->bref_dcb == bkrf[i].bref_dcb)
if(bref->bref_dcb == bkrf[i].bref_dcb && !BREF_IS_MAPPED(&bkrf[i]))
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
parse_showdb_response(router_cli_ses,
@ -2183,8 +2247,7 @@ static void clientReply (
}
}
gwbuf_free(writebuf);
rses_end_locked_router_action(router_cli_ses);
gwbuf_free(writebuf);
if(mapped)
{
@ -2192,28 +2255,99 @@ static void clientReply (
* Check if the session is reconnecting with a database name
* that is not in the hashtable. If the database is not found
* then close the session.
*/
if(router_cli_ses->rses_mysql_session->db[0] != '\0' &&
hashtable_fetch(router_cli_ses->dbhash,
router_cli_ses->rses_mysql_session->db) == NULL)
* */
router_cli_ses->init &= ~INIT_MAPPING;
if(router_cli_ses->init & INIT_USE_DB)
{
router_cli_ses->rses_closed = true;
char* target;
if((target = hashtable_fetch(router_cli_ses->dbhash,
router_cli_ses->connect_db)) == NULL)
{
skygw_log_write_flush(LOGFILE_TRACE,"dbshard: Connecting to a non-existent database '%s'",
router_cli_ses->connect_db);
router_cli_ses->rses_closed = true;
if(router_cli_ses->queue)
gwbuf_free(router_cli_ses->queue);
rses_end_locked_router_action(router_cli_ses);
return;
}
/* Send a COM_INIT_DB packet to the server with the right database
* and set it as the client's active database */
unsigned int qlen;
GWBUF* buffer;
qlen = strlen(router_cli_ses->connect_db);
buffer = gwbuf_alloc(qlen + 5);
if(buffer == NULL)
{
skygw_log_write_flush(LOGFILE_ERROR,"Error : Buffer allocation failed.");
router_cli_ses->rses_closed = true;
if(router_cli_ses->queue)
gwbuf_free(router_cli_ses->queue);
rses_end_locked_router_action(router_cli_ses);
return;
}
gw_mysql_set_byte3((unsigned char*)buffer->start,qlen+1);
gwbuf_set_type(buffer,GWBUF_TYPE_MYSQL);
*((unsigned char*)buffer->start + 3) = 0x0;
*((unsigned char*)buffer->start + 4) = 0x2;
memcpy(buffer->start+5,router_cli_ses->connect_db,qlen);
DCB* dcb = NULL;
if(get_shard_dcb(&dcb,router_cli_ses,target))
{
dcb->func.write(dcb,buffer);
}
else
{
skygw_log_write_flush(LOGFILE_TRACE,"dbshard: Couldn't find target DCB for '%s'.",target);
router_cli_ses->rses_closed = true;
if(router_cli_ses->queue)
gwbuf_free(router_cli_ses->queue);
}
rses_end_locked_router_action(router_cli_ses);
return;
}
router_cli_ses->hash_init = true;
if(router_cli_ses->queue)
{
routeQuery(instance,router_session,router_cli_ses->queue);
router_cli_ses->queue = NULL;
GWBUF* tmp = router_cli_ses->queue;
router_cli_ses->queue = router_cli_ses->queue->next;
tmp->next = NULL;
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp);
}
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.",
router_cli_ses);
}
}
rses_end_locked_router_action(router_cli_ses);
return;
}
if(router_cli_ses->queue)
{
GWBUF* tmp = router_cli_ses->queue;
router_cli_ses->queue = router_cli_ses->queue->next;
tmp->next = NULL;
poll_add_epollin_event_to_dcb(router_cli_ses->dcb_route,tmp);
}
if(router_cli_ses->init & INIT_USE_DB)
{
router_cli_ses->init &= ~INIT_USE_DB;
strcpy(router_cli_ses->rses_mysql_session->db,router_cli_ses->connect_db);
rses_end_locked_router_action(router_cli_ses);
return;
}
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
/**
@ -3701,7 +3835,7 @@ static bool handle_error_new_connection(
goto return_succp;
}
rses->hash_init = false;
rses->init |= INIT_MAPPING;
for(i = 0;i<rses->rses_nbackends;i++)
{
@ -4009,7 +4143,7 @@ reply_error:
skygw_log_write_flush(LOGFILE_ERROR,"Error : All backend connections are down.");
return false;
}
poll_add_epollin_event_to_dcb(rses->rses_backend_ref->bref_dcb,
poll_add_epollin_event_to_dcb(rses->dcb_reply,
gwbuf_clone(errbuf));
gwbuf_free(errbuf);
}