Merge branch 'develop' into 1.2.1-binlog_router_trx

This commit is contained in:
MassimilianoPinto
2015-08-17 12:12:16 +02:00
10 changed files with 270 additions and 60 deletions

View File

@ -1584,9 +1584,24 @@ void check_drop_tmp_table(
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL)
{
skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. "
"This means that the connection to the master server is already "
"closed while a query is still being routed.",__FUNCTION__);
return;
}
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
if(data == NULL)
{
skygw_log_write(LE,"[%s] Error: User data in master server DBC is NULL.",__FUNCTION__);
return;
}
dbname = (char*)data->db;
if (is_drop_table_query(querybuf))
@ -1647,9 +1662,23 @@ static skygw_query_type_t is_read_tmp_table(
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL)
{
skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. "
"This means that the connection to the master server is already "
"closed while a query is still being routed.",__FUNCTION__);
return qtype;
}
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
if(data == NULL)
{
skygw_log_write(LE,"[%s] Error: User data in master server DBC is NULL.",__FUNCTION__);
return qtype;
}
dbname = (char*)data->db;
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) ||
@ -1724,9 +1753,24 @@ static void check_create_tmp_table(
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
if(master_dcb == NULL)
{
skygw_log_write(LE,"[%s] Error: Master server DBC is NULL. "
"This means that the connection to the master server is already "
"closed while a query is still being routed.",__FUNCTION__);
return;
}
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
if(data == NULL)
{
skygw_log_write(LE,"[%s] Error: User data in master server DBC is NULL.",__FUNCTION__);
return;
}
dbname = (char*)data->db;
@ -2110,10 +2154,15 @@ static bool route_single_stmt(
/**
* Check if the query has anything to do with temporary tables.
*/
if (!rses_begin_locked_router_action(rses))
{
succp = false;
goto retblock;
}
qtype = is_read_tmp_table(rses, querybuf, qtype);
check_create_tmp_table(rses, querybuf, qtype);
check_drop_tmp_table(rses, querybuf,qtype);
rses_end_locked_router_action(rses);
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
@ -2561,7 +2610,10 @@ static bool rses_begin_locked_router_action(
ROUTER_CLIENT_SES* rses)
{
bool succp = false;
if(rses == NULL)
return false;
CHK_CLIENT_RSES(rses);
if (rses->rses_closed) {
@ -2617,6 +2669,7 @@ ROUTER_INSTANCE *router = (ROUTER_INSTANCE *)instance;
int i = 0;
BACKEND *backend;
char *weightby;
double master_pct = 0.0;
spinlock_acquire(&router->lock);
router_cli_ses = router->connections;
@ -2626,7 +2679,12 @@ char *weightby;
router_cli_ses = router_cli_ses->next;
}
spinlock_release(&router->lock);
if(router->stats.n_master + router->stats.n_slave > 0)
{
master_pct = (double)router->stats.n_master/(double)(router->stats.n_master + router->stats.n_slave);
}
dcb_printf(dcb,
"\tNumber of router sessions: %d\n",
router->stats.n_sessions);
@ -2645,6 +2703,10 @@ char *weightby;
dcb_printf(dcb,
"\tNumber of queries forwarded to all: %d\n",
router->stats.n_all);
dcb_printf(dcb,
"\tMaster/Slave percentage: %.2f%%\n",
master_pct * 100.0);
if ((weightby = serviceGetWeightingParameter(router->service)) != NULL)
{
dcb_printf(dcb,

View File

@ -35,6 +35,8 @@
#include <modutil.h>
#include <mysql_client_server_protocol.h>
#define DEFAULT_REFRESH_INTERVAL 30.0
MODULE_INFO info = {
MODULE_API_ROUTER,
MODULE_BETA_RELEASE,
@ -206,6 +208,8 @@ static void handle_error_reply_client(
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
bool detect_show_shards(GWBUF* query);
int process_show_shards(ROUTER_CLIENT_SES* rses);
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
@ -399,7 +403,13 @@ int gen_databaselist(ROUTER_INSTANCE* inst, ROUTER_CLIENT_SES* session)
GWBUF *buffer,*clone;
int i,rval = 0;
unsigned int len;
for(i = 0;i<session->rses_nbackends;i++)
{
session->rses_backend_ref[i].bref_mapped = false;
session->rses_backend_ref[i].n_mapping_eof = 0;
}
session->init |= INIT_MAPPING;
session->init &= ~INIT_UNINT;
len = strlen(query) + 1;
@ -623,7 +633,7 @@ char** tokenize_string(char* str)
*/
int internalRoute(DCB* dcb)
{
if(dcb->dcb_readqueue)
if(dcb->dcb_readqueue && dcb->session)
{
GWBUF* tmp = dcb->dcb_readqueue;
void* rinst = dcb->session->service->router_instance;
@ -642,7 +652,7 @@ int internalRoute(DCB* dcb)
*/
int internalReply(DCB* dcb)
{
if(dcb->dcb_readqueue)
if(dcb->dcb_readqueue && dcb->session)
{
GWBUF* tmp = dcb->dcb_readqueue;
dcb->dcb_readqueue = NULL;
@ -712,6 +722,9 @@ createInstance(SERVICE *service, char **options)
}
router->service = service;
router->schemarouter_config.max_sescmd_hist = 0;
router->schemarouter_config.last_refresh = time(NULL);
router->schemarouter_config.refresh_databases = false;
router->schemarouter_config.refresh_min_interval = DEFAULT_REFRESH_INTERVAL;
router->stats.longest_sescmd = 0;
router->stats.n_hist_exceeded = 0;
router->stats.n_queries = 0;
@ -753,6 +766,18 @@ createInstance(SERVICE *service, char **options)
{
router->schemarouter_config.disable_sescmd_hist = config_truth_value(value);
}
else if(strcmp(options[i],"refresh_databases") == 0)
{
router->schemarouter_config.refresh_databases = config_truth_value(value);
}
else if(strcmp(options[i],"refresh_interval") == 0)
{
router->schemarouter_config.refresh_min_interval = atof(value);
}
else if(strcmp(options[i],"debug") == 0)
{
router->schemarouter_config.debug = config_truth_value(value);
}
else
{
skygw_log_write(LOGFILE_ERROR,"Error: Unknown router options for Schemarouter: %s",options[i]);
@ -935,6 +960,7 @@ static void* newSession(
client_rses->dcb_route->func.read = internalRoute;
client_rses->dcb_route->state = DCB_STATE_POLLING;
client_rses->dcb_route->session = session;
client_rses->rses_config.last_refresh = time(NULL);
client_rses->init = INIT_UNINT;
if(using_db)
client_rses->init |= INIT_USE_DB;
@ -1931,6 +1957,13 @@ static int routeQuery(
querybuf = gwbuf_make_contiguous(querybuf);
}
if(detect_show_shards(querybuf))
{
process_show_shards(router_cli_ses);
ret = 1;
goto retblock;
}
switch(packet_type) {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
@ -2014,20 +2047,52 @@ static int routeQuery(
router_cli_ses->dbhash,
querybuf)))
{
time_t now = time(NULL);
if(router_cli_ses->rses_config.refresh_databases &&
difftime(now,router_cli_ses->rses_config.last_refresh) >
router_cli_ses->rses_config.refresh_min_interval)
{
rses_begin_locked_router_action(router_cli_ses);
router_cli_ses->rses_config.last_refresh = now;
router_cli_ses->queue = querybuf;
hashtable_free(router_cli_ses->dbhash);
if((router_cli_ses->dbhash = hashtable_alloc(100, hashkeyfun, hashcmpfun)) == NULL)
{
skygw_log_write(LE,"Error: Hashtable allocation failed.");
rses_end_locked_router_action(router_cli_ses);
return 1;
}
hashtable_memory_fns(router_cli_ses->dbhash,(HASHMEMORYFN)strdup,
(HASHMEMORYFN)strdup,
(HASHMEMORYFN)free,
(HASHMEMORYFN)free);
gen_databaselist(inst,router_cli_ses);
rses_end_locked_router_action(router_cli_ses);
return 1;
}
extract_database(querybuf,db);
snprintf(errbuf,25+MYSQL_DATABASE_MAXLEN,"Unknown database: %s",db);
for(i = 0;i<router_cli_ses->rses_nbackends;i++)
if(router_cli_ses->rses_config.debug)
{
if(SERVER_IS_RUNNING(router_cli_ses->rses_backend_ref[i].bref_backend->backend_server))
{
create_error_reply(errbuf,router_cli_ses->rses_backend_ref[i].bref_dcb);
break;
}
sprintf(errbuf + strlen(errbuf)," ([%lu]: DB change failed)",router_cli_ses->rses_client_dcb->session->ses_id);
}
GWBUF* error = modutil_create_mysql_err_msg(1, 0, 1049, "42000", errbuf);
if (error == NULL)
{
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Creating buffer for error message failed.")));
return 0;
}
/** Set flags that help router to identify session commands reply */
router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,error);
LOGIF(LE, (skygw_log_write_flush(
LOGFILE_ERROR,
"Error : Changing database failed.")));
ret = 1;
goto retblock;
}
}
@ -2561,7 +2626,12 @@ static void clientReply (
router_cli_ses->connect_db);
char errmsg[128 + MYSQL_DATABASE_MAXLEN+1];
sprintf(errmsg,"Unknown database '%s'",router_cli_ses->connect_db);
if(router_cli_ses->rses_config.debug)
{
sprintf(errmsg + strlen(errmsg)," ([%lu]: DB not found on connect)",router_cli_ses->rses_client_dcb->session->ses_id);
}
GWBUF* errbuff = modutil_create_mysql_err_msg(1,0,1049,"42000",errmsg);
router_cli_ses->rses_client_dcb->func.write(router_cli_ses->rses_client_dcb,errbuff);
if(router_cli_ses->queue)
{
@ -4255,30 +4325,7 @@ static bool handle_error_new_connection(
succp = false;
goto return_succp;
}
rses->init |= INIT_MAPPING;
for(i = 0;i<rses->rses_nbackends;i++)
{
bref_clear_state(&rses->rses_backend_ref[i],BREF_DB_MAPPED);
rses->rses_backend_ref[i].n_mapping_eof = 0;
}
HASHITERATOR* iter = hashtable_iterator(rses->dbhash);
char* srvnm = bref->bref_backend->backend_server->unique_name;
char *key, *value;
while((key = (char*)hashtable_next(iter)))
{
value = hashtable_fetch(rses->dbhash,key);
if(strcmp(value,srvnm) == 0)
{
hashtable_delete(rses->dbhash,key);
}
}
skygw_log_write(LOGFILE_TRACE,"schemarouter: Re-mapping databases");
gen_databaselist(rses->router,rses);
hashtable_iterator_free(iter);
return_succp:
return succp;
}
@ -4393,4 +4440,89 @@ static sescmd_cursor_t* backend_ref_get_sescmd_cursor (
return scur;
}
/**
* Detect if a query contains a SHOW SHARDS query.
* @param query Query to inspect
* @return true if the query is a SHOW SHARDS query otherwise false
*/
bool detect_show_shards(GWBUF* query)
{
bool rval = false;
char *querystr,*tok,*sptr;
if(query == NULL)
{
skygw_log_write(LE,"Fatal Error: NULL value passed at %s:%d",__FILE__,__LINE__);
return false;
}
if (!modutil_is_SQL(query) && !modutil_is_SQL_prepare(query))
{
return false;
}
if((querystr = modutil_get_SQL(query)) == NULL)
{
skygw_log_write(LE,"Fatal Error: failure to parse SQL at %s:%d",__FILE__,__LINE__);
return false;
}
tok = strtok_r(querystr," ",&sptr);
if(tok && strcasecmp(tok,"show") == 0)
{
tok = strtok_r(NULL," ",&sptr);
if(tok && strcasecmp(tok,"shards") == 0)
rval = true;
}
free(querystr);
return rval;
}
struct shard_list{
HASHITERATOR* iter;
ROUTER_CLIENT_SES* rses;
RESULTSET* rset;
};
/**
* Callback for the shard list result set creation
*/
RESULT_ROW* shard_list_cb(struct resultset* rset, void* data)
{
char *key,*value;
struct shard_list *sl = (struct shard_list*)data;
RESULT_ROW* rval = NULL;
if((key = hashtable_next(sl->iter)) &&
(value = hashtable_fetch(sl->rses->dbhash,key)))
{
if((rval = resultset_make_row(sl->rset)))
{
resultset_row_set(rval,0,key);
resultset_row_set(rval,1,value);
}
}
return rval;
}
/**
* Send a result set of all shards and their locations to the client.
* @param rses Router client session
* @return 0 on success, -1 on error
*/
int process_show_shards(ROUTER_CLIENT_SES* rses)
{
HASHITERATOR* iter = hashtable_iterator(rses->dbhash);
struct shard_list sl;
sl.iter = iter;
sl.rses = rses;
sl.rset = resultset_create(shard_list_cb,&sl);
resultset_add_column(sl.rset,"Database",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
resultset_add_column(sl.rset,"Server",MYSQL_DATABASE_MAXLEN,COL_TYPE_VARCHAR);
resultset_stream_mysql(sl.rset,rses->rses_client_dcb);
resultset_free(sl.rset);
hashtable_iterator_free(iter);
return 0;
}