Added error handling and re-mapping of databases to dbshard router in addition to hint detection.

This commit is contained in:
Markus Makela
2015-02-02 21:36:41 +02:00
parent c9c55ecfa3
commit 9681b9cec4
3 changed files with 229 additions and 127 deletions

View File

@ -44,6 +44,7 @@ MODULE_INFO info = {
#if defined(SS_DEBUG)
# include <mysql_client_server_protocol.h>
#endif
/** Defined in log_manager.cc */
@ -278,7 +279,7 @@ static void* hfree(void* fval)
bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
{
int rval = 0;
int rval = 0,i;
RESULTSET* rset;
RSET_ROW* row;
@ -292,7 +293,29 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
while(row)
{
hashtable_add(rses->dbhash,row->data[0],target);
if(hashtable_add(rses->dbhash,row->data[0],target))
{
skygw_log_write(LOGFILE_TRACE,"dbshard: <%s, %s>",target,row->data[0]);
}
else
{
char* oldval = strdup(hashtable_fetch(rses->dbhash,row->data[0]));
for(i=0;i<rses->rses_nbackends;i++)
{
if(strcmp(oldval,rses->rses_backend_ref[i].bref_backend->backend_server->unique_name) == 0 &&
BREF_IS_CLOSED(&rses->rses_backend_ref[i]))
{
hashtable_delete(rses->dbhash,row->data[0]);
hashtable_add(rses->dbhash,row->data[0],target);
skygw_log_write(LOGFILE_TRACE,"dbshard: <%s, %s> (replaced %s)",target,row->data[0],oldval);
free(oldval);
oldval = NULL;
break;
}
}
free(oldval);
}
row = row->next;
}
resultset_free(rset);
@ -303,7 +326,7 @@ bool parse_showdb_response(ROUTER_CLIENT_SES* rses, char* target, GWBUF* buf)
return rval;
}
int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
{
DCB* dcb;
const char* query = "SHOW DATABASES;";
@ -324,15 +347,18 @@ int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
*((unsigned char*)buffer->start + 4) = 0x03;
memcpy(buffer->start + 5,query,strlen(query));
for(i = 0;i<session->rses_nbackends;i++)
{
clone = gwbuf_clone(buffer);
dcb = backends[i].bref_dcb;
if(BREF_IS_IN_USE(&backends[i]))
if(BREF_IS_IN_USE(&backends[i]) && !BREF_IS_CLOSED(&backends[i]))
{
rval = dcb->func.write(dcb,clone);
}
}
else
{
gwbuf_free(clone);
}
}
return !rval;
@ -347,6 +373,7 @@ int gen_tablelist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
* @return True if all database and server names were successfully retrieved
* otherwise false
*/
/*
bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* hashtable)
{
const unsigned int connect_timeout = 1;
@ -408,7 +435,7 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha
rval = false;
goto cleanup;
}
/** Plain-text password used for authentication for now */
// Plain-text password used for authentication for now
user = server->monuser;
pwd = server->monpw;
@ -432,9 +459,9 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha
rval = false;
goto cleanup;
}
/**
* The server was successfully connected to, proceed to query for database names
*/
//The server was successfully connected to, proceed to query for database names
if((result = mysql_list_dbs(handle,NULL)) == NULL)
{
@ -456,11 +483,13 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha
server->name)));
goto cleanup;
}
*/
/**
* Walk through the list of databases in this backend
* and insert them into the hashtable. If the value is already in the hashtable
* but the backend isn't in the list of backends it is replaced with the first found backend.
*/
/*
while((row = mysql_fetch_row(result)))
{
unsigned long *lengths;
@ -471,12 +500,6 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha
dbnm = (char*)calloc(lengths[0] + 1,sizeof(char));
memcpy(dbnm,row[0],lengths[0]);
/*if(is_ignored_database(inst,dbnm))
{
free(dbnm);
continue;
}*/
servnm = strdup(server->unique_name);
if(hashtable_add(hashtable,dbnm,servnm) == 0)
@ -511,11 +534,13 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha
for(j = 0;backends[j];j++)
{
* */
/**
* See if the old backend is still
* alive. If not then update
* the hashtable with the current backend's name.
*/
/*
if(strcmp(backends[j]->backend_server->unique_name,old_backend) == 0 &&
SERVER_IS_RUNNING(backends[j]->backend_server))
{
@ -544,8 +569,8 @@ bool update_dbnames_hash(ROUTER_INSTANCE* inst,BACKEND** backends, HASHTABLE* ha
}
}
}
} /*< hashtable_add failed */
} /*< while */
}
}
cleanup:
if(result)
@ -554,42 +579,11 @@ cleanup:
}
result = NULL;
mysql_close(handle);
} /*< for */
}
return rval;
}
/**
* Allocates a new hashtable and inserts database names and where to find them
* into it.
* @param inst Router instance
* @param backends Backends to query for database names
* @return Pointer to the newly allocated hashtable or NULL if an error occurred
*/
void* dbnames_hash_init(ROUTER_INSTANCE* inst,BACKEND** backends)
{
HASHTABLE* htbl = hashtable_alloc(512,hashkeyfun,hashcmpfun);
if(htbl == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error: hashtable allocation failed.")));
return NULL;
}
/**Update the new hashtable with the key-value pairs*/
if(!update_dbnames_hash(inst,backends,htbl))
{
/**
* Log if there were some errors during the database configuration.
*/
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Warning : Errors occurred while resolving shard locations.")));
}
return htbl;
}
*/
/**
* Check the hashtable for the right backend for this query.
@ -602,7 +596,7 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
HASHTABLE* ht = client->dbhash;
int sz = 0,i,j;
char** dbnms = NULL;
char* rval = NULL;
char* rval = NULL,*query, *tmp = NULL;
bool has_dbs = false; /**If the query targets any database other than the current one*/
if(!query_is_parsed(buffer)){
@ -615,7 +609,8 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
has_dbs = true;
for(i = 0; i < sz; i++){
if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){
if((rval = (char*)hashtable_fetch(ht,dbnms[i]))){
skygw_log_write(LOGFILE_TRACE,"dbshard: Query targets specific database (%s)",rval);
for(j = i;j < sz;j++) free(dbnms[j]);
break;
}
@ -624,6 +619,59 @@ char* get_shard_target_name(ROUTER_INSTANCE* router, ROUTER_CLIENT_SES* client,
free(dbnms);
}
/* Check if the query is a show tables query with a specific database */
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_SHOW_TABLES))
{
query = modutil_get_SQL(buffer);
if((tmp = strstr(query,"from")))
{
char* tok = strtok(tmp, " ;");
tok = strtok(NULL," ;");
ss_dassert(tok != NULL);
tmp = (char*) hashtable_fetch(ht, tok);
}
free(query);
if(tmp == NULL)
{
rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db);
}
else
{
rval = tmp;
has_dbs = true;
skygw_log_write(LOGFILE_TRACE,"dbshard: SHOW TABLES with specific database (%s)", tmp);
}
}
if(buffer->hint && buffer->hint->type == HINT_ROUTE_TO_NAMED_SERVER)
{
for(i = 0; i < client->rses_nbackends; i++)
{
char *srvnm = client->rses_backend_ref[i].bref_backend->backend_server->unique_name;
if(strcmp(srvnm,buffer->hint->data) == 0)
{
rval = srvnm;
skygw_log_write(LOGFILE_TRACE,"dbshard: Routing hint found (%s)",srvnm);
}
}
}
if(rval == NULL && !has_dbs && client->rses_mysql_session->db[0] != '\0')
{
/**
* If the query contains no explicitly stated databases proceed to
* check if the session has an active database and if it is sharded.
*/
rval = (char*) hashtable_fetch(ht, client->rses_mysql_session->db);
}
/**
* If the query contains no explicitly stated databases proceed to
* check if the session has an active database and if it is sharded.
@ -782,7 +830,7 @@ static void refreshInstance(
}
/**
* Create an instance of dbshard statement router within the MaxScale.
* Create an instance of dbshard router within the MaxScale.
*
*
* @param service The service this router is being create for
@ -929,13 +977,7 @@ static void* newSession(
bool succp;
int router_nservers = 0; /*< # of servers in total */
int i;
#if 0
/**
* It could be possibe to accept new session if some of the servers are
* not reachable
*/
const int min_nservers = 1; /*< hard-coded for now */
#endif
client_rses = (ROUTER_CLIENT_SES *)calloc(1, sizeof(ROUTER_CLIENT_SES));
if (client_rses == NULL)
@ -960,7 +1002,6 @@ static void* newSession(
spinlock_release(&router->lock);
/**
* Set defaults to session variables.
* ??? tarvitaanko
*/
client_rses->rses_autocommit_enabled = true;
client_rses->rses_transaction_active = false;
@ -1028,6 +1069,12 @@ static void* newSession(
router_nservers,
session,
router);
client_rses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun);
hashtable_memory_fns(client_rses->dbhash,(HASHMEMORYFN)strdup,
(HASHMEMORYFN)strdup,
(HASHMEMORYFN)free,
(HASHMEMORYFN)free);
rses_end_locked_router_action(client_rses);
@ -1046,6 +1093,18 @@ static void* newSession(
client_rses->rses_nbackends = router_nservers; /*< # of backend servers */
router->stats.n_sessions += 1;
if (!(succp = rses_begin_locked_router_action(client_rses)))
{
free(client_rses->rses_backend_ref);
free(client_rses);
client_rses = NULL;
goto return_rses;
}
/* Generate database list */
gen_databaselist(router,client_rses);
rses_end_locked_router_action(client_rses);
/**
* Version is bigger than zero once initialized.
*/
@ -1766,15 +1825,9 @@ static int routeQuery(
}
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
if(router_cli_ses->dbhash == NULL && !router_cli_ses->hash_init)
if(!router_cli_ses->hash_init)
{
router_cli_ses->queue = querybuf;
router_cli_ses->dbhash = hashtable_alloc(7, hashkeyfun, hashcmpfun);
hashtable_memory_fns(router_cli_ses->dbhash,(HASHMEMORYFN)strdup,
(HASHMEMORYFN)strdup,
(HASHMEMORYFN)free,
(HASHMEMORYFN)free);
gen_tablelist(inst,router_cli_ses);
router_cli_ses->queue = querybuf;
return 1;
}
packet = GWBUF_DATA(querybuf);
@ -1941,22 +1994,20 @@ static int routeQuery(
backend_ref_t* backend = NULL;
DCB* backend_dcb = NULL;
//update_dbnames_hash(inst,inst->servers,inst->dbnames_hash);
for(i = 0;i < router_cli_ses->rses_nbackends;i++)
{
if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server))
{
backend = &router_cli_ses->rses_backend_ref[i];
backend_dcb = backend->bref_dcb;
break;
break;
}
}
if(backend)
{
GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(backend_dcb,fake);
GWBUF* fake = gen_show_dbs_response(inst,router_cli_ses);
poll_add_epollin_event_to_dcb(backend_dcb,fake);
ret = 1;
}
else
@ -2396,6 +2447,51 @@ static void clientReply (
}
#endif
if(!router_cli_ses->hash_init)
{
bool mapped = true;
int i;
backend_ref_t* bkrf = router_cli_ses->rses_backend_ref;
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if(bref->bref_dcb == bkrf[i].bref_dcb)
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
parse_showdb_response(router_cli_ses,
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
writebuf);
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases mapped.",
router_cli_ses,
bref->bref_backend->backend_server->unique_name);
}
if(BREF_IS_IN_USE(&bkrf[i]) &&
!BREF_IS_MAPPED(&bkrf[i]))
{
mapped = false;
}
}
gwbuf_free(writebuf);
rses_end_locked_router_action(router_cli_ses);
if(mapped)
{
router_cli_ses->hash_init = true;
if(router_cli_ses->queue)
{
routeQuery(instance,router_session,router_cli_ses->queue);
router_cli_ses->queue = NULL;
}
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.",
router_cli_ses);
}
return;
}
CHK_BACKEND_REF(bref);
scur = &bref->bref_sescmd_cur;
/**
@ -2464,52 +2560,7 @@ static void clientReply (
bref_clear_state(bref, BREF_WAITING_RESULT);
}
if(!router_cli_ses->hash_init)
{
bool mapped = true;
int i;
backend_ref_t* bkrf = router_cli_ses->rses_backend_ref;
for(i = 0; i < router_cli_ses->rses_nbackends; i++)
{
if(bref->bref_dcb == bkrf[i].bref_dcb)
{
router_cli_ses->rses_backend_ref[i].bref_mapped = true;
parse_showdb_response(router_cli_ses,
router_cli_ses->rses_backend_ref[i].bref_backend->backend_server->unique_name,
writebuf);
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases mapped.",
router_cli_ses,
bref->bref_backend->backend_server->unique_name);
}
if(BREF_IS_IN_USE(&bkrf[i]) &&
!BREF_IS_MAPPED(&bkrf[i]))
{
mapped = false;
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] server '%s' databases not yet mapped.",
router_cli_ses,
bkrf[i].bref_backend->backend_server->unique_name);
//break;
}
}
gwbuf_free(writebuf);
rses_end_locked_router_action(router_cli_ses);
if(mapped)
{
router_cli_ses->hash_init = true;
routeQuery(instance,router_session,router_cli_ses->queue);
router_cli_ses->queue = NULL;
skygw_log_write_flush(LOGFILE_DEBUG,"session [%p] database map finished.",
router_cli_ses);
}
return;
}
else if (writebuf != NULL && client_dcb != NULL)
if (writebuf != NULL && client_dcb != NULL)
{
/** Write reply to client DCB */
SESSION_ROUTE_REPLY(backend_dcb->session, writebuf);
@ -2770,7 +2821,8 @@ static bool connect_backend_servers(
router->bitvalue))
{
servers_found += 1;
/** Server is already connected */
if (BREF_IS_IN_USE((&backend_ref[i])))
{
@ -2812,6 +2864,11 @@ static bool connect_backend_servers(
* of dcb_close.
*/
atomic_add(&b->backend_conn_count, 1);
dcb_add_callback(backend_ref[i].bref_dcb,
DCB_REASON_NOT_RESPONDING,
&router_handle_state_switch,
(void *)&backend_ref[i]);
}
else
{
@ -3717,8 +3774,12 @@ static void handleError (
SESSION* session;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* rses = (ROUTER_CLIENT_SES *)router_session;
CHK_DCB(backend_dcb);
if(succp == NULL || action == ERRACT_RESET)
{
return;
}
/** Don't handle same error twice on same DCB */
if (backend_dcb->dcb_errhandle_called)
{
@ -3809,6 +3870,22 @@ static void handle_error_reply_client(
}
}
bool have_servers(ROUTER_CLIENT_SES* rses)
{
int i;
for(i=0;i<rses->rses_nbackends;i++)
{
if(BREF_IS_IN_USE(&rses->rses_backend_ref[i]) &&
!BREF_IS_CLOSED(&rses->rses_backend_ref[i]))
{
return true;
}
}
return false;
}
/**
* Check if there is backend reference pointing at failed DCB, and reset its
* flags. Then clear DCB's callback and finally try to reconnect.
@ -3829,7 +3906,7 @@ static bool handle_error_new_connection(
GWBUF* errmsg)
{
SESSION* ses;
int router_nservers;
int router_nservers,i;
backend_ref_t* bref;
@ -3896,6 +3973,22 @@ static bool handle_error_new_connection(
router_nservers,
ses,
inst);
if(!have_servers(rses))
{
skygw_log_write(LOGFILE_ERROR,"Error : No more valid servers, closing session");
succp = false;
goto return_succp;
}
rses->hash_init = false;
for(i = 0;i<rses->rses_nbackends;i++)
{
bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED);
}
skygw_log_write(LOGFILE_TRACE,"dbshard: Re-mapping databases");
gen_databaselist(rses->router,rses);
return_succp:
return succp;
@ -4037,7 +4130,7 @@ static int router_handle_state_switch(
switch (reason) {
case DCB_REASON_NOT_RESPONDING:
dcb->func.hangup(dcb);
dcb->func.hangup(dcb);
break;
default:
@ -4080,7 +4173,7 @@ static bool change_current_db(
bool succp;
uint8_t* packet;
unsigned int plen;
int message_len,i;
int message_len;
char* fail_str;
if(GWBUF_LENGTH(buf) <= MYSQL_DATABASE_MAXLEN - 5)