partial implementation

This commit is contained in:
Markus Makela
2014-08-30 08:27:05 +03:00
parent 7ea53f0141
commit 7629c455a6
4 changed files with 292 additions and 24 deletions

View File

@ -273,6 +273,33 @@ static bool have_enough_servers(
static SPINLOCK instlock;
static ROUTER_INSTANCE* instances;
static int hashkeyfun(void* key);
static int hashcmpfun (void *, void *);
static int hashkeyfun(
void* key)
{
unsigned int hash = 0,c = 0;
char* ptr = (char*)key;
while((c = *ptr++)){
hash = c + (hash << 6) + (hash << 16) - hash;
}
return *(int *)key;
}
static int hashcmpfun(
void* v1,
void* v2)
{
int i1;
int i2;
i1 = *(int *)v1;
i2 = *(int *)v2;
return (i1 < i2 ? -1 : (i1 > i2 ? 1 : 0));
}
/**
* Implementation of the mandatory version entry point
*
@ -1034,12 +1061,21 @@ static int routeQuery(
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
mysql_server_cmd_t packet_type;
uint8_t* packet;
int ret = 0;
int ret = 0,
tsize = 0,
klen = 0,
i = 0;
DCB* master_dcb = NULL;
DCB* slave_dcb = NULL;
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
rses_property_t* rses_prop;
bool rses_is_closed = false;
bool target_tmp_table = false;
char *dbname,*hkey;
char** tbl;
HASHTABLE* h;
MYSQL_session* data;
CHK_CLIENT_RSES(router_cli_ses);
@ -1053,6 +1089,7 @@ static int routeQuery(
packet = GWBUF_DATA(querybuf);
packet_type = packet[4];
rses_prop = *router_cli_ses->rses_properties;
if (rses_is_closed)
{
@ -1083,6 +1120,9 @@ static int routeQuery(
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
CHK_DCB(master_dcb);
data = (MYSQL_session*)master_dcb->session->data;
dbname = data->db;
switch(packet_type) {
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
@ -1128,6 +1168,51 @@ static int routeQuery(
break;
} /**< switch by packet type */
/**
*Check if the query targets a temporary table
*/
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)){
tbl = skygw_get_table_names(querybuf,&tsize);
if(tsize > 0)
{ /**Query targets at least one table*/
for(i = 0;i<tsize && !target_tmp_table && tbl[i];i++){
klen = strlen(dbname) + strlen(tbl[i]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[0]);
if(rses_prop && rses_prop->rses_prop_data.temp_tables){
if((target_tmp_table = hashtable_fetch(rses_prop->rses_prop_data.temp_tables,(void *)hkey)))
{
/**Query target is a temporary table*/
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Query targets a temporary table: %s",hkey)));
}
}
}
for(i = 0;i<tsize;i++){
free(tbl[i]);
}
free(tbl);
}
}
/**
* If autocommit is disabled or transaction is explicitly started
* transaction becomes active and master gets all statements until
@ -1189,7 +1274,8 @@ static int routeQuery(
goto return_ret;
}
else if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ) &&
!router_cli_ses->rses_transaction_active)
!router_cli_ses->rses_transaction_active &&
!target_tmp_table)
{
bool succp;
@ -1263,19 +1349,98 @@ static int routeQuery(
{
goto return_ret;
}
#if defined(TEMPORARY_TABLES)
//#if defined(TEMPORARY_TABLES)
if(!query_is_parsed(querybuf) && !parse_query(querybuf)){
LOGIF(LE, (skygw_log_write(
LOGFILE_ERROR,
"Error: Unable to parse query.")));
}
qtype = query_classifier_get_type(querybuf);
if(QUERY_IS_TYPE(qtype, QUERY_TYPE_CREATE_TMP_TABLE)){
bool is_temp = true;
if(rses_prop == NULL){
if((rses_prop =
(rses_property_t*)calloc(1,sizeof(rses_property_t)))){
rses_prop->rses_prop_rsession = router_cli_ses;
rses_prop->rses_prop_refcount = 1;
rses_prop->rses_prop_next = NULL;
rses_prop->rses_prop_type = RSES_PROP_TYPE_SESCMD;
*router_cli_ses->rses_properties = rses_prop;
}
}
if( rses_prop->rses_prop_data.temp_tables == NULL){
h = hashtable_alloc(10000, hashkeyfun, hashcmpfun);
if(h){
rses_prop->rses_prop_data.temp_tables = h;
}
}
tbl = skygw_get_table_names(querybuf,&tsize);
if(tsize == 1 && tbl[0])
{ /**One valid table created*/
klen = strlen(dbname) + strlen(tbl[0]) + 2;
hkey = calloc(klen,sizeof(char));
strcpy(hkey,dbname);
strcat(hkey,".");
strcat(hkey,tbl[0]);
if(
hashtable_add(
rses_prop->rses_prop_data.temp_tables,
(void *)hkey,
(void *)&is_temp
)
== 0) /**Conflict in hash table*/
{
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table conflict in hashtable: %s",hkey)));
}
char* retkey = hashtable_fetch(
rses_prop->rses_prop_data.temp_tables,
hkey);
if(retkey){
LOGIF(LT, (skygw_log_write(
LOGFILE_TRACE,
"Temporary table added: %s",retkey)));
}
}
if(tsize > 0){
for(i = 0;i<tsize;i++){
free(tbl[i]);
}
free(tbl);
}
}
/**
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
* the database and table name, create a hashvalue and
* add it to the router client session's property. If property
* doesn't exist then create it first.
*/
#endif
//#endif
if (master_dcb == NULL)
{
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
}
{
succp = get_dcb(&master_dcb, router_cli_ses, BE_MASTER);
}
if (succp)
{
@ -2671,11 +2836,24 @@ static bool execute_sescmd_in_backend(
break;
case MYSQL_COM_INIT_DB:
#if defined(TEMPORARY_TABLES)
/**
* Record database name and store to session.
*/
#endif
//#if defined(TEMPORARY_TABLES)
/**
* Record database name and store to session.
*/
{
GWBUF* tmpbuf;
MYSQL_session* data;
unsigned int qlen;
data = dcb->session->data;
tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf;
qlen = MYSQL_GET_PACKET_LEN((unsigned char*)tmpbuf->start);
strncpy(data->db,tmpbuf->start+5,qlen - 1);
}
//#endif
case MYSQL_COM_QUERY:
default:
/**