From 7629c455a63c56e428b581d2e9cb944f502204eb Mon Sep 17 00:00:00 2001 From: Markus Makela Date: Sat, 30 Aug 2014 08:27:05 +0300 Subject: [PATCH] partial implementation --- query_classifier/query_classifier.cc | 110 +++++++++- query_classifier/query_classifier.h | 2 + server/modules/include/readwritesplit.h | 2 +- .../routing/readwritesplit/readwritesplit.c | 202 ++++++++++++++++-- 4 files changed, 292 insertions(+), 24 deletions(-) diff --git a/query_classifier/query_classifier.cc b/query_classifier/query_classifier.cc index 3f2c18eb4..14fa7ec44 100644 --- a/query_classifier/query_classifier.cc +++ b/query_classifier/query_classifier.cc @@ -736,15 +736,17 @@ static skygw_query_type_t resolve_query_type( } } /**< for */ #if defined(TEMPORARY_TABLES) - if ((skygw_query_type_t)type == QUERY_TYPE_READ) - { - /** - * Find out the database name and all tables the query - * uses. Create a hashvalue from each and if any of the - * values can be found from property's hashtable, set - * query type to QUERY_TYPE_READ_TMP_TABLE. - */ - } + if ((skygw_query_type_t)type == QUERY_TYPE_READ) + { + /** + * Find out the database name and all tables the query + * uses. Create a hashvalue from each and if any of the + * values can be found from property's hashtable, set + * query type to QUERY_TYPE_READ_TMP_TABLE. + */ + + + } #endif } /**< if */ return_qtype: @@ -874,8 +876,8 @@ char* skygw_query_classifier_get_stmtname( /** * Finds the head of the list of tables affected by the current select statement. - * @param thd Pointer to a valid thread descriptor structure - * @return Head of the TABLE_LIST chain or NULL in case of an error + * @param thd Pointer to a valid THD + * @return Pointer to the head of the TABLE_LIST chain or NULL in case of an error */ void* skygw_get_affected_tables(void* thdp) { @@ -893,6 +895,92 @@ void* skygw_get_affected_tables(void* thdp) return (void*)thd->lex->current_select->table_list.first; } + + +/** + * Reads the parsetree and lists all the affected tables and views in the query. + * In the case of an error, the size of the table is set to zero and no memory is allocated. + * The caller must free the allocated memory. + * + * @param querybuf GWBUF where the table names are extracted from + * @param tblsize Pointer where the number of tables is written + * @return Array of null-terminated strings with the table names + */ +char** skygw_get_table_names(GWBUF* querybuf,int* tblsize) +{ + parsing_info_t* pi; + MYSQL* mysql; + THD* thd; + TABLE_LIST* tbl; + SELECT_LEX*slx; + int i = 0, currtblsz = 0; + char**tables,**tmp; + + if (!GWBUF_IS_PARSED(querybuf)) + { + tables = NULL; + goto retblock; + } + pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf, + GWBUF_PARSING_INFO); + + if (pi == NULL) + { + tables = NULL; + goto retblock; + } + + if (pi->pi_query_plain_str == NULL || + (mysql = (MYSQL *)pi->pi_handle) == NULL || + (thd = (THD *)mysql->thd) == NULL) + { + ss_dassert(pi->pi_query_plain_str != NULL && + mysql != NULL && + thd != NULL); + tables = NULL; + goto retblock; + } + + thd->lex->current_select = thd->lex->all_selects_list; + + while(thd->lex->current_select){ + + tbl = (TABLE_LIST*)skygw_get_affected_tables(thd); + + while (tbl) + { + if(i >= currtblsz){ + + tmp = (char**)malloc(sizeof(char*)*(currtblsz*2+1)); + + if(tmp){ + if(currtblsz > 0){ + int x; + for(x = 0;xalias); + tbl=tbl->next_local; + } + thd->lex->current_select = thd->lex->current_select->next_select_in_list(); + } + + retblock: + *tblsize = i; + return tables; +} + + /* * Replace user-provided literals with question marks. Return a copy of the * querystr with replacements. diff --git a/query_classifier/query_classifier.h b/query_classifier/query_classifier.h index 6f5ecd803..595ba502b 100644 --- a/query_classifier/query_classifier.h +++ b/query_classifier/query_classifier.h @@ -73,6 +73,8 @@ skygw_query_type_t query_classifier_get_type(GWBUF* querybuf); /** Free THD context and close MYSQL */ char* skygw_query_classifier_get_stmtname(MYSQL* mysql); +void* skygw_get_affected_tables(void* thdp); +char** skygw_get_table_names(GWBUF* querybuf,int* tblsize); char* skygw_get_canonical(GWBUF* querybuf); bool parse_query (GWBUF* querybuf); parsing_info_t* parsing_info_init(void (*donefun)(void *)); diff --git a/server/modules/include/readwritesplit.h b/server/modules/include/readwritesplit.h index 3530dc086..f8024fde6 100644 --- a/server/modules/include/readwritesplit.h +++ b/server/modules/include/readwritesplit.h @@ -145,7 +145,7 @@ struct rses_property_st { union rses_prop_data { mysql_sescmd_t sescmd; HASHTABLE tmp_table_hash; - void* placeholder; /*< to be removed due new type */ + HASHTABLE* temp_tables; } rses_prop_data; rses_property_t* rses_prop_next; /*< next property of same type */ #if defined(SS_DEBUG) diff --git a/server/modules/routing/readwritesplit/readwritesplit.c b/server/modules/routing/readwritesplit/readwritesplit.c index aa118f005..dbc99d801 100644 --- a/server/modules/routing/readwritesplit/readwritesplit.c +++ b/server/modules/routing/readwritesplit/readwritesplit.c @@ -273,6 +273,33 @@ static bool have_enough_servers( static SPINLOCK instlock; static ROUTER_INSTANCE* instances; +static int hashkeyfun(void* key); +static int hashcmpfun (void *, void *); + +static int hashkeyfun( + void* key) +{ + unsigned int hash = 0,c = 0; + char* ptr = (char*)key; + while((c = *ptr++)){ + hash = c + (hash << 6) + (hash << 16) - hash; + } + return *(int *)key; +} + +static int hashcmpfun( + void* v1, + void* v2) +{ + int i1; + int i2; + + i1 = *(int *)v1; + i2 = *(int *)v2; + + return (i1 < i2 ? -1 : (i1 > i2 ? 1 : 0)); +} + /** * Implementation of the mandatory version entry point * @@ -1034,12 +1061,21 @@ static int routeQuery( skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN; mysql_server_cmd_t packet_type; uint8_t* packet; - int ret = 0; + int ret = 0, + tsize = 0, + klen = 0, + i = 0; DCB* master_dcb = NULL; DCB* slave_dcb = NULL; ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance; ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session; + rses_property_t* rses_prop; bool rses_is_closed = false; + bool target_tmp_table = false; + char *dbname,*hkey; + char** tbl; + HASHTABLE* h; + MYSQL_session* data; CHK_CLIENT_RSES(router_cli_ses); @@ -1053,6 +1089,7 @@ static int routeQuery( packet = GWBUF_DATA(querybuf); packet_type = packet[4]; + rses_prop = *router_cli_ses->rses_properties; if (rses_is_closed) { @@ -1083,6 +1120,9 @@ static int routeQuery( master_dcb = router_cli_ses->rses_master_ref->bref_dcb; CHK_DCB(master_dcb); + + data = (MYSQL_session*)master_dcb->session->data; + dbname = data->db; switch(packet_type) { case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */ @@ -1128,6 +1168,51 @@ static int routeQuery( break; } /**< switch by packet type */ + + /** + *Check if the query targets a temporary table + */ + if(QUERY_IS_TYPE(qtype, QUERY_TYPE_READ)){ + + + tbl = skygw_get_table_names(querybuf,&tsize); + + if(tsize > 0) + { /**Query targets at least one table*/ + for(i = 0;irses_prop_data.temp_tables){ + if((target_tmp_table = hashtable_fetch(rses_prop->rses_prop_data.temp_tables,(void *)hkey))) + { + /**Query target is a temporary table*/ + + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Query targets a temporary table: %s",hkey))); + } + } + + + + } + + for(i = 0;irses_transaction_active) + !router_cli_ses->rses_transaction_active && + !target_tmp_table) { bool succp; @@ -1263,19 +1349,98 @@ static int routeQuery( { goto return_ret; } -#if defined(TEMPORARY_TABLES) + + //#if defined(TEMPORARY_TABLES) + + if(!query_is_parsed(querybuf) && !parse_query(querybuf)){ + LOGIF(LE, (skygw_log_write( + LOGFILE_ERROR, + "Error: Unable to parse query."))); + } + qtype = query_classifier_get_type(querybuf); + + if(QUERY_IS_TYPE(qtype, QUERY_TYPE_CREATE_TMP_TABLE)){ + + + + bool is_temp = true; + + + if(rses_prop == NULL){ + if((rses_prop = + (rses_property_t*)calloc(1,sizeof(rses_property_t)))){ + rses_prop->rses_prop_rsession = router_cli_ses; + rses_prop->rses_prop_refcount = 1; + rses_prop->rses_prop_next = NULL; + rses_prop->rses_prop_type = RSES_PROP_TYPE_SESCMD; + *router_cli_ses->rses_properties = rses_prop; + } + + } + + if( rses_prop->rses_prop_data.temp_tables == NULL){ + + h = hashtable_alloc(10000, hashkeyfun, hashcmpfun); + + if(h){ + rses_prop->rses_prop_data.temp_tables = h; + } + + } + + tbl = skygw_get_table_names(querybuf,&tsize); + + if(tsize == 1 && tbl[0]) + { /**One valid table created*/ + + klen = strlen(dbname) + strlen(tbl[0]) + 2; + hkey = calloc(klen,sizeof(char)); + strcpy(hkey,dbname); + strcat(hkey,"."); + strcat(hkey,tbl[0]); + + if( + hashtable_add( + rses_prop->rses_prop_data.temp_tables, + (void *)hkey, + (void *)&is_temp + ) + == 0) /**Conflict in hash table*/ + { + LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Temporary table conflict in hashtable: %s",hkey))); + } + char* retkey = hashtable_fetch( + rses_prop->rses_prop_data.temp_tables, + hkey); + if(retkey){ +LOGIF(LT, (skygw_log_write( + LOGFILE_TRACE, + "Temporary table added: %s",retkey))); + } + } + + if(tsize > 0){ + for(i = 0;isession->data; + tmpbuf = scur->scmd_cur_cmd->my_sescmd_buf; + qlen = MYSQL_GET_PACKET_LEN((unsigned char*)tmpbuf->start); + strncpy(data->db,tmpbuf->start+5,qlen - 1); + + } + //#endif case MYSQL_COM_QUERY: default: /**