Changes related to canonical query format implementation.
query_classifier.cc: Now query can be parsed outside query_classifier_get_type by calling function parse_query. It creates parsing_info_t struct which is then added to the GWBUF which also includes the query. Parsing information follows the buffered query and it is freed at the same time with query buffer, in gwbuf_free. buffer.c: additions of parsing information to gwbuf struct. modutil.c: added function which returns query from GWBUF in plain text string. readwritesplit.c:routeQuery now only calls query_classifier_get_type to get the query type instead of extracting plain text query from the GWBUF buffer.
This commit is contained in:
@ -83,7 +83,7 @@ SHARED_BUF *sbuf;
|
||||
rval->sbuf = sbuf;
|
||||
rval->next = NULL;
|
||||
rval->gwbuf_type = GWBUF_TYPE_UNDEFINED;
|
||||
rval->command = 0;
|
||||
rval->gwbuf_parsing_info = NULL;
|
||||
CHK_GWBUF(rval);
|
||||
return rval;
|
||||
}
|
||||
@ -102,6 +102,11 @@ gwbuf_free(GWBUF *buf)
|
||||
free(buf->sbuf->data);
|
||||
free(buf->sbuf);
|
||||
}
|
||||
if (buf->gwbuf_parsing_info != NULL)
|
||||
{
|
||||
parsing_info_t* pi = (parsing_info_t *)buf->gwbuf_parsing_info;
|
||||
pi->pi_done_fp(pi);
|
||||
}
|
||||
free(buf);
|
||||
}
|
||||
|
||||
@ -131,6 +136,7 @@ GWBUF *rval;
|
||||
rval->end = buf->end;
|
||||
rval->gwbuf_type = buf->gwbuf_type;
|
||||
rval->next = NULL;
|
||||
rval->gwbuf_parsing_info = NULL;
|
||||
CHK_GWBUF(rval);
|
||||
return rval;
|
||||
}
|
||||
@ -157,6 +163,7 @@ GWBUF *gwbuf_clone_portion(
|
||||
clonebuf->end = (void *)((char *)clonebuf->start)+length;
|
||||
clonebuf->gwbuf_type = buf->gwbuf_type; /*< clone the type for now */
|
||||
clonebuf->next = NULL;
|
||||
clonebuf->gwbuf_parsing_info = NULL;
|
||||
CHK_GWBUF(clonebuf);
|
||||
return clonebuf;
|
||||
|
||||
@ -336,5 +343,10 @@ void gwbuf_set_type(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void* gwbuf_get_parsing_info(
|
||||
GWBUF* buf)
|
||||
{
|
||||
CHK_GWBUF(buf);
|
||||
return buf->gwbuf_parsing_info;
|
||||
}
|
||||
|
||||
|
||||
@ -29,6 +29,7 @@
|
||||
*/
|
||||
#include <buffer.h>
|
||||
#include <string.h>
|
||||
#include <mysql_client_server_protocol.h>
|
||||
|
||||
/**
|
||||
* Check if a GWBUF structure is a MySQL COM_QUERY packet
|
||||
@ -171,3 +172,57 @@ GWBUF *addition;
|
||||
|
||||
return orig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy query string from GWBUF buffer to separate memory area.
|
||||
*
|
||||
* @param buf GWBUF buffer including the query
|
||||
*
|
||||
* @return Plaint text query if the packet type is COM_QUERY. Otherwise return
|
||||
* a string including the packet type.
|
||||
*/
|
||||
char* modutil_get_query(
|
||||
GWBUF* buf)
|
||||
{
|
||||
uint8_t* packet;
|
||||
mysql_server_cmd_t packet_type;
|
||||
size_t len;
|
||||
char* query_str;
|
||||
|
||||
packet = GWBUF_DATA(buf);
|
||||
packet_type = packet[4];
|
||||
|
||||
switch (packet_type) {
|
||||
case MYSQL_COM_QUIT:
|
||||
len = strlen("[Quit msg]")+1;
|
||||
if ((query_str = (char *)malloc(len+1)) == NULL)
|
||||
{
|
||||
goto retblock;
|
||||
}
|
||||
memcpy(query_str, "[Quit msg]", len);
|
||||
memset(&query_str[len], 0, 1);
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
len = MYSQL_GET_PACKET_LEN(packet)-1; /*< distract 1 for packet type byte */
|
||||
if ((query_str = (char *)malloc(len+1)) == NULL)
|
||||
{
|
||||
goto retblock;
|
||||
}
|
||||
memcpy(query_str, &packet[5], len);
|
||||
memset(&query_str[len], 0, 1);
|
||||
break;
|
||||
|
||||
default:
|
||||
len = strlen(STRPACKETTYPE(packet_type))+1;
|
||||
if ((query_str = (char *)malloc(len+1)) == NULL)
|
||||
{
|
||||
goto retblock;
|
||||
}
|
||||
memcpy(query_str, STRPACKETTYPE(packet_type), len);
|
||||
memset(&query_str[len], 0, 1);
|
||||
break;
|
||||
} /*< switch */
|
||||
retblock:
|
||||
return query_str;
|
||||
}
|
||||
@ -43,6 +43,7 @@
|
||||
*/
|
||||
#include <skygw_debug.h>
|
||||
|
||||
EXTERN_C_BLOCK_BEGIN
|
||||
|
||||
typedef enum
|
||||
{
|
||||
@ -73,6 +74,20 @@ typedef struct {
|
||||
int refcount; /*< Reference count on the buffer */
|
||||
} SHARED_BUF;
|
||||
|
||||
|
||||
typedef struct parsing_info_st {
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t pi_chk_top;
|
||||
#endif
|
||||
void* pi_handle; /*< parsing info object pointer */
|
||||
char* pi_query_plain_str; /*< query as plain string */
|
||||
void (*pi_done_fp)(void *); /*< clean-up function for parsing info */
|
||||
#if defined(SS_DEBUG)
|
||||
skygw_chk_t pi_chk_tail;
|
||||
#endif
|
||||
} parsing_info_t;
|
||||
|
||||
|
||||
/**
|
||||
* The buffer structure used by the descriptor control blocks.
|
||||
*
|
||||
@ -86,7 +101,7 @@ typedef struct gwbuf {
|
||||
void *start; /*< Start of the valid data */
|
||||
void *end; /*< First byte after the valid data */
|
||||
SHARED_BUF *sbuf; /*< The shared buffer with the real data */
|
||||
int command;/*< The command type for the queue */
|
||||
void *gwbuf_parsing_info; /*< parsing info object pointer */
|
||||
gwbuf_type_t gwbuf_type; /*< buffer's data type information */
|
||||
} GWBUF;
|
||||
|
||||
@ -121,4 +136,10 @@ extern unsigned int gwbuf_length(GWBUF *head);
|
||||
extern GWBUF *gwbuf_clone_portion(GWBUF *head, size_t offset, size_t len);
|
||||
extern GWBUF *gwbuf_clone_transform(GWBUF *head, gwbuf_type_t type);
|
||||
extern void gwbuf_set_type(GWBUF *head, gwbuf_type_t type);
|
||||
void* gwbuf_get_parsing_info(GWBUF* buf);
|
||||
|
||||
|
||||
EXTERN_C_BLOCK_END
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
@ -36,4 +36,6 @@ extern int modutil_is_SQL(GWBUF *);
|
||||
extern int modutil_extract_SQL(GWBUF *, char **, int *);
|
||||
extern int modutil_MySQL_Query(GWBUF *, char **, int *, int *);
|
||||
extern GWBUF *modutil_replace_SQL(GWBUF *, char *);
|
||||
char* modutil_get_query(GWBUF* buf);
|
||||
|
||||
#endif
|
||||
|
||||
@ -31,6 +31,7 @@
|
||||
#include <dcb.h>
|
||||
#include <spinlock.h>
|
||||
#include <modinfo.h>
|
||||
#include <modutil.h>
|
||||
#include <mysql_client_server_protocol.h>
|
||||
|
||||
MODULE_INFO info = {
|
||||
@ -703,7 +704,7 @@ static void* newSession(
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_ptr_property =
|
||||
&client_rses->rses_properties[RSES_PROP_TYPE_SESCMD];
|
||||
backend_ref[i].bref_sescmd_cur.scmd_cur_cmd = NULL;
|
||||
}
|
||||
}
|
||||
max_nslaves = rses_get_max_slavecount(client_rses, router_nservers);
|
||||
max_slave_rlag = rses_get_max_replication_lag(client_rses);
|
||||
|
||||
@ -1031,9 +1032,6 @@ static int routeQuery(
|
||||
GWBUF* querybuf)
|
||||
{
|
||||
skygw_query_type_t qtype = QUERY_TYPE_UNKNOWN;
|
||||
GWBUF* plainsqlbuf = NULL;
|
||||
char* querystr = NULL;
|
||||
char* startpos;
|
||||
mysql_server_cmd_t packet_type;
|
||||
uint8_t* packet;
|
||||
int ret = 0;
|
||||
@ -1042,8 +1040,6 @@ static int routeQuery(
|
||||
ROUTER_INSTANCE* inst = (ROUTER_INSTANCE *)instance;
|
||||
ROUTER_CLIENT_SES* router_cli_ses = (ROUTER_CLIENT_SES *)router_session;
|
||||
bool rses_is_closed = false;
|
||||
size_t len;
|
||||
MYSQL* mysql = NULL;
|
||||
|
||||
CHK_CLIENT_RSES(router_cli_ses);
|
||||
|
||||
@ -1066,6 +1062,8 @@ static int routeQuery(
|
||||
*/
|
||||
if (packet_type != MYSQL_COM_QUIT)
|
||||
{
|
||||
char* query_str = modutil_get_query(querybuf);
|
||||
|
||||
LOGIF(LE,
|
||||
(skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
@ -1073,15 +1071,15 @@ static int routeQuery(
|
||||
"backend server. %s.",
|
||||
STRPACKETTYPE(packet_type),
|
||||
STRQTYPE(qtype),
|
||||
(querystr == NULL ? "(empty)" : querystr),
|
||||
(query_str == NULL ? "(empty)" : query_str),
|
||||
(rses_is_closed ? "Router was closed" :
|
||||
"Router has no backend servers where to "
|
||||
"route to"))));
|
||||
free(querybuf);
|
||||
}
|
||||
goto return_ret;
|
||||
}
|
||||
inst->stats.n_queries++;
|
||||
startpos = (char *)&packet[5];
|
||||
|
||||
master_dcb = router_cli_ses->rses_master_ref->bref_dcb;
|
||||
CHK_DCB(master_dcb);
|
||||
@ -1105,44 +1103,16 @@ static int routeQuery(
|
||||
break;
|
||||
|
||||
case MYSQL_COM_QUERY:
|
||||
plainsqlbuf = gwbuf_clone_transform(querybuf,
|
||||
GWBUF_TYPE_PLAINSQL);
|
||||
len = GWBUF_LENGTH(plainsqlbuf);
|
||||
/** unnecessary if buffer includes additional terminating null */
|
||||
querystr = (char *)malloc(len+1);
|
||||
memcpy(querystr, startpos, len);
|
||||
memset(&querystr[len], 0, 1);
|
||||
/**
|
||||
* Use mysql handle to query information from parse tree.
|
||||
* call skygw_query_classifier_free before exit!
|
||||
*/
|
||||
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
|
||||
qtype = query_classifier_get_type(querybuf);
|
||||
break;
|
||||
|
||||
case MYSQL_COM_STMT_PREPARE:
|
||||
plainsqlbuf = gwbuf_clone_transform(querybuf,
|
||||
GWBUF_TYPE_PLAINSQL);
|
||||
len = GWBUF_LENGTH(plainsqlbuf);
|
||||
/** unnecessary if buffer includes additional terminating null */
|
||||
querystr = (char *)malloc(len+1);
|
||||
memcpy(querystr, startpos, len);
|
||||
memset(&querystr[len], 0, 1);
|
||||
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
|
||||
qtype = query_classifier_get_type(querybuf);
|
||||
qtype |= QUERY_TYPE_PREPARE_STMT;
|
||||
break;
|
||||
|
||||
case MYSQL_COM_STMT_EXECUTE:
|
||||
/** Parsing is not needed for this type of packet */
|
||||
#if defined(NOT_USED)
|
||||
plainsqlbuf = gwbuf_clone_transform(querybuf,
|
||||
GWBUF_TYPE_PLAINSQL);
|
||||
len = GWBUF_LENGTH(plainsqlbuf);
|
||||
/** unnecessary if buffer includes additional terminating null */
|
||||
querystr = (char *)malloc(len+1);
|
||||
memcpy(querystr, startpos, len);
|
||||
memset(&querystr[len], 0, 1);
|
||||
qtype = skygw_query_classifier_get_type(querystr, 0, &mysql);
|
||||
#endif
|
||||
qtype = QUERY_TYPE_EXEC_STMT;
|
||||
break;
|
||||
|
||||
@ -1207,7 +1177,7 @@ static int routeQuery(
|
||||
*/
|
||||
bool succp = route_session_write(
|
||||
router_cli_ses,
|
||||
querybuf,
|
||||
gwbuf_clone(querybuf),
|
||||
inst,
|
||||
packet_type,
|
||||
qtype);
|
||||
@ -1238,7 +1208,7 @@ static int routeQuery(
|
||||
|
||||
if (succp)
|
||||
{
|
||||
if ((ret = slave_dcb->func.write(slave_dcb, querybuf)) == 1)
|
||||
if ((ret = slave_dcb->func.write(slave_dcb, gwbuf_clone(querybuf))) == 1)
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
|
||||
@ -1252,10 +1222,12 @@ static int routeQuery(
|
||||
}
|
||||
else
|
||||
{
|
||||
char* query_str = modutil_get_query(querybuf);
|
||||
LOGIF(LE, (skygw_log_write_flush(
|
||||
LOGFILE_ERROR,
|
||||
"Error : Routing query \"%s\" failed.",
|
||||
querystr)));
|
||||
(query_str == NULL ? "not available" : query_str))));
|
||||
free(query_str);
|
||||
}
|
||||
}
|
||||
rses_end_locked_router_action(router_cli_ses);
|
||||
@ -1296,7 +1268,7 @@ static int routeQuery(
|
||||
|
||||
if (succp)
|
||||
{
|
||||
if ((ret = master_dcb->func.write(master_dcb, querybuf)) == 1)
|
||||
if ((ret = master_dcb->func.write(master_dcb, gwbuf_clone(querybuf))) == 1)
|
||||
{
|
||||
backend_ref_t* bref;
|
||||
|
||||
@ -1323,11 +1295,10 @@ static int routeQuery(
|
||||
}
|
||||
return_ret:
|
||||
#if defined(SS_DEBUG)
|
||||
if (mysql != NULL && true)
|
||||
{
|
||||
char* canonical_query_str;
|
||||
|
||||
canonical_query_str = skygw_get_canonical(mysql, querystr);
|
||||
canonical_query_str = skygw_get_canonical(querybuf);
|
||||
|
||||
if (canonical_query_str != NULL)
|
||||
{
|
||||
@ -1339,18 +1310,7 @@ return_ret:
|
||||
}
|
||||
}
|
||||
#endif
|
||||
if (plainsqlbuf != NULL)
|
||||
{
|
||||
gwbuf_free(plainsqlbuf);
|
||||
}
|
||||
if (querystr != NULL)
|
||||
{
|
||||
free(querystr);
|
||||
}
|
||||
if (mysql != NULL)
|
||||
{
|
||||
skygw_query_classifier_free(mysql);
|
||||
}
|
||||
gwbuf_free(querybuf);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user