Conflicts:
	server/modules/routing/readwritesplit/readwritesplit.c
This commit is contained in:
VilhoRaatikka
2014-09-03 22:54:32 +03:00
20 changed files with 847 additions and 31 deletions

View File

@ -281,6 +281,47 @@ static bool have_enough_servers(
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
static int hashkeyfun(
void* key)
{
if(key == NULL){
return 0;
}
unsigned int hash = 0,c = 0;
char* ptr = (char*)key;
while((c = *ptr++)){
hash = c + (hash << 6) + (hash << 16) - hash;
}
return *(int *)key;
}
static int hashcmpfun(
void* v1,
void* v2)
{
char* i1 = (char*) v1;
char* i2 = (char*) v2;
return strcmp(i1,i2);
}
static void* hstrdup(void* fval)
{
char* str = (char*)fval;
return strdup(str);
}
static void* hfree(void* fval)
{
free (fval);
return NULL;
}
/**
* Implementation of the mandatory version entry point
*
@ -1210,12 +1251,22 @@ static int routeQuery(
char* startpos;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
int ret = 0;
int tsize = 0;
int klen = 0;
int i = 0;
DCB* master_dcb = NULL;
DCB* target_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
rses_property_t* rses_prop_tmp;
bool rses_is_closed = false;
bool target_tmp_table = false;
char* dbname;
char* hkey;
char** tbl;
HASHTABLE* h;
MYSQL_session* data;
size_t len;
route_target_t route_target;
bool succp = false;
@ -1234,6 +1285,7 @@ static int routeQuery(
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
if (rses_is_closed)
{
@ -1264,6 +1316,9 @@ static int routeQuery(
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
dbname = data->db;
switch(packet_type) {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
@ -1309,6 +1364,48 @@ static int routeQuery(
break;
} /**< switch by packet type */
/**
* Check if the query targets a temporary table
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
{
tbl = skygw_get_table_names(querybuf,&tsize);
if (tsize > 0)
{
/** Query targets at least one table */
for(i = 0; i<tsize && !target_tmp_table && tbl[i]; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[0]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if( (target_tmp_table =
(bool)hashtable_fetch(rses_prop_tmp->rses_prop_data.temp_tables,(void *)hkey)))
{
/**Query target is a temporary table*/
qtype = QUERY_TYPE_READ_TMP_TABLE;
LOGIF(LT,
(skygw_log_write(LOGFILE_TRACE,
"Query targets a temporary table: %s",hkey)));
}
}
free(hkey);
}
for (i = 0; i<tsize; i++)
{
free(tbl[i]);
}
free(tbl);
}
}
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
@ -1345,6 +1442,118 @@ static int routeQuery(
router_cli_ses->rses_autocommit_enabled = true;
router_cli_ses->rses_transaction_active = false;
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first. If the query is DROP TABLE...
* then see if it targets a temporary table and remove it from the hashtable
* if it does.
*/
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_CREATE_TMP_TABLE))
{
bool is_temp = true;
char* tblname = NULL;
tblname = skygw_get_created_table_name(querybuf);
if (tblname && strlen(tblname) > 0)
{
klen = strlen(dbname) + strlen(tblname) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tblname);
}
else
{
hkey = NULL;
}
if(rses_prop_tmp == NULL)
{
if((rses_prop_tmp =
(rses_property_t*)calloc(1,sizeof(rses_property_t))))
{
#if defined(SS_DEBUG)
rses_prop_tmp->rses_prop_chk_top = CHK_NUM_ROUTER_PROPERTY;
rses_prop_tmp->rses_prop_chk_tail = CHK_NUM_ROUTER_PROPERTY;
#endif
rses_prop_tmp->rses_prop_rsession = router_cli_ses;
rses_prop_tmp->rses_prop_refcount = 1;
rses_prop_tmp->rses_prop_next = NULL;
rses_prop_tmp->rses_prop_type = RSES_PROP_TYPE_TMPTABLES;
router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES] = rses_prop_tmp;
}
}
if (rses_prop_tmp->rses_prop_data.temp_tables == NULL)
{
h = hashtable_alloc(7, hashkeyfun, hashcmpfun);
hashtable_memory_fns(h,hstrdup,NULL,hfree,NULL);
if (h != NULL)
{
rses_prop_tmp->rses_prop_data.temp_tables = h;
}
}
if (hkey &&
hashtable_add(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey,
(void *)is_temp) == 0) /*< Conflict in hash table */
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table conflict in hashtable: %s",
hkey)));
}
#if defined(SS_DEBUG)
{
bool retkey =
hashtable_fetch(
rses_prop_tmp->rses_prop_data.temp_tables,
hkey);
if (retkey)
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table added: %s",
hkey)));
}
}
#endif
free(hkey);
}
/** Check if DROP TABLE... targets a temporary table */
if (is_drop_table_query(querybuf))
{
tbl = skygw_get_table_names(querybuf,&tsize);
for(i = 0; i<tsize; i++)
{
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[i]);
if (rses_prop_tmp &&
rses_prop_tmp->rses_prop_data.temp_tables)
{
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
(void *)hkey))
{
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
"Temporary table dropped: %s",hkey)));
}
}
free(tbl[i]);
free(hkey);
}
free(tbl);
}
/**
* Find out where to route the query. Result may not be clear; it is
* possible to have a hint for routing to a named server which can
@ -2486,6 +2695,11 @@ static void rses_property_done(
case RSES_PROP_TYPE_SESCMD:
mysql_sescmd_done(&prop->rses_prop_data.sescmd);
break;
case RSES_PROP_TYPE_TMPTABLES:
hashtable_free(prop->rses_prop_data.temp_tables);
break;
default:
LOGIF(LD, (skygw_log_write(
LOGFILE_DEBUG,
@ -2885,8 +3099,23 @@ static bool execute_sescmd_in_backend(
sescmd_cursor_clone_querybuf(scur));
break;
case MYSQL_COM_QUERY:
case MYSQL_COM_INIT_DB:
case MYSQL_COM_INIT_DB:
{
/**
* Record database name and store to session.
*/
GWBUF* tmpbuf;
MYSQL_session* data;
unsigned int qlen;
data = dcb->session->data;
tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf;
qlen = MYSQL_GET_PACKET_LEN((unsigned char*)tmpbuf->start);
memset(data->db,0,MYSQL_DATABASE_MAXLEN+1);
strncpy(data->db,tmpbuf->start+5,qlen - 1);
}
/** Fallthrough */
case MYSQL_COM_QUERY:
default:
/**
* Mark session command buffer, it triggers writing
@ -3171,7 +3400,7 @@ static bool route_session_write(
{
succp = false;
goto return_succp;
}
}
/**
* Additional reference is created to querybuf to
* prevent it from being released before properties