Merged some of the rabbitmq branch changes
query_classifier.cc: updated skygw_get_table_names to allow for partial or full table names readwritesplit.c: transferred temporary table detection to separate functions
This commit is contained in:
@ -890,26 +890,61 @@ char* skygw_query_classifier_get_stmtname(
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*Returns the LEX struct of the parsed GWBUF
|
||||||
|
*@param The parsed GWBUF
|
||||||
|
*@return Pointer to the LEX struct or NULL if an error occurred or the query was not parsed
|
||||||
|
*/
|
||||||
|
LEX* get_lex(GWBUF* querybuf)
|
||||||
|
{
|
||||||
|
|
||||||
|
parsing_info_t* pi;
|
||||||
|
MYSQL* mysql;
|
||||||
|
THD* thd;
|
||||||
|
|
||||||
|
if (!GWBUF_IS_PARSED(querybuf))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
|
||||||
|
GWBUF_PARSING_INFO);
|
||||||
|
|
||||||
|
if (pi == NULL)
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
|
||||||
|
(thd = (THD *)mysql->thd) == NULL)
|
||||||
|
{
|
||||||
|
ss_dassert(mysql != NULL &&
|
||||||
|
thd != NULL);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return thd->lex;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Finds the head of the list of tables affected by the current select statement.
|
* Finds the head of the list of tables affected by the current select statement.
|
||||||
* @param thd Pointer to a valid THD
|
* @param thd Pointer to a valid THD
|
||||||
* @return Pointer to the head of the TABLE_LIST chain or NULL in case of an error
|
* @return Pointer to the head of the TABLE_LIST chain or NULL in case of an error
|
||||||
*/
|
*/
|
||||||
void* skygw_get_affected_tables(void* thdp)
|
void* skygw_get_affected_tables(void* lexptr)
|
||||||
{
|
{
|
||||||
THD* thd = (THD*)thdp;
|
LEX* lex = (LEX*)lexptr;
|
||||||
|
|
||||||
if(thd == NULL ||
|
if(lex == NULL ||
|
||||||
thd->lex == NULL ||
|
lex->current_select == NULL)
|
||||||
thd->lex->current_select == NULL)
|
|
||||||
{
|
{
|
||||||
ss_dassert(thd != NULL &&
|
ss_dassert(lex != NULL &&
|
||||||
thd->lex != NULL &&
|
lex->current_select != NULL);
|
||||||
thd->lex->current_select != NULL);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (void*)thd->lex->current_select->table_list.first;
|
return (void*)lex->current_select->table_list.first;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -922,45 +957,25 @@ void* skygw_get_affected_tables(void* thdp)
|
|||||||
* @param tblsize Pointer where the number of tables is written
|
* @param tblsize Pointer where the number of tables is written
|
||||||
* @return Array of null-terminated strings with the table names
|
* @return Array of null-terminated strings with the table names
|
||||||
*/
|
*/
|
||||||
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize)
|
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize, bool fullnames)
|
||||||
{
|
{
|
||||||
parsing_info_t* pi;
|
LEX* lex;
|
||||||
MYSQL* mysql;
|
|
||||||
THD* thd;
|
|
||||||
TABLE_LIST* tbl;
|
TABLE_LIST* tbl;
|
||||||
int i = 0, currtblsz = 0;
|
int i = 0,
|
||||||
char**tables,**tmp;
|
currtblsz = 0;
|
||||||
|
char **tables,
|
||||||
|
**tmp;
|
||||||
|
|
||||||
if (!GWBUF_IS_PARSED(querybuf))
|
if((lex = get_lex(querybuf)) == NULL)
|
||||||
{
|
{
|
||||||
tables = NULL;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
|
|
||||||
GWBUF_PARSING_INFO);
|
|
||||||
|
|
||||||
if (pi == NULL)
|
|
||||||
{
|
|
||||||
tables = NULL;
|
|
||||||
goto retblock;
|
goto retblock;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pi->pi_query_plain_str == NULL ||
|
lex->current_select = lex->all_selects_list;
|
||||||
(mysql = (MYSQL *)pi->pi_handle) == NULL ||
|
|
||||||
(thd = (THD *)mysql->thd) == NULL)
|
|
||||||
{
|
|
||||||
ss_dassert(pi->pi_query_plain_str != NULL &&
|
|
||||||
mysql != NULL &&
|
|
||||||
thd != NULL);
|
|
||||||
tables = NULL;
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
|
|
||||||
thd->lex->current_select = thd->lex->all_selects_list;
|
while(lex->current_select){
|
||||||
|
|
||||||
while(thd->lex->current_select){
|
tbl = (TABLE_LIST*)skygw_get_affected_tables(lex);
|
||||||
|
|
||||||
tbl = (TABLE_LIST*)skygw_get_affected_tables(thd);
|
|
||||||
|
|
||||||
while (tbl)
|
while (tbl)
|
||||||
{
|
{
|
||||||
@ -982,51 +997,57 @@ char** skygw_get_table_names(GWBUF* querybuf,int* tblsize)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
tables[i++] = strdup(tbl->alias);
|
|
||||||
|
char *catnm = NULL;
|
||||||
|
|
||||||
|
if(fullnames)
|
||||||
|
{
|
||||||
|
if(tbl->db && strcmp(tbl->db,"skygw_virtual") != 0)
|
||||||
|
{
|
||||||
|
catnm = (char*)calloc(strlen(tbl->db) + strlen(tbl->table_name) + 2,sizeof(char));
|
||||||
|
strcpy(catnm,tbl->db);
|
||||||
|
strcat(catnm,".");
|
||||||
|
strcat(catnm,tbl->table_name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(catnm)
|
||||||
|
{
|
||||||
|
tables[i++] = catnm;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
tables[i++] = strdup(tbl->table_name);
|
||||||
|
}
|
||||||
|
|
||||||
tbl=tbl->next_local;
|
tbl=tbl->next_local;
|
||||||
}
|
}
|
||||||
thd->lex->current_select = thd->lex->current_select->next_select_in_list();
|
lex->current_select = lex->current_select->next_select_in_list();
|
||||||
}
|
}
|
||||||
|
|
||||||
retblock:
|
retblock:
|
||||||
*tblsize = i;
|
*tblsize = i;
|
||||||
return tables;
|
return tables;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract the name of the created table.
|
* Extract, allocate memory and copy the name of the created table.
|
||||||
* @param querybuf Buffer to use.
|
* @param querybuf Buffer to use.
|
||||||
* @return A pointer to the name if a table was created, otherwise NULL
|
* @return A pointer to the name if a table was created, otherwise NULL
|
||||||
*/
|
*/
|
||||||
char* skygw_get_created_table_name(GWBUF* querybuf)
|
char* skygw_get_created_table_name(GWBUF* querybuf)
|
||||||
{
|
{
|
||||||
parsing_info_t* pi;
|
LEX* lex;
|
||||||
MYSQL* mysql;
|
|
||||||
THD* thd;
|
|
||||||
if (!GWBUF_IS_PARSED(querybuf))
|
|
||||||
{
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
|
|
||||||
GWBUF_PARSING_INFO);
|
|
||||||
|
|
||||||
if (pi == NULL)
|
if((lex = get_lex(querybuf)) == NULL)
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
|
if(lex->create_last_non_select_table &&
|
||||||
(thd = (THD *)mysql->thd) == NULL)
|
lex->create_last_non_select_table->table_name){
|
||||||
{
|
char* name = strdup(lex->create_last_non_select_table->table_name);
|
||||||
ss_dassert(mysql != NULL &&
|
|
||||||
thd != NULL);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(thd->lex->create_last_non_select_table &&
|
|
||||||
thd->lex->create_last_non_select_table->table_name){
|
|
||||||
char* name = strdup(thd->lex->create_last_non_select_table->table_name);
|
|
||||||
return name;
|
return name;
|
||||||
}else{
|
}else{
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -1040,31 +1061,10 @@ char* skygw_get_created_table_name(GWBUF* querybuf)
|
|||||||
*/
|
*/
|
||||||
bool is_drop_table_query(GWBUF* querybuf)
|
bool is_drop_table_query(GWBUF* querybuf)
|
||||||
{
|
{
|
||||||
parsing_info_t* pi;
|
LEX* lex;
|
||||||
MYSQL* mysql;
|
|
||||||
THD* thd;
|
|
||||||
|
|
||||||
if (!GWBUF_IS_PARSED(querybuf))
|
return (lex = get_lex(querybuf)) != NULL &&
|
||||||
{
|
lex->sql_command == SQLCOM_DROP_TABLE;
|
||||||
return false;
|
|
||||||
}
|
|
||||||
pi = (parsing_info_t *)gwbuf_get_buffer_object_data(querybuf,
|
|
||||||
GWBUF_PARSING_INFO);
|
|
||||||
|
|
||||||
if (pi == NULL)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((mysql = (MYSQL *)pi->pi_handle) == NULL ||
|
|
||||||
(thd = (THD *)mysql->thd) == NULL)
|
|
||||||
{
|
|
||||||
ss_dassert(mysql != NULL &&
|
|
||||||
thd != NULL);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return thd->lex->sql_command == SQLCOM_DROP_TABLE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|||||||
@ -76,8 +76,9 @@ skygw_query_type_t query_classifier_get_type(GWBUF* querybuf);
|
|||||||
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
|
char* skygw_query_classifier_get_stmtname(MYSQL* mysql);
|
||||||
char* skygw_get_created_table_name(GWBUF* querybuf);
|
char* skygw_get_created_table_name(GWBUF* querybuf);
|
||||||
bool is_drop_table_query(GWBUF* querybuf);
|
bool is_drop_table_query(GWBUF* querybuf);
|
||||||
void* skygw_get_affected_tables(void* thdp);
|
bool skygw_is_real_query(GWBUF* querybuf);
|
||||||
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize);
|
void* skygw_get_affected_tables(void* lexptr);
|
||||||
|
char** skygw_get_table_names(GWBUF* querybuf,int* tblsize,bool fullnames);
|
||||||
char* skygw_get_canonical(GWBUF* querybuf);
|
char* skygw_get_canonical(GWBUF* querybuf);
|
||||||
bool parse_query (GWBUF* querybuf);
|
bool parse_query (GWBUF* querybuf);
|
||||||
parsing_info_t* parsing_info_init(void (*donefun)(void *));
|
parsing_info_t* parsing_info_init(void (*donefun)(void *));
|
||||||
|
|||||||
@ -1215,161 +1215,104 @@ static route_target_t get_route_target (
|
|||||||
|
|
||||||
return target;
|
return target;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main routing entry, this is called with every packet that is
|
* Check if the query is a DROP TABLE... query and
|
||||||
* received and has to be forwarded to the backend database.
|
* if it targets a temporary table, remove it from the hashtable.
|
||||||
*
|
* @param instance Router instance
|
||||||
* The routeQuery will make the routing decision based on the contents
|
* @param router_session Router client session
|
||||||
* of the instance, session and the query itself in the queue. The
|
* @param querybuf GWBUF containing the query
|
||||||
* data in the queue may not represent a complete query, it represents
|
* @param type The type of the query resolved so far
|
||||||
* the data that has been received. The query router itself is responsible
|
|
||||||
* for buffering the partial query, a later call to the query router will
|
|
||||||
* contain the remainder, or part thereof of the query.
|
|
||||||
*
|
|
||||||
* @param instance The query router instance
|
|
||||||
* @param session The session associated with the client
|
|
||||||
* @param queue Gateway buffer queue with the packets received
|
|
||||||
*
|
|
||||||
* @return if succeed 1, otherwise 0
|
|
||||||
* If routeQuery fails, it means that router session has failed.
|
|
||||||
* In any tolerated failure, handleError is called and if necessary,
|
|
||||||
* an error message is sent to the client.
|
|
||||||
*
|
|
||||||
* For now, routeQuery don't tolerate errors, so any error will close
|
|
||||||
* the session. vraa 14.6.14
|
|
||||||
*/
|
*/
|
||||||
static int routeQuery(
|
void check_drop_tmp_table(
|
||||||
ROUTER* instance,
|
ROUTER* instance,
|
||||||
void* router_session,
|
void* router_session,
|
||||||
GWBUF* querybuf)
|
GWBUF* querybuf,
|
||||||
|
skygw_query_type_t type)
|
||||||
{
|
{
|
||||||
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
|
||||||
GWBUF* plainsqlbuf = NULL;
|
int tsize = 0, klen = 0,i;
|
||||||
char* querystr = NULL;
|
|
||||||
char* startpos;
|
|
||||||
mysql_server_cmd_t packet_type;
|
|
||||||
uint8_t* packet;
|
|
||||||
int ret = 0;
|
|
||||||
int tsize = 0;
|
|
||||||
int klen = 0;
|
|
||||||
int i = 0;
|
|
||||||
DCB* master_dcb = NULL;
|
|
||||||
DCB* target_dcb = NULL;
|
|
||||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
|
||||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
|
||||||
rses_property_t* rses_prop_tmp;
|
|
||||||
bool rses_is_closed = false;
|
|
||||||
bool target_tmp_table = false;
|
|
||||||
char* dbname;
|
|
||||||
char* hkey;
|
|
||||||
char** tbl;
|
char** tbl;
|
||||||
HASHTABLE* h;
|
char *hkey,*dbname;
|
||||||
MYSQL_session* data;
|
MYSQL_session* data;
|
||||||
size_t len;
|
|
||||||
route_target_t route_target;
|
|
||||||
bool succp = false;
|
|
||||||
int rlag_max = MAX_RLAG_UNDEFINED;
|
|
||||||
backend_type_t btype; /*< target backend type */
|
|
||||||
|
|
||||||
CHK_CLIENT_RSES(router_cli_ses);
|
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||||
|
DCB* master_dcb = NULL;
|
||||||
|
rses_property_t* rses_prop_tmp;
|
||||||
|
|
||||||
/** Dirty read for quick check if router is closed. */
|
|
||||||
if (router_cli_ses->rses_closed)
|
|
||||||
{
|
|
||||||
rses_is_closed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
|
||||||
|
|
||||||
packet = GWBUF_DATA(querybuf);
|
|
||||||
packet_type = packet[4];
|
|
||||||
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
|
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
|
||||||
|
|
||||||
if (rses_is_closed)
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* MYSQL_COM_QUIT may have sent by client and as a part of backend
|
|
||||||
* closing procedure.
|
|
||||||
*/
|
|
||||||
if (packet_type != MYSQL_COM_QUIT)
|
|
||||||
{
|
|
||||||
char* query_str = modutil_get_query(querybuf);
|
|
||||||
|
|
||||||
LOGIF(LE,
|
|
||||||
(skygw_log_write_flush(
|
|
||||||
LOGFILE_ERROR,
|
|
||||||
"Error: Failed to route %s:%s:\"%s\" to "
|
|
||||||
"backend server. %s.",
|
|
||||||
STRPACKETTYPE(packet_type),
|
|
||||||
STRQTYPE(qtype),
|
|
||||||
(query_str == NULL ? "(empty)" : query_str),
|
|
||||||
(rses_is_closed ? "Router was closed" :
|
|
||||||
"Router has no backend servers where to "
|
|
||||||
"route to"))));
|
|
||||||
free(querybuf);
|
|
||||||
}
|
|
||||||
goto retblock;
|
|
||||||
}
|
|
||||||
inst->stats.n_queries++;
|
|
||||||
|
|
||||||
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||||
|
|
||||||
CHK_DCB(master_dcb);
|
CHK_DCB(master_dcb);
|
||||||
|
|
||||||
data = (MYSQL_session*)master_dcb->session->data;
|
data = (MYSQL_session*)master_dcb->session->data;
|
||||||
dbname = data->db;
|
dbname = (char*)data->db;
|
||||||
|
|
||||||
switch(packet_type) {
|
if (is_drop_table_query(querybuf))
|
||||||
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
{
|
||||||
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
tbl = skygw_get_table_names(querybuf,&tsize,false);
|
||||||
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
|
||||||
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
|
||||||
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
|
||||||
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
|
||||||
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
|
|
||||||
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
|
||||||
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
|
||||||
qtype = QUERY_TYPE_SESSION_WRITE;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
for(i = 0; i<tsize; i++)
|
||||||
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
|
{
|
||||||
qtype = QUERY_TYPE_WRITE;
|
klen = strlen(dbname) + strlen(tbl[i]) + 2;
|
||||||
break;
|
hkey = calloc(klen,sizeof(char));
|
||||||
|
strcpy(hkey,dbname);
|
||||||
|
strcat(hkey,".");
|
||||||
|
strcat(hkey,tbl[i]);
|
||||||
|
|
||||||
case MYSQL_COM_QUERY:
|
if (rses_prop_tmp &&
|
||||||
qtype = query_classifier_get_type(querybuf);
|
rses_prop_tmp->rses_prop_data.temp_tables)
|
||||||
break;
|
{
|
||||||
|
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
|
||||||
case MYSQL_COM_STMT_PREPARE:
|
(void *)hkey))
|
||||||
qtype = query_classifier_get_type(querybuf);
|
{
|
||||||
qtype |= QUERY_TYPE_PREPARE_STMT;
|
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
||||||
break;
|
"Temporary table dropped: %s",hkey)));
|
||||||
|
}
|
||||||
case MYSQL_COM_STMT_EXECUTE:
|
}
|
||||||
/** Parsing is not needed for this type of packet */
|
free(tbl[i]);
|
||||||
qtype = QUERY_TYPE_EXEC_STMT;
|
free(hkey);
|
||||||
break;
|
}
|
||||||
|
free(tbl);
|
||||||
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
}
|
||||||
case MYSQL_COM_STATISTICS: /**< 9 ? */
|
}
|
||||||
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
|
|
||||||
case MYSQL_COM_CONNECT: /**< 0b ? */
|
|
||||||
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
|
|
||||||
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
|
|
||||||
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
|
|
||||||
case MYSQL_COM_DAEMON: /**< 1d ? */
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
} /**< switch by packet type */
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the query targets a temporary table
|
* Check if the query targets a temporary table.
|
||||||
|
* @param instance Router instance
|
||||||
|
* @param router_session Router client session
|
||||||
|
* @param querybuf GWBUF containing the query
|
||||||
|
* @param type The type of the query resolved so far
|
||||||
|
* @return The type of the query
|
||||||
*/
|
*/
|
||||||
|
skygw_query_type_t is_read_tmp_table(
|
||||||
|
ROUTER* instance,
|
||||||
|
void* router_session,
|
||||||
|
GWBUF* querybuf,
|
||||||
|
skygw_query_type_t type)
|
||||||
|
{
|
||||||
|
|
||||||
|
bool target_tmp_table = false;
|
||||||
|
int tsize = 0, klen = 0,i;
|
||||||
|
char** tbl;
|
||||||
|
char *hkey,*dbname;
|
||||||
|
MYSQL_session* data;
|
||||||
|
|
||||||
|
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||||
|
DCB* master_dcb = NULL;
|
||||||
|
skygw_query_type_t qtype = type;
|
||||||
|
rses_property_t* rses_prop_tmp;
|
||||||
|
|
||||||
|
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
|
||||||
|
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||||
|
|
||||||
|
CHK_DCB(master_dcb);
|
||||||
|
|
||||||
|
data = (MYSQL_session*)master_dcb->session->data;
|
||||||
|
dbname = (char*)data->db;
|
||||||
|
|
||||||
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
|
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_READ))
|
||||||
{
|
{
|
||||||
tbl = skygw_get_table_names(querybuf,&tsize);
|
tbl = skygw_get_table_names(querybuf,&tsize,false);
|
||||||
|
|
||||||
if (tsize > 0)
|
if (tsize > 0)
|
||||||
{
|
{
|
||||||
@ -1380,7 +1323,7 @@ static int routeQuery(
|
|||||||
hkey = calloc(klen,sizeof(char));
|
hkey = calloc(klen,sizeof(char));
|
||||||
strcpy(hkey,dbname);
|
strcpy(hkey,dbname);
|
||||||
strcat(hkey,".");
|
strcat(hkey,".");
|
||||||
strcat(hkey,tbl[0]);
|
strcat(hkey,tbl[i]);
|
||||||
|
|
||||||
if (rses_prop_tmp &&
|
if (rses_prop_tmp &&
|
||||||
rses_prop_tmp->rses_prop_data.temp_tables)
|
rses_prop_tmp->rses_prop_data.temp_tables)
|
||||||
@ -1396,62 +1339,56 @@ static int routeQuery(
|
|||||||
"Query targets a temporary table: %s",hkey)));
|
"Query targets a temporary table: %s",hkey)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
free(hkey);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (i = 0; i<tsize; i++)
|
free(hkey);
|
||||||
{
|
|
||||||
free(tbl[i]);
|
free(tbl[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(tbl);
|
free(tbl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* If autocommit is disabled or transaction is explicitly started
|
|
||||||
* transaction becomes active and master gets all statements until
|
|
||||||
* transaction is committed and autocommit is enabled again.
|
|
||||||
*/
|
|
||||||
if (router_cli_ses->rses_autocommit_enabled &&
|
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_autocommit_enabled = false;
|
|
||||||
|
|
||||||
if (!router_cli_ses->rses_transaction_active)
|
return qtype;
|
||||||
{
|
|
||||||
router_cli_ses->rses_transaction_active = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (!router_cli_ses->rses_transaction_active &&
|
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_transaction_active = true;
|
|
||||||
}
|
|
||||||
/**
|
|
||||||
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
|
|
||||||
*/
|
|
||||||
if (router_cli_ses->rses_autocommit_enabled &&
|
|
||||||
router_cli_ses->rses_transaction_active &&
|
|
||||||
(QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) ||
|
|
||||||
QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK)))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_transaction_active = false;
|
|
||||||
}
|
|
||||||
else if (!router_cli_ses->rses_autocommit_enabled &&
|
|
||||||
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
|
|
||||||
{
|
|
||||||
router_cli_ses->rses_autocommit_enabled = true;
|
|
||||||
router_cli_ses->rses_transaction_active = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
|
* If query is of type QUERY_TYPE_CREATE_TMP_TABLE then find out
|
||||||
* the database and table name, create a hashvalue and
|
* the database and table name, create a hashvalue and
|
||||||
* add it to the router client session's property. If property
|
* add it to the router client session's property. If property
|
||||||
* doesn't exist then create it first. If the query is DROP TABLE...
|
* doesn't exist then create it first.
|
||||||
* then see if it targets a temporary table and remove it from the hashtable
|
*
|
||||||
* if it does.
|
* @param instance Router instance
|
||||||
|
* @param router_session Router client session
|
||||||
|
* @param querybuf GWBUF containing the query
|
||||||
|
* @param type The type of the query resolved so far
|
||||||
*/
|
*/
|
||||||
if (QUERY_IS_TYPE(qtype, QUERY_TYPE_CREATE_TMP_TABLE))
|
void check_create_tmp_table(
|
||||||
|
ROUTER* instance,
|
||||||
|
void* router_session,
|
||||||
|
GWBUF* querybuf,
|
||||||
|
skygw_query_type_t type)
|
||||||
|
{
|
||||||
|
|
||||||
|
int klen = 0;
|
||||||
|
|
||||||
|
char *hkey,*dbname;
|
||||||
|
MYSQL_session* data;
|
||||||
|
|
||||||
|
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||||
|
DCB* master_dcb = NULL;
|
||||||
|
rses_property_t* rses_prop_tmp;
|
||||||
|
HASHTABLE* h;
|
||||||
|
|
||||||
|
rses_prop_tmp = router_cli_ses->rses_properties[RSES_PROP_TYPE_TMPTABLES];
|
||||||
|
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||||
|
|
||||||
|
CHK_DCB(master_dcb);
|
||||||
|
|
||||||
|
data = (MYSQL_session*)master_dcb->session->data;
|
||||||
|
dbname = (char*)data->db;
|
||||||
|
|
||||||
|
|
||||||
|
if (QUERY_IS_TYPE(type, QUERY_TYPE_CREATE_TMP_TABLE))
|
||||||
{
|
{
|
||||||
bool is_temp = true;
|
bool is_temp = true;
|
||||||
char* tblname = NULL;
|
char* tblname = NULL;
|
||||||
@ -1524,36 +1461,189 @@ static int routeQuery(
|
|||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
free(hkey);
|
free(hkey);
|
||||||
|
free(tblname);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if DROP TABLE... targets a temporary table */
|
/**
|
||||||
if (is_drop_table_query(querybuf))
|
* The main routing entry, this is called with every packet that is
|
||||||
|
* received and has to be forwarded to the backend database.
|
||||||
|
*
|
||||||
|
* The routeQuery will make the routing decision based on the contents
|
||||||
|
* of the instance, session and the query itself in the queue. The
|
||||||
|
* data in the queue may not represent a complete query, it represents
|
||||||
|
* the data that has been received. The query router itself is responsible
|
||||||
|
* for buffering the partial query, a later call to the query router will
|
||||||
|
* contain the remainder, or part thereof of the query.
|
||||||
|
*
|
||||||
|
* @param instance The query router instance
|
||||||
|
* @param session The session associated with the client
|
||||||
|
* @param queue Gateway buffer queue with the packets received
|
||||||
|
*
|
||||||
|
* @return if succeed 1, otherwise 0
|
||||||
|
* If routeQuery fails, it means that router session has failed.
|
||||||
|
* In any tolerated failure, handleError is called and if necessary,
|
||||||
|
* an error message is sent to the client.
|
||||||
|
*
|
||||||
|
* For now, routeQuery don't tolerate errors, so any error will close
|
||||||
|
* the session. vraa 14.6.14
|
||||||
|
*/
|
||||||
|
static int routeQuery(
|
||||||
|
ROUTER* instance,
|
||||||
|
void* router_session,
|
||||||
|
GWBUF* querybuf)
|
||||||
{
|
{
|
||||||
tbl = skygw_get_table_names(querybuf,&tsize);
|
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||||
|
char* querystr = NULL;
|
||||||
|
mysql_server_cmd_t packet_type;
|
||||||
|
uint8_t* packet;
|
||||||
|
int ret = 0;
|
||||||
|
DCB* master_dcb = NULL;
|
||||||
|
DCB* target_dcb = NULL;
|
||||||
|
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||||
|
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||||
|
bool rses_is_closed = false;
|
||||||
|
route_target_t route_target;
|
||||||
|
bool succp = false;
|
||||||
|
int rlag_max = MAX_RLAG_UNDEFINED;
|
||||||
|
backend_type_t btype; /*< target backend type */
|
||||||
|
|
||||||
for(i = 0; i<tsize; i++)
|
CHK_CLIENT_RSES(router_cli_ses);
|
||||||
{
|
|
||||||
klen = strlen(dbname) + strlen(tbl[i]) + 2;
|
|
||||||
hkey = calloc(klen,sizeof(char));
|
|
||||||
strcpy(hkey,dbname);
|
|
||||||
strcat(hkey,".");
|
|
||||||
strcat(hkey,tbl[i]);
|
|
||||||
|
|
||||||
if (rses_prop_tmp &&
|
/** Dirty read for quick check if router is closed. */
|
||||||
rses_prop_tmp->rses_prop_data.temp_tables)
|
if (router_cli_ses->rses_closed)
|
||||||
{
|
{
|
||||||
if (hashtable_delete(rses_prop_tmp->rses_prop_data.temp_tables,
|
rses_is_closed = true;
|
||||||
(void *)hkey))
|
}
|
||||||
|
|
||||||
|
ss_dassert(!GWBUF_IS_TYPE_UNDEFINED(querybuf));
|
||||||
|
|
||||||
|
packet = GWBUF_DATA(querybuf);
|
||||||
|
packet_type = packet[4];
|
||||||
|
|
||||||
|
|
||||||
|
if (rses_is_closed)
|
||||||
{
|
{
|
||||||
LOGIF(LT, (skygw_log_write(LOGFILE_TRACE,
|
/**
|
||||||
"Temporary table dropped: %s",hkey)));
|
* MYSQL_COM_QUIT may have sent by client and as a part of backend
|
||||||
|
* closing procedure.
|
||||||
|
*/
|
||||||
|
if (packet_type != MYSQL_COM_QUIT)
|
||||||
|
{
|
||||||
|
char* query_str = modutil_get_query(querybuf);
|
||||||
|
|
||||||
|
LOGIF(LE,
|
||||||
|
(skygw_log_write_flush(
|
||||||
|
LOGFILE_ERROR,
|
||||||
|
"Error: Failed to route %s:%s:\"%s\" to "
|
||||||
|
"backend server. %s.",
|
||||||
|
STRPACKETTYPE(packet_type),
|
||||||
|
STRQTYPE(qtype),
|
||||||
|
(query_str == NULL ? "(empty)" : query_str),
|
||||||
|
(rses_is_closed ? "Router was closed" :
|
||||||
|
"Router has no backend servers where to "
|
||||||
|
"route to"))));
|
||||||
|
free(querybuf);
|
||||||
|
}
|
||||||
|
goto retblock;
|
||||||
|
}
|
||||||
|
inst->stats.n_queries++;
|
||||||
|
|
||||||
|
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||||
|
CHK_DCB(master_dcb);
|
||||||
|
|
||||||
|
|
||||||
|
switch(packet_type) {
|
||||||
|
case MYSQL_COM_QUIT: /*< 1 QUIT will close all sessions */
|
||||||
|
case MYSQL_COM_INIT_DB: /*< 2 DDL must go to the master */
|
||||||
|
case MYSQL_COM_REFRESH: /*< 7 - I guess this is session but not sure */
|
||||||
|
case MYSQL_COM_DEBUG: /*< 0d all servers dump debug info to stdout */
|
||||||
|
case MYSQL_COM_PING: /*< 0e all servers are pinged */
|
||||||
|
case MYSQL_COM_CHANGE_USER: /*< 11 all servers change it accordingly */
|
||||||
|
case MYSQL_COM_STMT_CLOSE: /*< free prepared statement */
|
||||||
|
case MYSQL_COM_STMT_SEND_LONG_DATA: /*< send data to column */
|
||||||
|
case MYSQL_COM_STMT_RESET: /*< resets the data of a prepared statement */
|
||||||
|
qtype = QUERY_TYPE_SESSION_WRITE;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_CREATE_DB: /**< 5 DDL must go to the master */
|
||||||
|
case MYSQL_COM_DROP_DB: /**< 6 DDL must go to the master */
|
||||||
|
qtype = QUERY_TYPE_WRITE;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_QUERY:
|
||||||
|
qtype = query_classifier_get_type(querybuf);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_STMT_PREPARE:
|
||||||
|
qtype = query_classifier_get_type(querybuf);
|
||||||
|
qtype |= QUERY_TYPE_PREPARE_STMT;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_STMT_EXECUTE:
|
||||||
|
/** Parsing is not needed for this type of packet */
|
||||||
|
qtype = QUERY_TYPE_EXEC_STMT;
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MYSQL_COM_SHUTDOWN: /**< 8 where should shutdown be routed ? */
|
||||||
|
case MYSQL_COM_STATISTICS: /**< 9 ? */
|
||||||
|
case MYSQL_COM_PROCESS_INFO: /**< 0a ? */
|
||||||
|
case MYSQL_COM_CONNECT: /**< 0b ? */
|
||||||
|
case MYSQL_COM_PROCESS_KILL: /**< 0c ? */
|
||||||
|
case MYSQL_COM_TIME: /**< 0f should this be run in gateway ? */
|
||||||
|
case MYSQL_COM_DELAYED_INSERT: /**< 10 ? */
|
||||||
|
case MYSQL_COM_DAEMON: /**< 1d ? */
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
} /**< switch by packet type */
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the query has anything to do with temporary tables.
|
||||||
|
*/
|
||||||
|
|
||||||
|
qtype = is_read_tmp_table(instance,router_session,querybuf,qtype);
|
||||||
|
check_create_tmp_table(instance,router_session,querybuf,qtype);
|
||||||
|
check_drop_tmp_table(instance,router_session,querybuf,qtype);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If autocommit is disabled or transaction is explicitly started
|
||||||
|
* transaction becomes active and master gets all statements until
|
||||||
|
* transaction is committed and autocommit is enabled again.
|
||||||
|
*/
|
||||||
|
if (router_cli_ses->rses_autocommit_enabled &&
|
||||||
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_DISABLE_AUTOCOMMIT))
|
||||||
|
{
|
||||||
|
router_cli_ses->rses_autocommit_enabled = false;
|
||||||
|
|
||||||
|
if (!router_cli_ses->rses_transaction_active)
|
||||||
|
{
|
||||||
|
router_cli_ses->rses_transaction_active = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
free(tbl[i]);
|
else if (!router_cli_ses->rses_transaction_active &&
|
||||||
free(hkey);
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_BEGIN_TRX))
|
||||||
|
{
|
||||||
|
router_cli_ses->rses_transaction_active = true;
|
||||||
}
|
}
|
||||||
free(tbl);
|
/**
|
||||||
|
* Explicit COMMIT and ROLLBACK, implicit COMMIT.
|
||||||
|
*/
|
||||||
|
if (router_cli_ses->rses_autocommit_enabled &&
|
||||||
|
router_cli_ses->rses_transaction_active &&
|
||||||
|
(QUERY_IS_TYPE(qtype,QUERY_TYPE_COMMIT) ||
|
||||||
|
QUERY_IS_TYPE(qtype,QUERY_TYPE_ROLLBACK)))
|
||||||
|
{
|
||||||
|
router_cli_ses->rses_transaction_active = false;
|
||||||
}
|
}
|
||||||
|
else if (!router_cli_ses->rses_autocommit_enabled &&
|
||||||
|
QUERY_IS_TYPE(qtype, QUERY_TYPE_ENABLE_AUTOCOMMIT))
|
||||||
|
{
|
||||||
|
router_cli_ses->rses_autocommit_enabled = true;
|
||||||
|
router_cli_ses->rses_transaction_active = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find out where to route the query. Result may not be clear; it is
|
* Find out where to route the query. Result may not be clear; it is
|
||||||
* possible to have a hint for routing to a named server which can
|
* possible to have a hint for routing to a named server which can
|
||||||
@ -3774,8 +3864,8 @@ static void print_error_packet(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
ss_dassert(srv != NULL);
|
ss_dassert(srv != NULL);
|
||||||
|
char* str = (char*)&ptr[7];
|
||||||
bufstr = strndup(&ptr[7], len-3);
|
bufstr = strndup(str, len-3);
|
||||||
|
|
||||||
LOGIF(LE, (skygw_log_write_flush(
|
LOGIF(LE, (skygw_log_write_flush(
|
||||||
LOGFILE_ERROR,
|
LOGFILE_ERROR,
|
||||||
|
|||||||
Reference in New Issue
Block a user